diff --git a/common/cpp/include/request/request_handler.h b/common/cpp/include/request/request_handler.h index 60fbd767baf396e7036bd2c430d188164324773a..06cd2a59bafe61a6cef1b4d7fe2903deaedba434 100644 --- a/common/cpp/include/request/request_handler.h +++ b/common/cpp/include/request/request_handler.h @@ -12,8 +12,8 @@ namespace asapo { class RequestHandler { public: virtual void PrepareProcessingRequestLocked() = 0; - virtual void TearDownProcessingRequestLocked(bool processing_succeeded) = 0; - virtual bool ProcessRequestUnlocked(GenericRequest* request) = 0; + virtual void TearDownProcessingRequestLocked(bool success) = 0; + virtual bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) = 0; virtual void ProcessRequestTimeout(GenericRequest* request) = 0; virtual bool ReadyProcessRequest() = 0; virtual ~RequestHandler() = default; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index de91b3f1501b6b42507df7318c71e7187a3e629b..f49f28c75265b0e869a0d90f953b05c16a72770c 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -50,11 +50,12 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ request_handler->PrepareProcessingRequestLocked(); requests_in_progress_++; thread_info->lock.unlock(); - auto success = request_handler->ProcessRequestUnlocked(request.get()); + bool retry; + auto success = request_handler->ProcessRequestUnlocked(request.get(), &retry); thread_info->lock.lock(); requests_in_progress_--; request_handler->TearDownProcessingRequestLocked(success); - if (!success) { + if (retry) { PutRequestBackToQueue(std::move(request)); thread_info->lock.unlock(); condition_.notify_all(); diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index d37fdd79d7ff50b58597f814b35307cc56acba48..c4ed7dfd5c07dccecea13470fdfa034d6706d1cf 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -18,7 +18,7 @@ class MockRequestHandler : public RequestHandler { MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request)); MOCK_METHOD1(ProcessRequestTimeout, void(GenericRequest* request)); uint64_t retry_counter = 0; - bool ProcessRequestUnlocked(GenericRequest* request) override { + bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override { retry_counter = request->GetRetryCounter(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); return ProcessRequestUnlocked_t(request); diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 714e704c2f9e2406ab67166b02c7614f9a52d68d..97d5234e4633a80d3b4f17e61f7c3290763bff92 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -39,7 +39,7 @@ struct Args { }; void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { - if (err && err!=asapo::ProducerErrorTemplates::kServerWarning) { + if (err && err != asapo::ProducerErrorTemplates::kServerWarning) { std::cerr << "Data was not successfully send: " << err << std::endl; return; } diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp index fd75e003f20a45be41603a7b5d1c1caa1c55a95f..5e1437d25082d95613ff5251ca17f05e93fec78d 100644 --- a/producer/api/cpp/src/request_handler_filesystem.cpp +++ b/producer/api/cpp/src/request_handler_filesystem.cpp @@ -16,13 +16,14 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde thread_id_{thread_id} { } -bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) { +bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, bool* retry) { auto producer_request = static_cast<ProducerRequest*>(request); Error err; if (producer_request->DataFromFile()) { producer_request->data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size, &err); if (err) { + *retry = true; return false; } } @@ -32,6 +33,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) { if (producer_request->callback) { producer_request->callback(request->header, std::move(err)); } + *retry = false; return true; } diff --git a/producer/api/cpp/src/request_handler_filesystem.h b/producer/api/cpp/src/request_handler_filesystem.h index 72fb21061d0d20631aa457f5cf24937c14e0ff1a..66b1ca980683ead71925b3956a295b14fb65af7f 100644 --- a/producer/api/cpp/src/request_handler_filesystem.h +++ b/producer/api/cpp/src/request_handler_filesystem.h @@ -17,12 +17,12 @@ namespace asapo { class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); - bool ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override; bool ReadyProcessRequest() override { return true; }; void PrepareProcessingRequestLocked() override {}; - void TearDownProcessingRequestLocked(bool processing_succeeded) override {}; + void TearDownProcessingRequestLocked(bool request_processed_successfully) override {}; void ProcessRequestTimeout(GenericRequest* request) override; virtual ~RequestHandlerFilesystem() = default; diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index a1f25ffb56eef143c1ab0d8003855c827282ea51..9baccb8870601cf386960b0bca7ce1071894ce74 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -12,8 +12,8 @@ RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service ncurrent_connections_{shared_counter} { } -Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) { - GenericRequestHeader header{kOpcodeAuthorize, 0, 0, 0, beamtime_id.c_str()}; +Error RequestHandlerTcp::Authorize(const std::string& source_credentials) { + GenericRequestHeader header{kOpcodeAuthorize, 0, 0, 0, source_credentials.c_str()}; Error err; io__->Send(sd_, &header, sizeof(header), &err); if(err) { @@ -23,7 +23,7 @@ Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) { } -Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address) { +Error RequestHandlerTcp::ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address) { Error err; sd_ = io__->CreateAndConnectIPTCPSocket(receiver_address, &err); @@ -34,7 +34,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const log__->Debug("connected to receiver at " + receiver_address); connected_receiver_uri_ = receiver_address; - err = Authorize(beamtime_id); + err = Authorize(source_credentials); if (err != nullptr) { log__->Error("authorization failed at " + receiver_address + " - " + err->Explain()); Disconnect(); @@ -206,31 +206,44 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error, } -bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) { +void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request,bool* retry) { + if (request->callback) { + request->callback(request->header, std::move(err)); + } + *retry = false; +} + + +bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry) { for (auto receiver_uri : receivers_list_) { if (Disconnected()) { auto err = ConnectToReceiver(request->source_credentials, receiver_uri); - if (err != nullptr ) continue; + if (err == ProducerErrorTemplates::kWrongInput) { + ProcessRequestCallback(std::move(err),request,retry); + return false; + } else { + if (err != nullptr ) continue; + } } auto err = TrySendToReceiver(request); - auto retry = ProcessErrorFromReceiver(err, request, receiver_uri); - if (retry) { + bool server_error_can_retry = ProcessErrorFromReceiver(err, request, receiver_uri); + if (server_error_can_retry) { continue; } - if (request->callback) { - request->callback(request->header, std::move(err)); - } - return true; + bool success = err && err != ProducerErrorTemplates::kServerWarning ? false : true; + ProcessRequestCallback(std::move(err),request,retry); + return success; } log__->Warning("put back to the queue, request opcode: " + std::to_string(request->header.op_code) + ", id: " + std::to_string(request->header.data_id)); + *retry = true; return false; } -bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { +bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request, bool* retry) { auto producer_request = static_cast<ProducerRequest*>(request); auto err = producer_request->UpdateDataSizeFromFileIfNeeded(io__.get()); @@ -238,14 +251,15 @@ bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { if (producer_request->callback) { producer_request->callback(producer_request->header, std::move(err)); } - return true; + *retry = false; + return false; } if (NeedRebalance()) { CloseConnectionToPeformRebalance(); } - return SendDataToOneOfTheReceivers(producer_request); + return SendDataToOneOfTheReceivers(producer_request, retry); } bool RequestHandlerTcp::Connected() { @@ -264,8 +278,8 @@ void RequestHandlerTcp::PrepareProcessingRequestLocked() { UpdateIfNewConnection(); } -void RequestHandlerTcp::TearDownProcessingRequestLocked(bool processing_succeeded) { - if (!processing_succeeded) { +void RequestHandlerTcp::TearDownProcessingRequestLocked(bool request_processed_successfully) { + if (!request_processed_successfully) { (*ncurrent_connections_)--; } } diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h index 0cba33e5dd747a3564a4bf29c453155ac5e1efaa..fa2d39b7110c989b66899b5b6c7b4f9889065ff0 100644 --- a/producer/api/cpp/src/request_handler_tcp.h +++ b/producer/api/cpp/src/request_handler_tcp.h @@ -19,10 +19,10 @@ namespace asapo { class RequestHandlerTcp: public RequestHandler { public: explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter); - bool ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; - void TearDownProcessingRequestLocked(bool processing_succeeded) override; + void TearDownProcessingRequestLocked(bool request_processed_successfully) override; void ProcessRequestTimeout(GenericRequest* request) override; virtual ~RequestHandlerTcp() = default; @@ -30,9 +30,9 @@ class RequestHandlerTcp: public RequestHandler { const AbstractLogger* log__; ReceiverDiscoveryService* discovery_service__; private: - Error Authorize(const std::string& beamtime_id); - Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); - bool SendDataToOneOfTheReceivers(ProducerRequest* request); + Error Authorize(const std::string& source_credentials); + Error ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address); + bool SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry); Error SendRequestContent(const ProducerRequest* request); Error ReceiveResponse(const GenericRequestHeader& request_header); Error TrySendToReceiver(const ProducerRequest* request); @@ -50,7 +50,7 @@ class RequestHandlerTcp: public RequestHandler { bool ProcessErrorFromReceiver(const Error& error, const ProducerRequest* request, const std::string& receiver_uri); ReceiversList receivers_list_; system_clock::time_point last_receivers_uri_update_; - + void ProcessRequestCallback(Error err, ProducerRequest* request, bool* retry); uint64_t thread_id_; uint64_t* ncurrent_connections_; std::string connected_receiver_uri_; diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 4e443b971714031c162298d4e418cfc6f59682af..da7be9fba592edbd80c11cf97cf591a09d16b293 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -50,7 +50,7 @@ class RequestHandlerFilesystemTests : public testing::Test { std::string expected_destination = "destination"; std::string expected_fullpath = expected_destination + "/" + expected_file_name; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - + bool retry; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, @@ -104,7 +104,7 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { ); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); ASSERT_THAT(called, Eq(true)); @@ -119,7 +119,7 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { ); - auto success = request_handler.ProcessRequestUnlocked(&request_nocallback); + auto success = request_handler.ProcessRequestUnlocked(&request_nocallback, &retry); ASSERT_THAT(called, Eq(false)); ASSERT_THAT(success, Eq(true)); @@ -135,7 +135,7 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) { Return(nullptr) )); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); ASSERT_THAT(success, Eq(false)); } @@ -154,7 +154,7 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestOK) { Return(nullptr) ); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); ASSERT_THAT(success, Eq(true)); } @@ -169,7 +169,7 @@ TEST_F(RequestHandlerFilesystemTests, TransferOK) { ); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index ab5cebbc6ddb46e94d67ad476d7596fdff2bb6c1..3d898b6a12728134bb2a16290404272c80c9722d 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -101,6 +101,7 @@ class RequestHandlerTcpTests : public testing::Test { std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; + bool retry; Sequence seq_receive; void ExpectFailConnect(bool only_once = false); void ExpectFailAuthorize(bool only_once = false); @@ -339,7 +340,6 @@ void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_on EXPECT_CALL(mock_logger, Warning(HasSubstr("put back"))); } - void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) { ExpectFailSend(expected_file_size, only_once); } @@ -495,7 +495,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { } request_handler.PrepareProcessingRequestLocked(); - request_handler.ProcessRequestUnlocked(&request); + request_handler.ProcessRequestUnlocked(&request, &retry); Mock::VerifyAndClearExpectations(&mock_io); Mock::VerifyAndClearExpectations(&mock_logger); @@ -531,11 +531,9 @@ TEST_F(RequestHandlerTcpTests, DoesNotGetsUriIfAlreadyConnected) { TEST_F(RequestHandlerTcpTests, ReduceConnectionNumberAtTearDownIfError) { n_connections = 1; - request_handler.TearDownProcessingRequestLocked(false); ASSERT_THAT(n_connections, Eq(0)); - } TEST_F(RequestHandlerTcpTests, DoNotReduceConnectionNumberAtTearDownIfNoError) { @@ -551,21 +549,27 @@ TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); } TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) { - ExpectOKConnect(); - ExpectFailAuthorize(); + ExpectOKConnect(true); + ExpectFailAuthorize(true); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); request_handler.TearDownProcessingRequestLocked(success); - ASSERT_THAT(success, Eq(false)); ASSERT_THAT(n_connections, Eq(0)); + ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + ASSERT_THAT(callback_called, Eq(true)); + ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(false)); + + } @@ -581,11 +585,12 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { ExpectFailSendHeader(true); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); -} + ASSERT_THAT(retry, Eq(true)); +} TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { @@ -595,9 +600,11 @@ TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } @@ -610,9 +617,11 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { EXPECT_CALL(mock_io, CloseSocket_t(_, _)); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } @@ -623,9 +632,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } @@ -637,9 +648,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ExpectFailSendData(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { @@ -649,9 +662,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { ExpectFailSendMetaData(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { @@ -664,9 +679,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { ExpectFailReceive(true); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, @@ -696,10 +713,12 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e )); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(callback_err, Eq(err_template)); ASSERT_THAT(callback_called, Eq(true)); - ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(retry, Eq(false)); + } void RequestHandlerTcpTests::ExpectGetFileSize(bool ok) { @@ -732,10 +751,11 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request_nocallback); + auto success = request_handler.ProcessRequestUnlocked(&request_nocallback, &retry); ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_called, Eq(false)); + ASSERT_THAT(retry, Eq(false)); } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) { @@ -747,11 +767,13 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) { ExpectFailSendFile(asapo::ProducerErrorTemplates::kLocalIOError, true); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); - ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(success, Eq(false)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kLocalIOError)); + ASSERT_THAT(retry, Eq(false)); + } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) { @@ -763,10 +785,12 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) { ExpectFailSendFile(asapo::ProducerErrorTemplates::kInternalServerError); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); ASSERT_THAT(success, Eq(false)); ASSERT_THAT(callback_called, Eq(false)); + ASSERT_THAT(retry, Eq(true)); + } TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) { @@ -774,10 +798,12 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) { request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(success, Eq(true)); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); + ASSERT_THAT(success, Eq(false)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kLocalIOError)); + ASSERT_THAT(retry, Eq(false)); + } @@ -793,10 +819,12 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); + ASSERT_THAT(retry, Eq(false)); + } @@ -808,9 +836,10 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(retry, Eq(false)); ASSERT_THAT(callback_err, Eq(nullptr)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); @@ -832,9 +861,11 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresIngestMode) { request.header.op_code = asapo::kOpcodeTransferMetaData; request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(retry, Eq(false)); + } @@ -849,9 +880,10 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { request.header.custom_data[asapo::kPosIngestMode] = ingest_mode; request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(retry, Eq(false)); ASSERT_THAT(callback_header.custom_data[asapo::kPosIngestMode], Eq(ingest_mode)); } @@ -870,8 +902,10 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; request_filesend.header.custom_data[asapo::kPosIngestMode] = ingest_mode; - auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry); ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(retry, Eq(false)); + } @@ -902,9 +936,10 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) { request_handler.PrepareProcessingRequestLocked(); - auto success = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(retry, Eq(false)); ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kServerWarning)); ASSERT_THAT(callback_err->Explain(), HasSubstr(expected_warning)); ASSERT_THAT(callback_called, Eq(true)); @@ -914,6 +949,4 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) { ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message})); } - - } diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp index f3d1672a8f05c144be3ac4bdd602bec2387e203b..03d8ae764549afee64c6e3e5f4a555bcf2f67ac6 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp @@ -46,7 +46,8 @@ void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest* } -bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request) { +bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request, bool* retry) { + *retry = false; auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request); if (!CheckRequest(receiver_request)) { SendResponce(receiver_request, kNetErrorWrongRequest); diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h index 0ca1a46b5fb0c8d789a389e54eddc092e652f057..4bae2ee45e590dc1d3e699d72e90675c04cc2b9a 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h @@ -13,7 +13,7 @@ namespace asapo { class ReceiverDataServerRequestHandler: public RequestHandler { public: explicit ReceiverDataServerRequestHandler(const NetServer* server, DataCache* data_cache, Statistics* statistics); - bool ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; void TearDownProcessingRequestLocked(bool processing_succeeded) override; diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp index 0aed9549de6726afc88ccf110005368a545129ad..d11573d2bc51833c419e0b4a6d70c85762e119c8 100644 --- a/receiver/src/request_handler_authorize.cpp +++ b/receiver/src/request_handler_authorize.cpp @@ -15,10 +15,13 @@ std::string RequestHandlerAuthorize::GetRequestString(const Request* request, co return request_string; } -Error RequestHandlerAuthorize::ErrorFromServerResponse(const Error& err, HttpCode code) const { +Error RequestHandlerAuthorize::ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const { if (err) { return asapo::ReceiverErrorTemplates::kInternalServerError.Generate("cannot authorize request: " + err->Explain()); } else { + if (code != HttpCode::Unauthorized) { + return asapo::ReceiverErrorTemplates::kInternalServerError.Generate("return code " + std::to_string(int(code))); + } return asapo::ReceiverErrorTemplates::kAuthorizationFailure.Generate("return code " + std::to_string(int(code))); } } @@ -31,7 +34,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr auto response = http_client__->Post(GetReceiverConfig()->authorization_server + "/authorize", request_string, &code, &err); if (err || code != HttpCode::OK) { - auto auth_error = ErrorFromServerResponse(err, code); + auto auth_error = ErrorFromAuthorizationServerResponse(err, code); log__->Error("failure authorizing at " + GetReceiverConfig()->authorization_server + " request: " + request_string + " - " + auth_error->Explain()); @@ -45,7 +48,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr (err = parser.GetString("Year", &beamtime_year_)) || (err = parser.GetString("Beamline", &beamline_)); if (err) { - return ErrorFromServerResponse(err, code); + return ErrorFromAuthorizationServerResponse(err, code); } else { log__->Debug(std::string("authorized connection from ") + request->GetOriginUri() + " beamline: " + beamline_ + ", beamtime id: " + beamtime_id_ + ", stream: " + stream_); diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h index 95c78802675c1099b09f6ea77dfc5ec78414d821..7f46d3eba209e1d0406463dcd3d4eb07e11a9c10 100644 --- a/receiver/src/request_handler_authorize.h +++ b/receiver/src/request_handler_authorize.h @@ -30,7 +30,7 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler { Error ProcessAuthorizationRequest(Request* request) const; Error ProcessOtherRequest(Request* request) const; Error Authorize(Request* request, const char* source_credentials) const; - Error ErrorFromServerResponse(const Error& err, HttpCode code) const; + Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const; std::string GetRequestString(const Request* request, const char* source_credentials) const; }; diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index 6b56beb2b9a24dd57e80461d6c1804c8de34d715..d67c63cfa8bb2f01f39ba3621b13cbf16b076f32 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -57,6 +57,7 @@ class RequestHandlerTests : public Test { uint64_t expected_meta_size = 100; uint64_t expected_buf_id = 12345; uint64_t expected_source_id = 11; + bool retry; asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, expected_meta_size, ""}; asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id}; @@ -97,7 +98,7 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request"))); - auto success = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); } @@ -105,7 +106,7 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) { MockSendResponce(asapo::kNetErrorNoData, true); - auto success = handler_no_cache.ProcessRequestUnlocked(&request); + auto success = handler_no_cache.ProcessRequestUnlocked(&request, &retry); EXPECT_CALL(mock_logger, Debug(_)).Times(0); ASSERT_THAT(success, Eq(true)); @@ -116,7 +117,7 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { MockSendResponce(asapo::kNetErrorNoData, true); EXPECT_CALL(mock_logger, Debug(HasSubstr("not found"))); - auto success = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); } @@ -128,7 +129,7 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) { EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0); EXPECT_CALL(mock_cache, UnlockSlot(_)); - auto success = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); } @@ -145,7 +146,7 @@ TEST_F(RequestHandlerTests, ProcessRequestOk) { EXPECT_CALL(mock_logger, Debug(HasSubstr("sending"))); EXPECT_CALL(mock_stat, IncreaseRequestCounter_t()); EXPECT_CALL(mock_stat, IncreaseRequestDataVolume_t(expected_data_size)); - auto success = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request, &retry); ASSERT_THAT(success, Eq(true)); }