diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 77814fc9831fa3df423749b0cfedad58e1d8b042..c4cea3d361dad857509bd8124a45a0bb8709ff36 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -13,6 +13,7 @@ typedef uint64_t NetworkRequestId; enum Opcode : uint8_t { kOpcodeUnknownOp = 1, kOpcodeTransferData, + kOpcodeGetBufferData, kOpcodeAuthorize, kOpcodeCount, }; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 79a38863681f8c50c45d0b10375954e00473cf2f..b88f610b4c634cf4f9fb44d35c96b70b8390482e 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -22,7 +22,9 @@ namespace asapo { -const int SystemIO::kNetBufferSize = 1024 * 1024; //* 1024 ; //MiByte +const int SystemIO::kNetBufferSize = 1024 * 1024; +const int SystemIO::kWaitTimeoutMs = 1000; + /******************************************************************************* * system_io.cpp * @@ -628,10 +630,11 @@ ListSocketDescriptors SystemIO::WaitSocketsActivity(SocketDescriptor master_sock timeval timeout; timeout.tv_sec = 0; - timeout.tv_usec = 1000; + timeout.tv_usec = kWaitTimeoutMs; auto activity = select(max_sd + 1, &readfds, NULL, NULL, &timeout); if (activity == 0) { // timeout + *err = IOErrorTemplates::kTimeout.Generate(); return {}; } if ((activity < 0) && (errno != EINTR)) { diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index d1e551a541e664b036bcd19a4803321e26980c3f..aa1e8fedd6f48dd5a4209cc8b0167c2a638673c5 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -25,6 +25,7 @@ namespace asapo { class SystemIO final : public IO { private: static const int kNetBufferSize;//TODO: need to set by config + static const int kWaitTimeoutMs; void ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const; diff --git a/common/cpp/src/system_io/system_io_windows.cpp b/common/cpp/src/system_io/system_io_windows.cpp index d4437808db64152cf11dea6602a29d10b8681f40..07a5e3abd3cf49a2eab8cf8173473a611cda9474 100644 --- a/common/cpp/src/system_io/system_io_windows.cpp +++ b/common/cpp/src/system_io/system_io_windows.cpp @@ -18,9 +18,9 @@ namespace asapo { // use IOInstance and static variable to init window sockets on program start end cleanup on exit class IOInstance { - public: - IOInstance(); - ~IOInstance(); + public: + IOInstance(); + ~IOInstance(); }; static IOInstance instance; IOInstance::IOInstance() { @@ -307,7 +307,7 @@ std::string SystemIO::AddressFromSocket(SocketDescriptor socket) const noexcept sockaddr_in client_address{}; static size_t client_address_size = sizeof(sockaddr_in); - auto res = getpeername(socket, reinterpret_cast<sockaddr*>(&client_address),(int*) &client_address_size); + auto res = getpeername(socket, reinterpret_cast<sockaddr*>(&client_address), (int*) &client_address_size); if (res != 0) { return GetLastError()->Explain(); } diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index 09ea980658e3a5668ec0ab79e0b27fbdb88c08db..65a6b0ee8d796512790fa124401607d3a84680d8 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -12,6 +12,9 @@ void ReceiverDataServer::Run() { while (true) { Error err; auto requests = net__->GetNewRequests(&err); + if (err == IOErrorTemplates::kTimeout) { + continue; + } if (!err) { err = request_pool__->AddRequests(requests); } diff --git a/receiver/src/receiver_data_server/request.cpp b/receiver/src/receiver_data_server/request.cpp index 85a4b136f49f3e469a6faf5b230a1bcef285dc8f..fcc6b7fee0cbe6f02bab9969c722acd406b7edde 100644 --- a/receiver/src/receiver_data_server/request.cpp +++ b/receiver/src/receiver_data_server/request.cpp @@ -2,4 +2,7 @@ namespace asapo { +Request::Request(uint64_t net_id, const NetServer* server) : net_id{net_id}, server{server} { + +} } \ No newline at end of file diff --git a/receiver/src/receiver_data_server/request.h b/receiver/src/receiver_data_server/request.h index 712dcb23e8fb375bbb5158328ac5af652f41d633..01fff594673765fd8fb8c2952dd10f2d2a883502 100644 --- a/receiver/src/receiver_data_server/request.h +++ b/receiver/src/receiver_data_server/request.h @@ -1,10 +1,17 @@ #ifndef ASAPO_REQUEST_H #define ASAPO_REQUEST_H +#include "common/networking.h" + namespace asapo { -class Request { +class NetServer; +struct Request { + explicit Request(uint64_t net_id, const NetServer* server); + GenericRequestHeader header; + const uint64_t net_id; + const NetServer* server; }; } diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index e9d20df99b638fce711436c69949ed9f39c10355..fa2d2e6d966135893ad389310aea77457ef6858b 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -10,12 +10,12 @@ TcpServer::TcpServer(std::string address) : io__{GenerateDefaultIO()}, log__{Get address_{std::move(address)} {} Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept { - Error err; + Error + err; if (master_socket_ == kDisconnectedSocketDescriptor) { master_socket_ = io__->CreateAndBindIPTCPSocketListener(address_, kMaxPendingConnections, &err); if (!err) { log__->Info("data server listening on " + address_); - sockets_to_listen_.push_back(master_socket_); } else { log__->Error("dataserver cannot listen on " + address_ + ": " + err->Explain()); } @@ -23,7 +23,6 @@ Error TcpServer::InitializeMasterSocketIfNeeded() const noexcept { return err; } - ListSocketDescriptors TcpServer::GetActiveSockets(Error* err) const noexcept { std::vector<std::string> new_connections; auto sockets = io__->WaitSocketsActivity(master_socket_, &sockets_to_listen_, &new_connections, err); @@ -37,6 +36,41 @@ ListSocketDescriptors TcpServer::GetActiveSockets(Error* err) const noexcept { return sockets; } +void TcpServer::CloseSocket(SocketDescriptor socket) const noexcept { + sockets_to_listen_.erase(std::remove(sockets_to_listen_.begin(), sockets_to_listen_.end(), socket), + sockets_to_listen_.end()); + io__->CloseSocket(socket, nullptr); + log__->Debug("connection " + io__->AddressFromSocket(socket) + " closed"); +} + +Request TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { + Request request{(uint64_t) socket, this}; + io__->Receive(socket, &request.header, + sizeof(GenericRequestHeader), err); + if (*err == ErrorTemplates::kEndOfFile) { + CloseSocket(socket); + } else if (*err) { + log__->Error("error getting next request from " + io__->AddressFromSocket(socket) + ": " + (*err)-> + Explain() + ); + } + return request; +} + +Requests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { + Requests requests; + for (auto client : sockets) { + Error + err; + auto request = ReadRequest(client, &err); + if (err) { + continue; + } + requests.emplace_back(std::move(request)); + } + return requests; +} + Requests TcpServer::GetNewRequests(Error* err) const noexcept { if (*err = InitializeMasterSocketIfNeeded()) { return {}; @@ -47,20 +81,15 @@ Requests TcpServer::GetNewRequests(Error* err) const noexcept { return {}; } - for (auto client: sockets) { - GenericRequestHeader generic_request_header; - io__-> Receive(client, &generic_request_header, - sizeof(GenericRequestHeader), err); - if(*err) { - log__->Error("error getting next request from " + io__->AddressFromSocket(client) + ": " + (*err)-> - Explain() - ); - continue; - } - } - + return ReadRequests(sockets); +} - return {Requests{Request{}}}; +TcpServer::~TcpServer() { + if (!io__) return; // need for test that override io__ to run + for (auto client: sockets_to_listen_) { + io__->CloseSocket(client, nullptr); + } + io__->CloseSocket(master_socket_, nullptr); } diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index 6da336518ae707fae3c50d60e98e33a910e37870..2248cf19256baf6e1921b97bd497c2534060da9e 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -12,12 +12,16 @@ const int kMaxPendingConnections = 5; class TcpServer : public NetServer { public: TcpServer(std::string address); + ~TcpServer(); virtual Requests GetNewRequests(Error* err) const noexcept override ; std::unique_ptr<IO> io__; const AbstractLogger* log__; private: + void CloseSocket(SocketDescriptor socket) const noexcept; ListSocketDescriptors GetActiveSockets(Error* err) const noexcept; Error InitializeMasterSocketIfNeeded() const noexcept; + Request ReadRequest(SocketDescriptor socket, Error* err) const noexcept; + Requests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; mutable ListSocketDescriptors sockets_to_listen_; std::string address_; diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index ef752a114eb6f110c43f286e15ce5653188057b5..5cb9bf6812f89b0877d8ebfa488c78718818799a 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -60,6 +60,23 @@ class ReceiverDataServerTests : public Test { } }; +TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { + EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( + DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kTimeout.Generate().release()), + Return(asapo::Requests{}) + ) + ).WillOnce( + DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(asapo::Requests{}) + ) + ); + + EXPECT_CALL(mock_pool, AddRequests_t(_)).Times(0); + + data_server.Run(); +} + + TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp index 648b7d8cd613bd81af724e457871ac7c95968d92..d0cdcecdf757540a69c62f5e50c9d684621e9e56 100644 --- a/receiver/unittests/receiver_data_server/test_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -4,7 +4,7 @@ #include "unittests/MockLogger.h" #include "unittests/MockIO.h" - +#include "io/io_factory.h" #include "../../src/receiver_data_server/tcp_server.h" using ::testing::Test; @@ -21,11 +21,13 @@ using ::testing::NiceMock; using ::testing::HasSubstr; using ::testing::Contains; using ::testing::IsEmpty; +using ::testing::Mock; + using asapo::TcpServer; using asapo::MockIO; using asapo::Error; - +using asapo::ListSocketDescriptors; namespace { TEST(TCPServer, Constructor) { @@ -35,25 +37,34 @@ TEST(TCPServer, Constructor) { } +uint64_t expected_id = 124; +std::string expected_address = "somehost:123"; + class TCPServerTests : public Test { public: - std::string expected_address = "somehost:123"; - TcpServer tcp_server{expected_address}; + TcpServer tcp_server {expected_address}; NiceMock<MockIO> mock_io; NiceMock<asapo::MockLogger> mock_logger; asapo::SocketDescriptor expected_master_socket = 1; - asapo::ListSocketDescriptors expected_client_sockets{2, 3, 4}; + ListSocketDescriptors expected_client_sockets{2, 3, 4}; std::vector<std::string> expected_new_connections = {"test1", "test2"}; - void SetUp() override { + void SetUp() override { tcp_server.io__ = std::unique_ptr<asapo::IO> {&mock_io}; tcp_server.log__ = &mock_logger; + for (auto conn : expected_client_sockets) { + std::string connected_uri = std::to_string(conn); + ON_CALL(mock_io, AddressFromSocket_t(conn)).WillByDefault(Return(connected_uri)); + } + } void TearDown() override { tcp_server.io__.release(); } void ExpectListenMaster(bool ok); - void WaitSockets(bool ok); + void WaitSockets(bool ok, ListSocketDescriptors clients = {}); void MockReceiveRequest(bool ok ); + void ExpectReceiveOk(); + void ExpectReceiveRequestEof(); }; void TCPServerTests::ExpectListenMaster(bool ok) { @@ -64,9 +75,10 @@ void TCPServerTests::ExpectListenMaster(bool ok) { )); } -void TCPServerTests::WaitSockets(bool ok) { - EXPECT_CALL(mock_io, WaitSocketsActivity_t(expected_master_socket, _, _, _)).WillOnce(DoAll( +void TCPServerTests::WaitSockets(bool ok, ListSocketDescriptors clients) { + EXPECT_CALL(mock_io, WaitSocketsActivity_t(expected_master_socket, testing::Pointee(clients), _, _)).WillOnce(DoAll( SetArgPointee<2>(expected_new_connections), + SetArgPointee<1>(expected_client_sockets), SetArgPointee<3>(ok ? nullptr : asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), Return(ok ? expected_client_sockets : asapo::ListSocketDescriptors{}) )); @@ -82,7 +94,6 @@ void TCPServerTests::WaitSockets(bool ok) { } } - TEST_F(TCPServerTests, GetNewRequestsInitializesSocket) { Error err; ExpectListenMaster(false); @@ -112,12 +123,44 @@ void TCPServerTests::MockReceiveRequest(bool ok ) { ); if (!ok) { std::string connected_uri = std::to_string(conn); - EXPECT_CALL(mock_io, AddressFromSocket_t(conn)).WillOnce(Return(connected_uri)); EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("request"), HasSubstr(connected_uri)))); } } } +void TCPServerTests::ExpectReceiveRequestEof() { + for (auto conn : expected_client_sockets) { + EXPECT_CALL(mock_io, Receive_t(conn, _, _, _)) + .WillOnce( + DoAll(SetArgPointee<3>(asapo::ErrorTemplates::kEndOfFile.Generate().release()), + Return(0)) + ); + EXPECT_CALL(mock_io, CloseSocket_t(conn, _)); + + std::string connected_uri = std::to_string(conn); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("connection"), HasSubstr("closed"), HasSubstr(connected_uri)))); + } +} + + +ACTION_P(A_ReceiveData, op_code) { + ((asapo::GenericRequestHeader*)arg1)->op_code = op_code; + ((asapo::GenericRequestHeader*)arg1)->data_id = expected_id; +} + + +void TCPServerTests::ExpectReceiveOk() { + for (auto conn : expected_client_sockets) { + EXPECT_CALL(mock_io, Receive_t(conn, _, sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + A_ReceiveData(asapo::kOpcodeUnknownOp), + testing::ReturnArg<2>() + )); + } +} + TEST_F(TCPServerTests, GetNewRequestsWaitsSocketActivitiesError) { Error err; @@ -138,10 +181,35 @@ TEST_F(TCPServerTests, GetNewRequestsWaitsSocketReceiveFailure) { auto requests = tcp_server.GetNewRequests(&err); - ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(requests, IsEmpty()); + + Mock::VerifyAndClearExpectations(&mock_io); + + WaitSockets(false, expected_client_sockets); + tcp_server.GetNewRequests(&err); -// ASSERT_THAT(requests, IsEmpty()); } +TEST_F(TCPServerTests, GetNewRequestsReadEof) { + Error err; + ExpectListenMaster(true); + WaitSockets(true); + ExpectReceiveRequestEof(); + + auto requests = tcp_server.GetNewRequests(&err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(requests, IsEmpty()); + + Mock::VerifyAndClearExpectations(&mock_io); + + WaitSockets(false, {}); + + tcp_server.GetNewRequests(&err); + +} + + } diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp index 1ec50811e24da62b1931bcb5c554702fdf43db25..cad61bde71fb9e87c478d4ce1d98c5b5bd507c5b 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp @@ -70,7 +70,7 @@ std::unique_ptr<std::thread> CreateEchoServerThread() { ExitIfErrIsNotOk(&err, 108); } io->CloseSocket(master_socket, &err); - std::cout << "[SERVER] finished" << std::endl; + std::cout << "[SERVER] finished" << std::endl; }); }