diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index c1911c2521b7c13c6bd59eec633ae006169f8107..dbcfb2a544e0aeaee1ffea67cf704f55e54e4e5c 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -11,8 +11,9 @@ namespace asapo { typedef uint64_t NetworkRequestId; enum Opcode : uint8_t { - kOpcodeUnknownOp, + kOpcodeUnknownOp = 1, kOpcodeTransferData, + kOpcodeAuthorize, kOpcodeCount, }; @@ -26,43 +27,31 @@ enum NetworkErrorCode : uint16_t { //TODO need to use an serialization framework to ensure struct consistency on different computers -/** - * @defgroup RPC - * RPC always return a response to a corresponding request - * @{ - */ - -const std::size_t kMaxFileNameSize = 1024; -const std::size_t kMaxFileMessageSize = 1024; +const std::size_t kMaxMessageSize = 1024; struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, - uint64_t i_data_size = 0, const std::string& i_file_name = ""): + uint64_t i_data_size = 0, const std::string& i_message = ""): op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size} { - auto size = std::min(i_file_name.size() + 1, kMaxFileNameSize); - memcpy(file_name, i_file_name.c_str(), size); + auto size = std::min(i_message.size() + 1, kMaxMessageSize); + memcpy(message, i_message.c_str(), size); } Opcode op_code; uint64_t data_id; uint64_t data_size; - char file_name[kMaxFileNameSize]; + char message[kMaxMessageSize]; }; struct GenericNetworkResponse { Opcode op_code; NetworkRequestId request_id; NetworkErrorCode error_code; - char message[kMaxFileMessageSize]; + char message[kMaxMessageSize]; }; -/** - * Possible error codes: - * - ::NET_ERR__FILENAME_ALREADY_IN_USE - * - ::NET_ERR__ALLOCATE_STORAGE_FAILED - */ + struct SendDataResponse : GenericNetworkResponse { }; -/** @} */ } diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 59ddc768640373c0c63d2ce6661724feff7ef2fc..0fd95dafdc4da7eaa362b6819bc2a916640b2556 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -50,7 +50,7 @@ Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, s return err; } - return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{request_header, data, callback}}); + return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{"",request_header, data, callback}}); } void ProducerImpl::SetLogLevel(LogLevel level) { diff --git a/producer/api/src/request.h b/producer/api/src/request.h index 4c0a7ccdd4867bdf8d019e43634adaf5c69e01e1..59c7589f9c93011d85a730b46bf3594ef6868c74 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -7,6 +7,7 @@ namespace asapo { struct Request { + std::string beamtime_id; GenericRequestHeader header; const void* data; RequestCallback callback; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index 93ac0b49945030e02dbee22feec49ccaebb3406c..e43acd01fc0ab9efe18b22b40674bf9cb9775c3d 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -14,7 +14,7 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde } Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { - std::string fullpath = destination_folder_ + "/" + request->header.file_name + ".bin"; + std::string fullpath = destination_folder_ + "/" + request->header.message + ".bin"; auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data, request->header.data_size); if (request->callback) { request->callback(request->header, std::move(err)); diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 297a7d45f462ff4709b7a6f36422be10d5f388a8..52e64325eb054d464d0c99158f2e481bfbcdc02e 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -14,8 +14,20 @@ RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service } -Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address) { +Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) { + GenericRequestHeader header{kOpcodeAuthorize,0,0,beamtime_id.c_str()}; Error err; + io__->Send(sd_, &header, sizeof(header), &err); + if(err) { + return err; + } + return ReceiveResponse(); +} + + +Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id,const std::string& receiver_address) { + Error err; + sd_ = io__->CreateAndConnectIPTCPSocket(receiver_address, &err); if(err != nullptr) { log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); @@ -23,7 +35,8 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address) } log__->Info("connected to receiver at " + receiver_address); - err = ReceiveResponse(); + connected_receiver_uri_ = receiver_address; + err = Authorize(beamtime_id); if (err != nullptr) { log__->Error("authorization failed at " + receiver_address + " - " + err->Explain()); Disconnect(); @@ -32,7 +45,6 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address) log__->Debug("authorized at " + receiver_address); - connected_receiver_uri_ = receiver_address; return nullptr; } @@ -152,7 +164,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { } for (auto receiver_uri : receivers_list_) { if (Disconnected()) { - auto err = ConnectToReceiver(receiver_uri); + auto err = ConnectToReceiver(request->beamtime_id,receiver_uri); if (err != nullptr ) continue; } diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index 08a02132c6aae4ac2f7d38eee9abf8aef7646013..4c40ddc0d25141cb8125a06afccb611e467b2d54 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -29,7 +29,8 @@ class RequestHandlerTcp: public RequestHandler { const AbstractLogger* log__; ReceiverDiscoveryService* discovery_service__; private: - Error ConnectToReceiver(const std::string& receiver_address); + Error Authorize(const std::string& beamtime_id); + Error ConnectToReceiver(const std::string& beamtime_id,const std::string& receiver_address); Error SendHeaderAndData(const Request*); Error ReceiveResponse(); Error TrySendToReceiver(const Request* request); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 084557fb3b41619621ead0daf8323e9905a6ad2f..2e58db4cc55105122b661b658b936d0d831d8e0e 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -30,12 +30,12 @@ using asapo::RequestPool; using asapo::Request; -MATCHER_P3(M_CheckSendDataRequest, file_id, file_size, file_name, +MATCHER_P4(M_CheckSendDataRequest, op_code,file_id, file_size,message, "Checks if a valid GenericRequestHeader was Send") { - return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData - && ((asapo::GenericRequestHeader*)arg)->data_id == file_id - && std::string(((asapo::GenericRequestHeader*)arg)->file_name) == file_name - && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; + return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code + && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id + && ((asapo::GenericRequestHeader)(arg->header)).data_size == file_size + && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -77,12 +77,12 @@ TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { TEST_F(ProducerImplTests, OKSendingRequest) { uint64_t expected_size = 100; uint64_t expected_id = 10; - std::string expected_name = "test_name"; + char expected_name[asapo::kMaxMessageSize] = "test_name"; - Request request{asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, nullptr}; + Request request{"",asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, nullptr}; - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(expected_id, expected_size, expected_name))).WillOnce(Return( + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData,expected_id, expected_size, expected_name))).WillOnce(Return( nullptr)); auto err = producer.Send(expected_id, nullptr, expected_size, expected_name, nullptr); diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index 7cee0ad2410d6dcb5ee78b9783c9aee690433d0e..aae42f298151c8b964dab399fd3af9de6582c1f5 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -52,13 +52,13 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{header, expected_data_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{"",header, expected_data_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{header, expected_data_pointer, nullptr}; + asapo::Request request_nocallback{"",header, expected_data_pointer, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; @@ -131,7 +131,7 @@ TEST_F(RequestHandlerFilesystemTests, TransferOK) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); - ASSERT_THAT(std::string{callback_header.file_name}, Eq(std::string{header.file_name})); + ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 62457388f27b37020a12026496c6c1b51767f205..a46fa1ae4450ee3f94b451e24dcf7f32797f1f83 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -51,7 +51,9 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_file_id = 42; uint64_t expected_file_size = 1337; - std::string expected_file_name = "test_name"; + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; + char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; + uint64_t expected_thread_id = 2; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; @@ -60,13 +62,13 @@ class RequestHandlerTcpTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{header, expected_file_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{expected_beamtime_id,header, expected_file_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{header, expected_file_pointer, nullptr}; + asapo::Request request_nocallback{expected_beamtime_id,header, expected_file_pointer, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -80,7 +82,7 @@ class RequestHandlerTcpTests : public testing::Test { std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; - Sequence s; + Sequence seq_receive; void ExpectFailConnect(bool only_once = false); void ExpectFailAuthorize(bool only_once = false); void ExpectOKAuthorize(bool only_once = false); @@ -111,11 +113,12 @@ ACTION_P(A_WriteSendDataResponse, error_code) { strcpy(((asapo::SendDataResponse*)arg1)->message, expected_auth_message.c_str()); } -MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, +MATCHER_P4(M_CheckSendDataRequest, op_code,file_id, file_size,message, "Checks if a valid GenericRequestHeader was Send") { - return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData + return ((asapo::GenericRequestHeader*)arg)->op_code == op_code && ((asapo::GenericRequestHeader*)arg)->data_id == file_id - && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; + && ((asapo::GenericRequestHeader*)arg)->data_size == file_size + && strcmp(((asapo::GenericRequestHeader*)arg)->message, message) == 0; } @@ -142,8 +145,16 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize,0, 0,expected_beamtime_id), + sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(s) + .InSequence(seq_receive) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -165,8 +176,17 @@ void RequestHandlerTcpTests::ExpectFailAuthorize(bool only_once) { void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeAuthorize,0, 0,expected_beamtime_id), + sizeof(asapo::GenericRequestHeader), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<3>(nullptr), + Return(sizeof(asapo::GenericRequestHeader)) + )); + + EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(s) + .InSequence(seq_receive) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -189,8 +209,8 @@ void RequestHandlerTcpTests::ExpectOKAuthorize(bool only_once) { void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, - expected_file_size), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeTransferData,expected_file_id, + expected_file_size,expected_file_name), sizeof(asapo::GenericRequestHeader), _)) .WillOnce( DoAll( @@ -248,7 +268,7 @@ void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(s) + .InSequence(seq_receive) .WillOnce( DoAll( testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), @@ -292,10 +312,10 @@ void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id, - expected_file_size), + EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(asapo::kOpcodeTransferData,expected_file_id, + expected_file_size,expected_file_name), sizeof(asapo::GenericRequestHeader), _)) - .WillOnce( + .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), Return(sizeof(asapo::GenericRequestHeader)) @@ -330,7 +350,7 @@ void RequestHandlerTcpTests::ExpectOKReceive(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _)) - .InSequence(s) + .InSequence(seq_receive) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -537,7 +557,7 @@ TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { ExpectOKSendData(true); EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _)) - .InSequence(s) + .InSequence(seq_receive) .WillOnce( DoAll( testing::SetArgPointee<3>(nullptr), @@ -585,7 +605,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); ASSERT_THAT(callback_header.op_code, Eq(header.op_code)); ASSERT_THAT(callback_header.data_id, Eq(header.data_id)); - ASSERT_THAT(std::string{callback_header.file_name}, Eq(std::string{header.file_name})); + ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index 64a0b70e6d0b1066e01e586ba58ccefb5fc97535..2b30f8178c8da1a070ed545a49088551ca5f533d 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -62,7 +62,7 @@ class RequestPoolTests : public testing::Test { MockRequestHandlerFactory request_handler_factory{mock_request_handler}; const uint8_t nthreads = 1; asapo::RequestPool pool {nthreads, &request_handler_factory}; - std::unique_ptr<Request> request{new Request{GenericRequestHeader{}, nullptr, nullptr}}; + std::unique_ptr<Request> request{new Request{"",GenericRequestHeader{}, nullptr, nullptr}}; void SetUp() override { pool.log__ = &mock_logger; } @@ -118,7 +118,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - Request* request2 = new Request{GenericRequestHeader{}, nullptr, nullptr}; + Request* request2 = new Request{"",GenericRequestHeader{}, nullptr, nullptr}; ExpectSend(mock_request_handler, 2);