From 7a1da279418578c9d353f47391165832c9f909ab Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 5 Nov 2019 17:30:51 +0100 Subject: [PATCH] refactor producer and request pool --- common/cpp/include/io/io.h | 2 +- common/cpp/include/request/request_handler.h | 4 +- common/cpp/include/unittests/MockIO.h | 4 + common/cpp/src/request/request_pool.cpp | 6 +- common/cpp/src/system_io/system_io.cpp | 4 + common/cpp/src/system_io/system_io.h | 1 + common/cpp/unittests/request/mocking.h | 21 +- .../unittests/request/test_request_pool.cpp | 4 +- .../api/cpp/include/producer/producer_error.h | 76 +------ producer/api/cpp/src/producer_impl.cpp | 20 +- producer/api/cpp/src/producer_request.cpp | 8 +- producer/api/cpp/src/producer_request.h | 2 +- .../cpp/src/request_handler_filesystem.cpp | 14 +- .../api/cpp/src/request_handler_filesystem.h | 4 +- producer/api/cpp/src/request_handler_tcp.cpp | 51 ++--- producer/api/cpp/src/request_handler_tcp.h | 8 +- producer/api/cpp/unittests/test_producer.cpp | 2 +- .../api/cpp/unittests/test_producer_impl.cpp | 26 +-- .../test_request_handler_filesystem.cpp | 20 +- .../unittests/test_request_handler_tcp.cpp | 186 ++++++++++++------ producer/api/python/asapo_producer.pxd | 13 +- .../receiver_data_server_request_handler.cpp | 10 +- .../receiver_data_server_request_handler.h | 4 +- .../test_request_handler.cpp | 20 +- 24 files changed, 250 insertions(+), 260 deletions(-) diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 978753696..6aae3311a 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -80,7 +80,7 @@ class IO { long timeout_in_usec, Error* err) const = 0; virtual size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, Error* err) const = 0; - + virtual Error SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const = 0; virtual void Skip(SocketDescriptor socket_fd, size_t length, Error* err) const = 0; /** * @param err Since CloseSocket if often used in an error case, it's able to accept err as nullptr. diff --git a/common/cpp/include/request/request_handler.h b/common/cpp/include/request/request_handler.h index 03f891729..25f63d653 100644 --- a/common/cpp/include/request/request_handler.h +++ b/common/cpp/include/request/request_handler.h @@ -12,8 +12,8 @@ namespace asapo { class RequestHandler { public: virtual void PrepareProcessingRequestLocked() = 0; - virtual void TearDownProcessingRequestLocked(const Error& error_from_process) = 0; - virtual Error ProcessRequestUnlocked(GenericRequest* request) = 0; + virtual void TearDownProcessingRequestLocked(bool processing_succeeded) = 0; + virtual bool ProcessRequestUnlocked(GenericRequest* request) = 0; virtual bool ReadyProcessRequest() = 0; virtual ~RequestHandler() = default; }; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 8c518de51..834de568d 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -211,6 +211,10 @@ class MockIO : public IO { MOCK_CONST_METHOD0(GetLastError_t, ErrorInterface * ()); + Error SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const override { + return Error{SendFile_t(socket_fd, fname, length)}; + } + MOCK_CONST_METHOD3(SendFile_t, ErrorInterface * (SocketDescriptor socket_fd, const std::string& fname, size_t length)); Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const FileData& data, size_t length, bool create_directories) const override { diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index 163ad4599..f84f56c32 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -43,10 +43,10 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_ request_handler->PrepareProcessingRequestLocked(); auto request = GetRequestFromQueue(); thread_info->lock.unlock(); - auto err = request_handler->ProcessRequestUnlocked(request.get()); + auto success = request_handler->ProcessRequestUnlocked(request.get()); thread_info->lock.lock(); - request_handler->TearDownProcessingRequestLocked(err); - if (err) { + request_handler->TearDownProcessingRequestLocked(success); + if (!success) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); PutRequestBackToQueue(std::move(request)); condition_.notify_all(); diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 8a71217e9..2ad01cebf 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -618,4 +618,8 @@ std::string SystemIO::GetHostName(Error* err) const noexcept { } } +Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const { + return nullptr; +} + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 6a7176c9c..3a8374cd6 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -114,6 +114,7 @@ class SystemIO final : public IO { size_t ReceiveWithTimeout(SocketDescriptor socket_fd, void* buf, size_t length, long timeout_in_usec, Error* err) const override; size_t Send(SocketDescriptor socket_fd, const void* buf, size_t length, Error* err) const override; + Error SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const override; void Skip(SocketDescriptor socket_fd, size_t length, Error* err) const override; void CloseSocket(SocketDescriptor socket_fd, Error* err) const override; std::string GetHostName(Error* err) const noexcept override; diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h index 416423cd9..94cd42359 100644 --- a/common/cpp/unittests/request/mocking.h +++ b/common/cpp/unittests/request/mocking.h @@ -12,21 +12,16 @@ const std::string expected_endpoint = "expected_endpont"; class MockRequestHandler : public RequestHandler { public: - - Error ProcessRequestUnlocked(GenericRequest* request) override { - return Error{ProcessRequestUnlocked_t(request)}; - } - void TearDownProcessingRequestLocked(const Error& error_from_process) override { - if (error_from_process) { - TearDownProcessingRequestLocked_t(error_from_process.get()); - } else { - TearDownProcessingRequestLocked_t(nullptr); - } - } MOCK_METHOD0(PrepareProcessingRequestLocked, void()); MOCK_METHOD0(ReadyProcessRequest, bool()); - MOCK_METHOD1(TearDownProcessingRequestLocked_t, void(ErrorInterface* error_from_process)); - MOCK_METHOD1(ProcessRequestUnlocked_t, ErrorInterface * (const GenericRequest*)); + MOCK_METHOD1(TearDownProcessingRequestLocked, void(bool processing_succeeded)); + MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request)); + + bool ProcessRequestUnlocked(GenericRequest* request) override { + return ProcessRequestUnlocked_t(request); + } + + }; } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 98f54624b..1f71c1d39 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -101,8 +101,8 @@ TEST_F(RequestPoolTests, NRequestsInQueue) { void ExpectSend(MockRequestHandler* mock_handler, int ntimes = 1) { EXPECT_CALL(*mock_handler, ReadyProcessRequest()).Times(ntimes).WillRepeatedly(Return(true)); EXPECT_CALL(*mock_handler, PrepareProcessingRequestLocked()).Times(ntimes); - EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_)).Times(ntimes).WillRepeatedly(Return(nullptr)); - EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked_t(nullptr)).Times(ntimes); + EXPECT_CALL(*mock_handler, ProcessRequestUnlocked_t(_)).Times(ntimes).WillRepeatedly(Return(true)); + EXPECT_CALL(*mock_handler, TearDownProcessingRequestLocked(true)).Times(ntimes); } diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h index 226dcaa4e..12c5440ad 100644 --- a/producer/api/cpp/include/producer/producer_error.h +++ b/producer/api/cpp/include/producer/producer_error.h @@ -6,92 +6,32 @@ namespace asapo { enum class ProducerErrorType { - kFileNameTooLong, - kEmptyFileName, - kNoData, - kZeroDataSize, - kBeamtimeIdTooLong, - kBeamtimeAlreadySet, - kFileIdAlreadyInUse, - kErrorInMetadata, - kErrorSubsetSize, - kAuthorizationFailed, kInternalServerError, - kCannotSendDataToReceivers, kRequestPoolIsFull, - kWrongIngestMode + kLocalIOError, + kWrongInput }; using ProducerErrorTemplate = ServiceErrorTemplate<ProducerErrorType, ErrorType::kProducerError>; namespace ProducerErrorTemplates { - -auto const kWrongIngestMode = ProducerErrorTemplate { - "wrong ingest mode", ProducerErrorType::kWrongIngestMode -}; - -auto const kNoData = ProducerErrorTemplate { - "no data", ProducerErrorType::kNoData -}; - - -auto const kZeroDataSize = ProducerErrorTemplate { - "zero data size", ProducerErrorType::kZeroDataSize -}; - - -auto const kErrorSubsetSize = ProducerErrorTemplate { - "Error in subset size", ProducerErrorType::kErrorSubsetSize -}; - -auto const kFileNameTooLong = ProducerErrorTemplate { - "filename too long", ProducerErrorType::kFileNameTooLong -}; - -auto const kEmptyFileName = ProducerErrorTemplate { - "empty filename", ProducerErrorType::kEmptyFileName -}; - - -auto const kCredentialsTooLong = ProducerErrorTemplate { - "beamtime id too long", ProducerErrorType::kBeamtimeIdTooLong -}; - - -auto const kCredentialsAlreadySet = ProducerErrorTemplate { - "beamtime id already set", ProducerErrorType::kBeamtimeAlreadySet +auto const kLocalIOError = ProducerErrorTemplate { + "local i/o error", ProducerErrorType::kLocalIOError }; - -auto const kFileIdAlreadyInUse = ProducerErrorTemplate { - "File already in use", ProducerErrorType::kFileIdAlreadyInUse +auto const kRequestPoolIsFull = ProducerErrorTemplate { + "Cannot add request to poll - hit pool size limit", ProducerErrorType::kRequestPoolIsFull }; -auto const kAuthorizationFailed = ProducerErrorTemplate { - "Authorization failed:", ProducerErrorType::kAuthorizationFailed +auto const kWrongInput = ProducerErrorTemplate { + "wrong input", ProducerErrorType::kWrongInput }; auto const kInternalServerError = ProducerErrorTemplate { "Internal server error", ProducerErrorType::kInternalServerError }; -auto const kCannotSendDataToReceivers = ProducerErrorTemplate { - "Cannot connect/send data to receivers", ProducerErrorType::kCannotSendDataToReceivers -}; - -auto const kRequestPoolIsFull = ProducerErrorTemplate { - "Cannot add request to poll - hit pool size limit", ProducerErrorType::kRequestPoolIsFull -}; - -auto const kErrorInMetadata = ProducerErrorTemplate { - "error in metadata", ProducerErrorType::kErrorInMetadata -}; - - - - - }; } diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 699c36a8c..e19bac070 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -45,12 +45,12 @@ GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& ev Error CheckIngestMode(uint64_t ingest_mode) { if ((ingest_mode & IngestModeFlags::kTransferData) && (ingest_mode & IngestModeFlags::kTransferMetaDataOnly)) { - return ProducerErrorTemplates::kWrongIngestMode.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("wrong ingest mode"); } if (!(ingest_mode & IngestModeFlags::kTransferData) && !(ingest_mode & IngestModeFlags::kTransferMetaDataOnly)) { - return ProducerErrorTemplates::kWrongIngestMode.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("wrong ingest mode"); } return nullptr; @@ -58,15 +58,15 @@ Error CheckIngestMode(uint64_t ingest_mode) { Error CheckProducerRequest(const EventHeader& event_header, uint64_t ingest_mode) { if (event_header.file_name.size() > kMaxMessageSize) { - return ProducerErrorTemplates::kFileNameTooLong.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("too long filename"); } if (event_header.file_name.empty() ) { - return ProducerErrorTemplates::kEmptyFileName.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); } if (event_header.subset_id > 0 && event_header.subset_size == 0) { - return ProducerErrorTemplates::kErrorSubsetSize.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("subset dimensions"); } return CheckIngestMode(ingest_mode); @@ -99,10 +99,10 @@ bool WandTransferData(uint64_t ingest_mode) { Error CheckData(uint64_t ingest_mode, const EventHeader& event_header, const FileData* data) { if (WandTransferData(ingest_mode)) { if (*data == nullptr) { - return ProducerErrorTemplates::kNoData.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("need data for this ingest mode"); } if (event_header.file_size == 0) { - return ProducerErrorTemplates::kZeroDataSize.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("zero data size"); } } return nullptr; @@ -120,7 +120,7 @@ Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, RequestCallback callback) { if (full_path.empty()) { - return ProducerErrorTemplates::kEmptyFileName.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); } return Send(event_header, nullptr, std::move(full_path), ingest_mode, callback, true); @@ -143,7 +143,7 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { if (!source_cred_string_.empty()) { log__->Error("credentials already set"); - return ProducerErrorTemplates::kCredentialsAlreadySet.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("credentials already set"); } if (source_cred.stream.empty()) { @@ -154,7 +154,7 @@ Error ProducerImpl::SetCredentials(SourceCredentials source_cred) { if (source_cred_string_.size() + source_cred.user_token.size() > kMaxMessageSize) { log__->Error("credentials string is too long - " + source_cred_string_); source_cred_string_ = ""; - return ProducerErrorTemplates::kCredentialsTooLong.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("credentials string is too long"); } return nullptr; diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp index 1c5527459..c780c09b1 100644 --- a/producer/api/cpp/src/producer_request.cpp +++ b/producer/api/cpp/src/producer_request.cpp @@ -2,13 +2,11 @@ namespace asapo { -Error ProducerRequest::ReadDataFromFileIfNeeded(const IO* io) { +bool ProducerRequest::DataFromFile() const { if (data != nullptr || original_filepath.empty() || !NeedSendData()) { - return nullptr; + return false; } - Error err; - data = io->GetDataFromFile(original_filepath, &header.data_size, &err); - return err; + return true; } ProducerRequest::ProducerRequest(std::string source_credentials, diff --git a/producer/api/cpp/src/producer_request.h b/producer/api/cpp/src/producer_request.h index 29030fa9b..25d6fe973 100644 --- a/producer/api/cpp/src/producer_request.h +++ b/producer/api/cpp/src/producer_request.h @@ -23,7 +23,7 @@ class ProducerRequest : public GenericRequest { std::string original_filepath; RequestCallback callback; bool manage_data_memory; - Error ReadDataFromFileIfNeeded(const IO* io); + bool DataFromFile() const; bool NeedSendData() const; bool NeedSendMetaData() const; }; diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp index ac49aea6e..580e5c380 100644 --- a/producer/api/cpp/src/request_handler_filesystem.cpp +++ b/producer/api/cpp/src/request_handler_filesystem.cpp @@ -16,12 +16,14 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde thread_id_{thread_id} { } -Error RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) { +bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) { auto producer_request = static_cast<ProducerRequest*>(request); - - auto err = producer_request->ReadDataFromFileIfNeeded(io__.get()); - if (err) { - return err; + Error err; + if (producer_request->DataFromFile()) { + auto data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size, &err); + if (err) { + return false; + } } err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)producer_request->data.get(), @@ -29,7 +31,7 @@ Error RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) if (producer_request->callback) { producer_request->callback(request->header, std::move(err)); } - return nullptr; + return true; } diff --git a/producer/api/cpp/src/request_handler_filesystem.h b/producer/api/cpp/src/request_handler_filesystem.h index 907d0c1ac..affdb3fab 100644 --- a/producer/api/cpp/src/request_handler_filesystem.h +++ b/producer/api/cpp/src/request_handler_filesystem.h @@ -17,12 +17,12 @@ namespace asapo { class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); - Error ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request) override; bool ReadyProcessRequest() override { return true; }; void PrepareProcessingRequestLocked() override {}; - void TearDownProcessingRequestLocked(const Error& error_from_process) override {}; + void TearDownProcessingRequestLocked(bool processing_succeeded) override {}; virtual ~RequestHandlerFilesystem() = default; std::unique_ptr<IO> io__; diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index 4a45d7b15..274ca59e5 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -21,7 +21,7 @@ Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) { if(err) { return err; } - return ReceiveResponse(); + return ReceiveResponse(header); } @@ -64,7 +64,11 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { } if (request->NeedSendData()) { - io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error); + if (request->DataFromFile()) { + io_error = io__->SendFile(sd_, request->original_filepath, (size_t)request->header.data_size); + } else { + io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error); + } if (io_error) { return io_error; } @@ -73,7 +77,7 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { return nullptr; } -Error RequestHandlerTcp::ReceiveResponse() { +Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_header) { Error err; SendDataResponse sendDataResponse; io__->Receive(sd_, &sendDataResponse, sizeof(sendDataResponse), &err); @@ -82,14 +86,15 @@ Error RequestHandlerTcp::ReceiveResponse() { } switch (sendDataResponse.error_code) { case kNetErrorFileIdAlreadyInUse : - return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate(); + return ProducerErrorTemplates::kWrongInput.Generate("file id already in use: " + std::to_string( + request_header.data_id)); case kNetAuthorizationError : { - auto res_err = ProducerErrorTemplates::kAuthorizationFailed.Generate(); + auto res_err = ProducerErrorTemplates::kWrongInput.Generate("authorization failed"); res_err->Append(sendDataResponse.message); return res_err; } case kNetErrorErrorInMetadata : { - auto res_err = ProducerErrorTemplates::kErrorInMetadata.Generate(); + auto res_err = ProducerErrorTemplates::kWrongInput.Generate(); res_err->Append(sendDataResponse.message); return res_err; } @@ -108,7 +113,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request) { return err; } - err = ReceiveResponse(); + err = ReceiveResponse(request->header); if (err) { return err; } @@ -173,11 +178,12 @@ void RequestHandlerTcp::Disconnect() { } bool RequestHandlerTcp::ServerError(const Error& err) { - return err != nullptr && (err != ProducerErrorTemplates::kFileIdAlreadyInUse && - err != ProducerErrorTemplates::kErrorInMetadata); + return err != nullptr && (err != ProducerErrorTemplates::kWrongInput && + err != ProducerErrorTemplates::kLocalIOError + ); } -Error RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) { +bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) { for (auto receiver_uri : receivers_list_) { if (Disconnected()) { auto err = ConnectToReceiver(request->source_credentials, receiver_uri); @@ -185,33 +191,29 @@ Error RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) { } auto err = TrySendToReceiver(request); - if (ServerError(err)) { + if (err) { Disconnect(); log__->Warning("cannot send data, opcode: " + std::to_string(request->header.op_code) + ", id: " + std::to_string(request->header.data_id) + " to " + receiver_uri + ": " + err->Explain()); + } + if (ServerError(err)) { continue; } if (request->callback) { request->callback(request->header, std::move(err)); } - return nullptr; + return true; } - return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate(); + log__->Warning("put back to the queue, request opcode: " + std::to_string(request->header.op_code) + + ", id: " + std::to_string(request->header.data_id)); + return false; } -Error RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { +bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { auto producer_request = static_cast<ProducerRequest*>(request); - auto err = producer_request->ReadDataFromFileIfNeeded(io__.get()); - if (err) { - if (producer_request->callback) { - producer_request->callback(producer_request->header, std::move(err)); - } - return nullptr; - } - if (NeedRebalance()) { CloseConnectionToPeformRebalance(); } @@ -234,11 +236,10 @@ void RequestHandlerTcp::PrepareProcessingRequestLocked() { UpdateIfNewConnection(); } -void RequestHandlerTcp::TearDownProcessingRequestLocked(const Error& error_from_process) { - if (error_from_process) { +void RequestHandlerTcp::TearDownProcessingRequestLocked(bool processing_succeeded) { + if (!processing_succeeded) { (*ncurrent_connections_)--; } - } } diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h index b55dd23ec..311579b79 100644 --- a/producer/api/cpp/src/request_handler_tcp.h +++ b/producer/api/cpp/src/request_handler_tcp.h @@ -19,10 +19,10 @@ namespace asapo { class RequestHandlerTcp: public RequestHandler { public: explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter); - Error ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; - void TearDownProcessingRequestLocked(const Error& error_from_process) override; + void TearDownProcessingRequestLocked(bool processing_succeeded) override; virtual ~RequestHandlerTcp() = default; std::unique_ptr<IO> io__; @@ -31,9 +31,9 @@ class RequestHandlerTcp: public RequestHandler { private: Error Authorize(const std::string& beamtime_id); Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); - Error SendDataToOneOfTheReceivers(ProducerRequest* request); + bool SendDataToOneOfTheReceivers(ProducerRequest* request); Error SendRequestContent(const ProducerRequest* request); - Error ReceiveResponse(); + Error ReceiveResponse(const GenericRequestHeader& request_header); Error TrySendToReceiver(const ProducerRequest* request); SocketDescriptor sd_{kDisconnectedSocketDescriptor}; void UpdateIfNewConnection(); diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index c116c046e..7c640338f 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -26,7 +26,7 @@ TEST(CreateProducer, ErrorBeamtime) { std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, SourceCredentials{expected_beamtimeid, "", ""}, &err); ASSERT_THAT(producer, Eq(nullptr)); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsTooLong)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 78d5ddbc8..dbaeb7f27 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -102,42 +102,42 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, long_string}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfFileEmpty) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, ""}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { - EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size"))); + EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset dimensions"))); asapo::EventHeader event_header{1, 1000, "test", "", 1}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { asapo::FileData data = asapo::FileData{new uint8_t[100] }; asapo::EventHeader event_header{1, 0, expected_fullpath}; auto err = producer.SendData(event_header, std::move(data), asapo::kDefaultIngestMode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kZeroDataSize)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoData) { asapo::EventHeader event_header{1, 100, expected_fullpath}; auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kNoData)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { asapo::EventHeader event_header{1, 100, expected_fullpath}; auto err = producer.SendData_(event_header, nullptr, asapo::kDefaultIngestMode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kNoData)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -269,7 +269,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, "", expected_ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -282,7 +282,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { asapo::EventHeader event_header{expected_id, 0, ""}; auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -314,7 +314,7 @@ TEST_F(ProducerImplTests, ErrorSettingBeamtime) { auto err = producer.SetCredentials(expected_credentials); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsTooLong)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorSettingSecondTime) { @@ -323,7 +323,7 @@ TEST_F(ProducerImplTests, ErrorSettingSecondTime) { producer.SetCredentials(asapo::SourceCredentials{"1", "2", "3"}); auto err = producer.SetCredentials(asapo::SourceCredentials{"4", "5", "6"}); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCredentialsAlreadySet)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { @@ -339,8 +339,8 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { auto err_null = producer.SendFile(event_header, expected_fullpath, ingest_mode, nullptr); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongIngestMode)); - ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongIngestMode)); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); + ASSERT_THAT(err_null, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 41359b460..e36fbbc8e 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -104,11 +104,11 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { ); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); ASSERT_THAT(called, Eq(true)); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { @@ -119,10 +119,10 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { ); - auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); + auto success = request_handler.ProcessRequestUnlocked(&request_nocallback); ASSERT_THAT(called, Eq(false)); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } @@ -135,8 +135,8 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) { Return(nullptr) )); - auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(success, Eq(false)); } TEST_F(RequestHandlerFilesystemTests, FileRequestOK) { @@ -154,8 +154,8 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestOK) { Return(nullptr) ); - auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(err, Eq(nullptr)); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(success, Eq(true)); } @@ -169,9 +169,9 @@ TEST_F(RequestHandlerFilesystemTests, TransferOK) { ); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); ASSERT_THAT(called, Eq(true)); ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 88634007e..649937735 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -110,6 +110,8 @@ class RequestHandlerTcpTests : public testing::Test { void ExpectOKSend(uint64_t expected_size, bool only_once); void ExpectOKSendAll(bool only_once); void ExpectOKSendData(bool only_once = false); + void ExpectOKSendFile(bool only_once = false); + void ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool only_once = false); void ExpectOKSendMetaData(bool only_once = false); void ExpectFailReceive(bool only_once = false); void ExpectOKReceive(bool only_once = true); @@ -261,9 +263,38 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { if (only_once) break; i++; } + EXPECT_CALL(mock_logger, Warning(HasSubstr("put back"))); +} + +void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate& err_template, bool only_once) { + int i = 0; + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size)) + .Times(1) + .WillOnce( + Return(err_template.Generate().release()) + ); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("disconnected"), + HasSubstr(receivers_list[i]) + ) + )); + + EXPECT_CALL(mock_logger, Warning(AllOf( + HasSubstr("cannot send"), + HasSubstr(receivers_list[i]) ) + )); + EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _)); + if (only_once) break; + i++; + } + if (err_template != asapo::ProducerErrorTemplates::kLocalIOError.Generate()) { + EXPECT_CALL(mock_logger, Warning(HasSubstr("put back"))); + } } + void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { @@ -289,7 +320,7 @@ void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_on if (only_once) break; i++; } - + EXPECT_CALL(mock_logger, Warning(HasSubstr("put back"))); } @@ -329,6 +360,7 @@ void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { if (only_once) break; i++; } + EXPECT_CALL(mock_logger, Warning(HasSubstr("put back"))); } @@ -362,7 +394,14 @@ void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { ExpectOKSend(expected_file_size, only_once); } - +void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) { + for (auto expected_sd : expected_sds) { + EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t)expected_file_size)) + .Times(1) + .WillOnce(Return(nullptr)); + if (only_once) break; + } +} void RequestHandlerTcpTests::ExpectOKSendHeader(bool only_once, asapo::Opcode opcode) { for (auto expected_sd : expected_sds) { @@ -475,10 +514,9 @@ TEST_F(RequestHandlerTcpTests, DoesNotGetsUriIfAlreadyConnected) { } TEST_F(RequestHandlerTcpTests, ReduceConnectionNumberAtTearDownIfError) { - auto err = asapo::TextError("error"); n_connections = 1; - request_handler.TearDownProcessingRequestLocked(err); + request_handler.TearDownProcessingRequestLocked(false); ASSERT_THAT(n_connections, Eq(0)); @@ -487,7 +525,7 @@ TEST_F(RequestHandlerTcpTests, ReduceConnectionNumberAtTearDownIfError) { TEST_F(RequestHandlerTcpTests, DoNotReduceConnectionNumberAtTearDownIfNoError) { n_connections = 1; - request_handler.TearDownProcessingRequestLocked(nullptr); + request_handler.TearDownProcessingRequestLocked(true); ASSERT_THAT(n_connections, Eq(1)); } @@ -497,9 +535,9 @@ TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { ExpectFailConnect(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) { @@ -507,10 +545,10 @@ TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) { ExpectFailAuthorize(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); - request_handler.TearDownProcessingRequestLocked(err); + auto success = request_handler.ProcessRequestUnlocked(&request); + request_handler.TearDownProcessingRequestLocked(success); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); ASSERT_THAT(n_connections, Eq(0)); } @@ -527,9 +565,9 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { ExpectFailSendHeader(true); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } @@ -541,9 +579,9 @@ TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) { ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } @@ -556,9 +594,9 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { EXPECT_CALL(mock_io, CloseSocket_t(_, _)); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } @@ -569,9 +607,9 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) { ExpectFailSendHeader(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } @@ -583,9 +621,9 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) { ExpectFailSendData(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { @@ -595,12 +633,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) { ExpectFailSendMetaData(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } - TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). WillOnce(Return(receivers_list_single)); @@ -611,9 +648,9 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) { ExpectFailReceive(true); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); + ASSERT_THAT(success, Eq(false)); } void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code, @@ -630,25 +667,37 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e A_WriteSendDataResponse(error_code), testing::ReturnArg<2>() )); - + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("disconnected"), + HasSubstr(receivers_list[0]) + ) + )); + + EXPECT_CALL(mock_logger, Warning(AllOf( + HasSubstr("cannot send"), + HasSubstr(receivers_list[0]) + ) + )); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); ASSERT_THAT(callback_err, Eq(err_template)); ASSERT_THAT(callback_called, Eq(true)); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); +} + +TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfAuthorizationFailure) { + AssertImmediatelyCallBack(asapo::kNetAuthorizationError, asapo::ProducerErrorTemplates::kWrongInput); } TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { - AssertImmediatelyCallBack(asapo::kNetErrorFileIdAlreadyInUse, asapo::ProducerErrorTemplates::kFileIdAlreadyInUse); + AssertImmediatelyCallBack(asapo::kNetErrorFileIdAlreadyInUse, asapo::ProducerErrorTemplates::kWrongInput); } TEST_F(RequestHandlerTcpTests, ImmediatelyCallBackErrorIfWrongMetadata) { - AssertImmediatelyCallBack(asapo::kNetErrorErrorInMetadata, asapo::ProducerErrorTemplates::kErrorInMetadata); + AssertImmediatelyCallBack(asapo::kNetErrorErrorInMetadata, asapo::ProducerErrorTemplates::kWrongInput); } - - TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKConnect(true); ExpectOKAuthorize(true); @@ -656,49 +705,56 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); + auto success = request_handler.ProcessRequestUnlocked(&request_nocallback); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_called, Eq(false)); } -TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendMetaData(true); + ExpectFailSendFile(asapo::ProducerErrorTemplates::kLocalIOError, true); request_handler.PrepareProcessingRequestLocked(); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); - EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(nullptr) - )); - - auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); + ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_called, Eq(true)); - ASSERT_THAT(err, Eq(nullptr)); - + ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kLocalIOError)); } +TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) { + ExpectOKConnect(); + ExpectOKAuthorize(); + ExpectOKSendHeader(); + ExpectOKSendMetaData(); + ExpectFailSendFile(asapo::ProducerErrorTemplates::kInternalServerError); + + request_handler.PrepareProcessingRequestLocked(); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + + ASSERT_THAT(success, Eq(false)); + ASSERT_THAT(callback_called, Eq(false)); +} TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); - ExpectOKSendAll(true); + ExpectOKSendHeader(true); + ExpectOKSendMetaData(true); + ExpectOKSendFile(true); ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); - EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) - .WillOnce( - DoAll( - testing::SetArgPointee<2>(nullptr), - Return(nullptr) - )); - - auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(err, Eq(nullptr)); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(success, Eq(true)); + ASSERT_THAT(callback_called, Eq(true)); + ASSERT_THAT(callback_err, Eq(nullptr)); } @@ -710,9 +766,9 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKReceive(); request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_err, Eq(nullptr)); ASSERT_THAT(callback_called, Eq(true)); ASSERT_THAT(callback_header.data_size, Eq(header.data_size)); @@ -734,9 +790,9 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresIngestMode) { request.header.op_code = asapo::kOpcodeTransferMetaData; request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } @@ -751,9 +807,9 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) { request.header.custom_data[asapo::kPosIngestMode] = ingest_mode; request_handler.PrepareProcessingRequestLocked(); - auto err = request_handler.ProcessRequestUnlocked(&request); + auto success = request_handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); ASSERT_THAT(callback_header.custom_data[asapo::kPosIngestMode], Eq(ingest_mode)); } @@ -766,13 +822,13 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) { request_handler.PrepareProcessingRequestLocked(); - EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); + EXPECT_CALL(mock_io, SendFile_t(_, _, _)).Times(0); auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly; request_filesend.header.custom_data[asapo::kPosIngestMode] = ingest_mode; - auto err = request_handler.ProcessRequestUnlocked(&request_filesend); - ASSERT_THAT(err, Eq(nullptr)); + auto success = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(success, Eq(true)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 0d1831147..350e60010 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -18,21 +18,10 @@ cdef extern from "asapo_producer.h" namespace "asapo": cdef bool operator==(Error lhs, ErrorTemplateInterface rhs) cdef extern from "asapo_producer.h" namespace "asapo": - ErrorTemplateInterface kFileTooLarge "asapo::ProducerErrorTemplates::kFileTooLarge" - ErrorTemplateInterface kFileNameTooLong "asapo::ProducerErrorTemplates::kFileNameTooLong" - ErrorTemplateInterface kEmptyFileName "asapo::ProducerErrorTemplates::kEmptyFileName" - ErrorTemplateInterface kNoData "asapo::ProducerErrorTemplates::kNoData" - ErrorTemplateInterface kZeroDataSize "asapo::ProducerErrorTemplates::kZeroDataSize" - ErrorTemplateInterface kBeamtimeIdTooLong "asapo::ProducerErrorTemplates::kBeamtimeIdTooLong" - ErrorTemplateInterface kBeamtimeAlreadySet "asapo::ProducerErrorTemplates::kBeamtimeAlreadySet" - ErrorTemplateInterface kFileIdAlreadyInUse "asapo::ProducerErrorTemplates::kFileIdAlreadyInUse" - ErrorTemplateInterface kErrorInMetadata "asapo::ProducerErrorTemplates::kErrorInMetadata" - ErrorTemplateInterface kErrorSubsetSize "asapo::ProducerErrorTemplates::kErrorSubsetSize" - ErrorTemplateInterface kAuthorizationFailed "asapo::ProducerErrorTemplates::kAuthorizationFailed" ErrorTemplateInterface kInternalServerError "asapo::ProducerErrorTemplates::kInternalServerError" ErrorTemplateInterface kCannotSendDataToReceivers "asapo::ProducerErrorTemplates::kCannotSendDataToReceivers" ErrorTemplateInterface kRequestPoolIsFull "asapo::ProducerErrorTemplates::kRequestPoolIsFull" - ErrorTemplateInterface kWrongIngestMode "asapo::ProducerErrorTemplates::kWrongIngestMode" + ErrorTemplateInterface kWrongInput "asapo::ProducerErrorTemplates::kWrongInput" cdef extern from "asapo_producer.h" namespace "asapo": cppclass FileData: diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp index 6c3b5d16f..1254981a8 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp @@ -46,25 +46,25 @@ void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest* } -Error ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request) { +bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request) { auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request); if (!CheckRequest(receiver_request)) { SendResponce(receiver_request, kNetErrorWrongRequest); server_->HandleAfterError(receiver_request->source_id); log__->Error("wrong request, code:" + std::to_string(receiver_request->header.op_code)); - return nullptr; + return true; } CacheMeta* meta; auto buf = GetSlot(receiver_request, &meta); if (buf == nullptr) { - return nullptr; + return true; } SendData(receiver_request, buf, meta); statistics__->IncreaseRequestCounter(); statistics__->IncreaseRequestDataVolume(receiver_request->header.data_size); - return nullptr; + return true; } bool ReceiverDataServerRequestHandler::ReadyProcessRequest() { @@ -75,7 +75,7 @@ void ReceiverDataServerRequestHandler::PrepareProcessingRequestLocked() { // do nothing } -void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(const Error& error_from_process) { +void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool processing_succeeded) { // do nothing } diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h index de93020a2..e5aabd418 100644 --- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h @@ -13,10 +13,10 @@ namespace asapo { class ReceiverDataServerRequestHandler: public RequestHandler { public: explicit ReceiverDataServerRequestHandler(const NetServer* server, DataCache* data_cache, Statistics* statistics); - Error ProcessRequestUnlocked(GenericRequest* request) override; + bool ProcessRequestUnlocked(GenericRequest* request) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; - void TearDownProcessingRequestLocked(const Error& error_from_process) override; + void TearDownProcessingRequestLocked(bool processing_succeeded) override; const AbstractLogger* log__; Statistics* statistics__; private: diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index 48ca98b97..6b56beb2b 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -97,18 +97,18 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) { EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request"))); - auto err = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) { MockSendResponce(asapo::kNetErrorNoData, true); - auto err = handler_no_cache.ProcessRequestUnlocked(&request); + auto success = handler_no_cache.ProcessRequestUnlocked(&request); EXPECT_CALL(mock_logger, Debug(_)).Times(0); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { @@ -116,9 +116,9 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) { MockSendResponce(asapo::kNetErrorNoData, true); EXPECT_CALL(mock_logger, Debug(HasSubstr("not found"))); - auto err = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } @@ -128,9 +128,9 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) { EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0); EXPECT_CALL(mock_cache, UnlockSlot(_)); - auto err = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } @@ -145,9 +145,9 @@ TEST_F(RequestHandlerTests, ProcessRequestOk) { EXPECT_CALL(mock_logger, Debug(HasSubstr("sending"))); EXPECT_CALL(mock_stat, IncreaseRequestCounter_t()); EXPECT_CALL(mock_stat, IncreaseRequestDataVolume_t(expected_data_size)); - auto err = handler.ProcessRequestUnlocked(&request); + auto success = handler.ProcessRequestUnlocked(&request); - ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(success, Eq(true)); } } -- GitLab