From 2ca363d6067fd91ad1a614fd3a6b27fb695886f7 Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Tue, 1 Sep 2020 15:08:37 +0200 Subject: [PATCH] lazy initialisation of DataBroker's net_client (Try fabric first and fallback to TCP) --- common/cpp/include/common/networking.h | 2 +- consumer/api/cpp/CMakeLists.txt | 1 + .../api/cpp/include/consumer/data_broker.h | 42 ++++++----- consumer/api/cpp/src/data_broker.cpp | 11 ++- .../api/cpp/src/fabric_consumer_client.cpp | 5 +- consumer/api/cpp/src/rds_response_error.h | 54 ++++++++++++++ consumer/api/cpp/src/server_data_broker.cpp | 73 +++++++++++++------ consumer/api/cpp/src/server_data_broker.h | 10 ++- consumer/api/cpp/src/tcp_client.cpp | 26 ++++--- .../cpp/unittests/test_rds_error_mapper.cpp | 16 ++++ .../api/cpp/unittests/test_server_broker.cpp | 61 ++++++++-------- examples/producer/simple-producer/produce.cpp | 4 +- producer/api/cpp/include/producer/common.h | 4 +- producer/api/cpp/include/producer/producer.h | 18 ++--- producer/api/cpp/src/producer_impl.cpp | 41 ++++++----- producer/api/cpp/src/producer_impl.h | 4 +- .../cpp/src/request_handler_filesystem.cpp | 2 +- producer/api/cpp/src/request_handler_tcp.cpp | 21 +++--- producer/api/cpp/src/request_handler_tcp.h | 4 +- producer/api/cpp/unittests/mocking.h | 6 +- .../api/cpp/unittests/test_producer_impl.cpp | 46 ++++++------ .../test_request_handler_filesystem.cpp | 4 +- .../unittests/test_request_handler_tcp.cpp | 6 +- producer/api/python/asapo_wrappers.h | 2 +- receiver/src/request.h | 4 +- .../request_handler_db_stream_info.cpp | 8 +- .../request_handler_db_stream_info.h | 6 +- .../request_handler_db_write.cpp | 2 +- .../request_handler_file_process.cpp | 4 +- .../request_handler/requests_dispatcher.cpp | 2 +- receiver/unittests/receiver_mocking.h | 4 +- .../test_request_handler_db_stream_info.cpp | 16 ++-- .../test_request_handler_db_writer.cpp | 2 +- .../test_request_handler_file_process.cpp | 4 +- .../test_requests_dispatcher.cpp | 4 +- receiver/unittests/test_request.cpp | 4 +- .../consumer/consumer_api/consumer_api.cpp | 10 +-- .../insert_retrieve_mongodb.cpp | 4 +- 38 files changed, 335 insertions(+), 202 deletions(-) create mode 100644 consumer/api/cpp/src/rds_response_error.h create mode 100644 consumer/api/cpp/unittests/test_rds_error_mapper.cpp diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 733b3d848..e89f2077f 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -70,7 +70,7 @@ struct GenericRequestHeader { uint64_t data_size; uint64_t meta_size; CustomRequestData custom_data; - char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails)*/ + char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails) */ char substream[kMaxMessageSize]; /* Must be a string (strcpy is used) */ std::string Json() { std::string s = "{\"id\":" + std::to_string(data_id) + "," diff --git a/consumer/api/cpp/CMakeLists.txt b/consumer/api/cpp/CMakeLists.txt index 9e3128467..ae5ce7be2 100644 --- a/consumer/api/cpp/CMakeLists.txt +++ b/consumer/api/cpp/CMakeLists.txt @@ -35,6 +35,7 @@ set(TEST_SOURCE_FILES unittests/test_tcp_client.cpp unittests/test_tcp_connection_pool.cpp unittests/test_fabric_consumer_client.cpp + unittests/test_rds_error_mapper.cpp ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 7cb765518..1c00b3008 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -35,22 +35,28 @@ class DataBroker { - //! Get unacknowledged tuple for specific group id and substream. - /*! - \param group_id - group id to use. - \param substream (optional) - substream - \param from_id - return tuples with ids greater or equal to from (use 0 disable limit) - \param to_id - return tuples with ids less or equal to to (use 0 to disable limit) - \param in (optional) - substream - \param err - set to nullptr of operation succeed, error otherwise. - \return vector of ids, might be empty - */ - virtual IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, Error* error) = 0; + //! Get unacknowledged tuple for specific group id and substream. + /*! + \param group_id - group id to use. + \param substream (optional) - substream + \param from_id - return tuples with ids greater or equal to from (use 0 disable limit) + \param to_id - return tuples with ids less or equal to to (use 0 to disable limit) + \param in (optional) - substream + \param err - set to nullptr of operation succeed, error otherwise. + \return vector of ids, might be empty + */ + virtual IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, + Error* error) = 0; virtual IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0; //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; + //! Will disable RDMA. + //! If RDMA is disabled, not available or the first connection fails to build up, it will automatically fall back to TCP. + //! This will only have an effect if no previous connection attempted was made on this DataBroker. + virtual void ForceNoRdma() = 0; + //! Set list of substreams virtual std::vector<std::string> GetSubstreamList(Error* err) = 0; @@ -134,13 +140,13 @@ class DataBroker { virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0; virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0; - //! Receive id of last acknowledged data tuple - /*! - \param group_id - group id to use. - \param substream (optional) - substream - \param err - will be set in case of error, nullptr otherwise. - \return id of the last acknowledged image, 0 if error - */ + //! Receive id of last acknowledged data tuple + /*! + \param group_id - group id to use. + \param substream (optional) - substream + \param err - will be set in case of error, nullptr otherwise. + \return id of the last acknowledged image, 0 if error + */ virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) = 0; virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) = 0; diff --git a/consumer/api/cpp/src/data_broker.cpp b/consumer/api/cpp/src/data_broker.cpp index 482cdcf40..64d4ac8e9 100644 --- a/consumer/api/cpp/src/data_broker.cpp +++ b/consumer/api/cpp/src/data_broker.cpp @@ -39,8 +39,15 @@ std::unique_ptr<DataBroker> DataBrokerFactory::CreateServerBroker(std::string se return nullptr; } - return Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem, - std::move(source), networkConnectionType); + auto broker = Create<ServerDataBroker>(std::move(server_name), error, std::move(source_path), has_filesystem, + std::move(source)); + + // TODO: This is just here to test the old code. Will be removed with the next commit. + if (networkConnectionType == NetworkConnectionType::kAsapoTcp) { + broker->ForceNoRdma(); + } + + return broker; } diff --git a/consumer/api/cpp/src/fabric_consumer_client.cpp b/consumer/api/cpp/src/fabric_consumer_client.cpp index 50c9eee6c..14c75775e 100644 --- a/consumer/api/cpp/src/fabric_consumer_client.cpp +++ b/consumer/api/cpp/src/fabric_consumer_client.cpp @@ -2,6 +2,7 @@ #include <io/io_factory.h> #include <iostream> #include "fabric_consumer_client.h" +#include "rds_response_error.h" using namespace asapo; @@ -41,7 +42,7 @@ Error FabricConsumerClient::GetData(const FileInfo* info, FileData* data) { } if (response.error_code) { - return TextError("Response NetworkErrorCode " + std::to_string(response.error_code)); + return ConvertRdsResponseToError(response.error_code); } data->swap(tempData); @@ -74,7 +75,7 @@ void FabricConsumerClient::PerformNetworkTransfer(fabric::FabricAddress address, return; } - /* The server is sending us the data over RDMA, and then sending us a confirmation */ + /* The server is _now_ sending us the data over RDMA, and then sending us a confirmation */ client__->Recv(address, currentMessageId, response, sizeof(*response), err); // if (*err) ... diff --git a/consumer/api/cpp/src/rds_response_error.h b/consumer/api/cpp/src/rds_response_error.h new file mode 100644 index 000000000..36b417c69 --- /dev/null +++ b/consumer/api/cpp/src/rds_response_error.h @@ -0,0 +1,54 @@ +#ifndef ASAPO_RDS_RESPONSE_ERROR_H +#define ASAPO_RDS_RESPONSE_ERROR_H + +#include <common/networking.h> + +namespace asapo { + +using RdsResponseError = ServiceError<NetworkErrorCode, ErrorType::kFabricError>; +using RdsResponseErrorTemplate = ServiceErrorTemplate<NetworkErrorCode, ErrorType::kFabricError>; + +namespace RdsResponseErrorTemplates { +auto const kNetErrorReauthorize = RdsResponseErrorTemplate { + "RDS response Reauthorize", NetworkErrorCode::kNetErrorReauthorize +}; +auto const kNetErrorWarning = RdsResponseErrorTemplate { + "RDS response Warning", NetworkErrorCode::kNetErrorWarning +}; +auto const kNetErrorWrongRequest = RdsResponseErrorTemplate { + "RDS response WrongRequest", NetworkErrorCode::kNetErrorWrongRequest +}; +auto const kNetErrorNoData = RdsResponseErrorTemplate { + "RDS response NoData", NetworkErrorCode::kNetErrorNoData +}; +auto const kNetAuthorizationError = RdsResponseErrorTemplate { + "RDS response AuthorizationError", NetworkErrorCode::kNetAuthorizationError +}; +auto const kNetErrorInternalServerError = RdsResponseErrorTemplate { + "RDS response InternalServerError", NetworkErrorCode::kNetErrorInternalServerError +}; +} + +inline Error ConvertRdsResponseToError(NetworkErrorCode error_code) { + switch (error_code) { + case kNetErrorNoError: + return nullptr; + case kNetErrorReauthorize: + return RdsResponseErrorTemplates::kNetErrorReauthorize.Generate(); + case kNetErrorWarning: + return RdsResponseErrorTemplates::kNetErrorWarning.Generate(); + case kNetErrorWrongRequest: + return RdsResponseErrorTemplates::kNetErrorWrongRequest.Generate(); + case kNetErrorNoData: + return RdsResponseErrorTemplates::kNetErrorNoData.Generate(); + case kNetAuthorizationError: + return RdsResponseErrorTemplates::kNetAuthorizationError.Generate(); + case kNetErrorInternalServerError: + return RdsResponseErrorTemplates::kNetErrorInternalServerError.Generate(); + default: + return TextError("Unknown RDS response code " + std::to_string(error_code)); + } +} +} + +#endif //ASAPO_RDS_RESPONSE_ERROR_H diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index a656fdd55..6bb5d9d90 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -10,6 +10,7 @@ #include "asapo_consumer.h" #include "fabric_consumer_client.h" +#include "rds_response_error.h" using std::chrono::system_clock; @@ -86,19 +87,12 @@ Error ProcessRequestResponce(const Error& server_err, const RequestOutput* respo ServerDataBroker::ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, - SourceCredentials source, - NetworkConnectionType networkType) : + SourceCredentials source) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, endpoint_{std::move(server_uri)}, source_path_{std::move(source_path)}, has_filesystem_{has_filesystem}, source_credentials_(std::move(source)) { - switch (networkType) { - case NetworkConnectionType::kAsapoTcp: - net_client__.reset(new TcpClient()); - break; - case NetworkConnectionType::kFabric: - net_client__.reset(new FabricConsumerClient()); - break; - } + + // net_client__ will be lazy initialized if (source_credentials_.stream.empty()) { source_credentials_.stream = SourceCredentials::kDefaultStream; @@ -110,6 +104,10 @@ void ServerDataBroker::SetTimeout(uint64_t timeout_ms) { timeout_ms_ = timeout_ms; } +void ServerDataBroker::ForceNoRdma() { + should_try_rdma_first_ = false; +} + std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + source_credentials_.user_token; } @@ -348,8 +346,38 @@ bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) { return info->buf_id > 0; } + + Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* data) { - return net_client__->GetData(info, data); + Error error; + if (!net_client__) { + const std::lock_guard<std::mutex> lock(net_client_mutex__); + if (!net_client__) { + if (should_try_rdma_first_) { // This will check if a rdma connection can be made and will return early if so + auto fabricClient = std::unique_ptr<NetClient>(new FabricConsumerClient()); + + error = fabricClient->GetData(info, data); + + // Check if the error comes from the receiver data server (so a connection was made) + if (!error || error == RdsResponseErrorTemplates::kNetErrorNoData) { + net_client__.swap(fabricClient); + return error; // Successfully received data and is now using a fabric client + } + + // Retry with TCP + should_try_rdma_first_ = false; + error = nullptr; + } + + if (!should_try_rdma_first_) { + net_client__.reset(new TcpClient()); + // If we use tcp, we can fall thought and use the normal GetData code + } + } + } + + error = net_client__->GetData(info, data); + return error; } std::string ServerDataBroker::GenerateNewGroupId(Error* err) { @@ -637,8 +665,8 @@ Error ServerDataBroker::GetDataFromFileTransferService(const FileInfo* info, Fil Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::string substream) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - +"/" + std::move(substream) + - "/" + std::move(group_id) + "/" + std::to_string(id); + +"/" + std::move(substream) + + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; ri.body = "{\"Op\":\"Acknowledge\"}"; @@ -647,12 +675,13 @@ Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::stri return err; } -IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, Error* error) { +IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, + uint64_t to_id, Error* error) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - +"/" + std::move(substream) + - "/" + std::move(group_id) + "/nacks"; - ri.extra_params = "&from=" + std::to_string(from_id)+"&to=" + std::to_string(to_id); + +"/" + std::move(substream) + + "/" + std::move(group_id) + "/nacks"; + ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id); auto json_string = BrokerRequestWithTimeout(ri, error); if (*error) { @@ -668,15 +697,16 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, std::st return list; } -IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) { +IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, + Error* error) { return GetUnacknowledgedTupleIds(std::move(group_id), kDefaultSubstream, from_id, to_id, error); } uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + - +"/" + std::move(substream) + - "/" + std::move(group_id) + "/lastack"; + +"/" + std::move(substream) + + "/" + std::move(group_id) + "/lastack"; auto json_string = BrokerRequestWithTimeout(ri, error); if (*error) { @@ -690,7 +720,7 @@ uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std: } if (id == 0) { - *error=ConsumerErrorTemplates::kNoData.Generate(); + *error = ConsumerErrorTemplates::kNoData.Generate(); } return id; } @@ -699,4 +729,5 @@ uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, Erro return GetLastAcknowledgedTulpeId(std::move(group_id), kDefaultSubstream, error); } + } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 8295bf3cf..7a8d1de05 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -2,6 +2,7 @@ #define ASAPO_SERVER_DATA_BROKER_H #include <common/networking.h> +#include <mutex> #include "consumer/data_broker.h" #include "io/io.h" #include "http_client/http_client.h" @@ -53,10 +54,11 @@ Error ConsumerErrorFromNoDataResponse(const std::string& response); class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, bool has_filesystem, - SourceCredentials source, NetworkConnectionType networkType); + SourceCredentials source); Error Acknowledge(std::string group_id, uint64_t id, std::string substream = kDefaultSubstream) override; - IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, Error* error) override; + IdList GetUnacknowledgedTupleIds(std::string group_id, std::string substream, uint64_t from_id, uint64_t to_id, + Error* error) override; IdList GetUnacknowledgedTupleIds(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) override; @@ -84,6 +86,8 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; + void ForceNoRdma() override; + FileInfos QueryImages(std::string query, Error* err) override; FileInfos QueryImages(std::string query, std::string substream, Error* err) override; @@ -103,6 +107,7 @@ class ServerDataBroker final : public asapo::DataBroker { std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; + std::mutex net_client_mutex__; // Required for the lazy initialization of net_client private: Error GetDataFromFileTransferService(const FileInfo* info, FileData* data, bool retry_with_new_token); Error GetDataFromFile(FileInfo* info, FileData* data); @@ -141,6 +146,7 @@ class ServerDataBroker final : public asapo::DataBroker { bool has_filesystem_; SourceCredentials source_credentials_; uint64_t timeout_ms_ = 0; + bool should_try_rdma_first_ = true; std::string folder_token_; RequestInfo CreateFolderTokenRequest() const; RequestInfo CreateFileTransferRequest(const FileInfo* info) const; diff --git a/consumer/api/cpp/src/tcp_client.cpp b/consumer/api/cpp/src/tcp_client.cpp index 8606943d4..c6bf9b5ae 100644 --- a/consumer/api/cpp/src/tcp_client.cpp +++ b/consumer/api/cpp/src/tcp_client.cpp @@ -1,6 +1,8 @@ #include "tcp_client.h" #include "io/io_factory.h" #include "common/networking.h" +#include "rds_response_error.h" + namespace asapo { TcpClient::TcpClient() : io__{GenerateDefaultIO()}, connection_pool__{new TcpConnectionPool()} { @@ -32,23 +34,25 @@ Error TcpClient::ReconnectAndResendGetDataRequest(SocketDescriptor* sd, const Fi Error TcpClient::ReceiveResponce(SocketDescriptor sd) const noexcept { Error err; - GenericNetworkResponse Response; - io__->Receive(sd, &Response, sizeof(Response), &err); + GenericNetworkResponse response; + io__->Receive(sd, &response, sizeof(response), &err); if(err != nullptr) { io__->CloseSocket(sd, nullptr); connection_pool__->ReleaseConnection(sd); return err; } - switch (Response.error_code) { - case kNetErrorWrongRequest : - io__->CloseSocket(sd, nullptr); - return Error{new SimpleError("internal server error: wrong request")}; - case kNetErrorNoData : - connection_pool__->ReleaseConnection(sd); - return Error{new SimpleError("no data")}; - default: - return nullptr; + if (response.error_code) { + switch (response.error_code) { + case kNetErrorWrongRequest: + io__->CloseSocket(sd, nullptr); + break; + case kNetErrorNoData: + connection_pool__->ReleaseConnection(sd); + break; + } + return ConvertRdsResponseToError(response.error_code); } + return nullptr; } Error TcpClient::QueryCacheHasData(SocketDescriptor* sd, const FileInfo* info, bool try_reconnect) const noexcept { diff --git a/consumer/api/cpp/unittests/test_rds_error_mapper.cpp b/consumer/api/cpp/unittests/test_rds_error_mapper.cpp new file mode 100644 index 000000000..1ba1cdf6d --- /dev/null +++ b/consumer/api/cpp/unittests/test_rds_error_mapper.cpp @@ -0,0 +1,16 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include "../src/rds_response_error.h" + +using namespace asapo; +using ::testing::Eq; + +TEST(ConvertRdsResponseToError, TestAllCases) { + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorNoError /* 0 */), Eq(nullptr)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorReauthorize), Eq(RdsResponseErrorTemplates::kNetErrorReauthorize)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorWarning), Eq(RdsResponseErrorTemplates::kNetErrorWarning)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorWrongRequest), Eq(RdsResponseErrorTemplates::kNetErrorWrongRequest)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorNoData), Eq(RdsResponseErrorTemplates::kNetErrorNoData)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetAuthorizationError), Eq(RdsResponseErrorTemplates::kNetAuthorizationError)); + ASSERT_THAT(ConvertRdsResponseToError(NetworkErrorCode::kNetErrorInternalServerError), Eq(RdsResponseErrorTemplates::kNetErrorInternalServerError)); +} diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index e9b5251b9..46b17363f 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -46,11 +46,11 @@ namespace { TEST(FolderDataBroker, Constructor) { auto data_broker = std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", false, - asapo::SourceCredentials{"beamtime_id", "", "", "token"}, asapo::NetworkConnectionType::kAsapoTcp) + asapo::SourceCredentials{"beamtime_id", "", "", "token"}) }; ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::TcpClient*>(data_broker->net_client__.get()), Ne(nullptr)); + ASSERT_THAT(data_broker->net_client__.get(), Eq(nullptr)); } const uint8_t expected_value = 1; @@ -87,10 +87,10 @@ class ServerDataBrokerTests : public Test { void AssertSingleFileTransfer(); void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}, asapo::NetworkConnectionType::kAsapoTcp) + new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) }; fts_data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}, asapo::NetworkConnectionType::kAsapoTcp) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{expected_beamtime_id, "", expected_stream, expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -178,7 +178,7 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) { data_broker->httpclient__.release(); data_broker->net_client__.release(); data_broker = std::unique_ptr<ServerDataBroker> { - new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}, asapo::NetworkConnectionType::kAsapoTcp) + new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{"beamtime_id", "", "", expected_token}) }; data_broker->io__ = std::unique_ptr<IO> {&mock_io}; data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client}; @@ -1139,13 +1139,14 @@ TEST_F(ServerDataBrokerTests, GetImageTriesToGetTokenAgainIfTransferFailed) { TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) { MockGetBrokerUri(); auto expected_acknowledge_command = "{\"Op\":\"Acknowledge\"}"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"+expected_substream+"/" + - expected_group_id - + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token,expected_acknowledge_command, _,_)).WillOnce(DoAll( - SetArgPointee<2>(HttpCode::OK), - SetArgPointee<3>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_substream + "/" + + expected_group_id + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, expected_acknowledge_command, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(""))); auto err = data_broker->Acknowledge(expected_group_id, expected_dataset_id, expected_substream); @@ -1157,12 +1158,12 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) { MockGetBrokerUri(); auto expected_acknowledge_command = "{\"Op\":\"Acknowledge\"}"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id - + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token,expected_acknowledge_command, _,_)).WillOnce(DoAll( - SetArgPointee<2>(HttpCode::OK), - SetArgPointee<3>(nullptr), - Return(""))); + expected_group_id + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, expected_acknowledge_command, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(""))); auto err = data_broker->Acknowledge(expected_group_id, expected_dataset_id); @@ -1171,11 +1172,12 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) { void ServerDataBrokerTests::ExpectIdList(bool error) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"+expected_substream+"/" + - expected_group_id + "/nacks?token=" + expected_token+"&from=1&to=0",_,_)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(error?"":"{\"unacknowledged\":[1,2,3]}"))); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_substream + "/" + + expected_group_id + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(error ? "" : "{\"unacknowledged\":[1,2,3]}"))); } TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) { @@ -1183,17 +1185,18 @@ TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) { asapo::Error err; auto list = data_broker->GetUnacknowledgedTupleIds(expected_group_id, expected_substream, 1, 0, &err); - ASSERT_THAT(list, ElementsAre(1,2,3)); + ASSERT_THAT(list, ElementsAre(1, 2, 3)); ASSERT_THAT(err, Eq(nullptr)); } void ServerDataBrokerTests::ExpectLastAckId(bool empty_response) { - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/"+expected_substream+"/" + - expected_group_id + "/lastack?token=" + expected_token,_,_)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(empty_response?"{\"lastAckId\":0}":"{\"lastAckId\":1}"))); + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_substream + "/" + + expected_group_id + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}"))); } diff --git a/examples/producer/simple-producer/produce.cpp b/examples/producer/simple-producer/produce.cpp index c1f850bc0..68f15c8d2 100644 --- a/examples/producer/simple-producer/produce.cpp +++ b/examples/producer/simple-producer/produce.cpp @@ -25,14 +25,14 @@ int main(int argc, char* argv[]) { auto source = "asapo-services2:8400"; auto beamtime = "asapo_test"; - auto producer = asapo::Producer::Create(source, 1,asapo::RequestHandlerType::kTcp, + auto producer = asapo::Producer::Create(source, 1, asapo::RequestHandlerType::kTcp, asapo::SourceCredentials{beamtime, "", "", ""}, 60, &err); exit_if_error("Cannot start producer", err); std::string to_send = "hello"; auto send_size = to_send.size() + 1; auto buffer = asapo::FileData(new uint8_t[send_size]); - memcpy(buffer.get(),to_send.c_str(),send_size); + memcpy(buffer.get(), to_send.c_str(), send_size); asapo::EventHeader event_header{1, send_size, "test_file"}; err = producer->SendData(event_header, std::move(buffer), asapo::kDefaultIngestMode, &ProcessAfterSend); diff --git a/producer/api/cpp/include/producer/common.h b/producer/api/cpp/include/producer/common.h index 289f50999..998d74f1a 100644 --- a/producer/api/cpp/include/producer/common.h +++ b/producer/api/cpp/include/producer/common.h @@ -13,8 +13,8 @@ const uint8_t kMaxProcessingThreads = 32; struct RequestCallbackPayload { - GenericRequestHeader original_header; - std::string response; + GenericRequestHeader original_header; + std::string response; }; using RequestCallback = std::function<void(RequestCallbackPayload, Error)>; diff --git a/producer/api/cpp/include/producer/producer.h b/producer/api/cpp/include/producer/producer.h index 783c92c7e..ffd752a1e 100644 --- a/producer/api/cpp/include/producer/producer.h +++ b/producer/api/cpp/include/producer/producer.h @@ -25,15 +25,15 @@ class Producer { virtual ~Producer() = default; - //! Get substream information from receiver - /*! - \param substream (optional) - substream - \return StreamInfo - a structure with substream information - */ - virtual StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_ms, Error* err) const = 0; - virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; - - //! Sends data to the receiver + //! Get substream information from receiver + /*! + \param substream (optional) - substream + \return StreamInfo - a structure with substream information + */ + virtual StreamInfo GetStreamInfo(std::string substream, uint64_t timeout_ms, Error* err) const = 0; + virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; + + //! Sends data to the receiver /*! \param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)). \param data - A pointer to the data to send diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 50760be36..e6054e30a 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -261,34 +261,36 @@ Error ProducerImpl::SendFile(const EventHeader& event_header, } -using RequestCallbackWithPromise = void (*)(std::shared_ptr<std::promise<StreamInfoResult>>, RequestCallbackPayload header, Error err); +using RequestCallbackWithPromise = void (*)(std::shared_ptr<std::promise<StreamInfoResult>>, + RequestCallbackPayload header, Error err); -RequestCallback unwrap_callback(RequestCallbackWithPromise callback, std::unique_ptr<std::promise<StreamInfoResult>> promise) { +RequestCallback unwrap_callback(RequestCallbackWithPromise callback, + std::unique_ptr<std::promise<StreamInfoResult>> promise) { auto shared_promise = std::shared_ptr<std::promise<StreamInfoResult>>(std::move(promise)); RequestCallback wrapper = [ = ](RequestCallbackPayload payload, Error err) -> void { - callback(shared_promise, std::move(payload), std::move(err)); + callback(shared_promise, std::move(payload), std::move(err)); }; return wrapper; } -void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, RequestCallbackPayload payload, Error err) { +void ActivatePromise(std::shared_ptr<std::promise<StreamInfoResult>> promise, RequestCallbackPayload payload, + Error err) { StreamInfoResult res; if (err == nullptr) { auto ok = res.sinfo.SetFromJson(payload.response); - res.err=ok?nullptr:ProducerErrorTemplates::kInternalServerError.Generate( - std::string("cannot read JSON string from server response: ")+payload.response).release(); + res.err = ok ? nullptr : ProducerErrorTemplates::kInternalServerError.Generate( + std::string("cannot read JSON string from server response: ") + payload.response).release(); } else { - res.err=err.release(); + res.err = err.release(); } try { promise->set_value(res); - } - catch(...){} + } catch(...) {} } -StreamInfo GetInfroFromCallback(std::future<StreamInfoResult>* promiseResult,uint64_t timeout_sec, Error* err) { +StreamInfo GetInfroFromCallback(std::future<StreamInfoResult>* promiseResult, uint64_t timeout_sec, Error* err) { try { - auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_sec*1000)); + auto status = promiseResult->wait_for(std::chrono::milliseconds(timeout_sec * 1000)); if (status == std::future_status::ready) { auto res = promiseResult->get(); if (res.err == nullptr) { @@ -298,32 +300,33 @@ StreamInfo GetInfroFromCallback(std::future<StreamInfoResult>* promiseResult,uin return StreamInfo{}; } } - } catch(...){} + } catch(...) {} *err = ProducerErrorTemplates::kTimeout.Generate(); return StreamInfo{}; } StreamInfo ProducerImpl::GetStreamInfo(std::string substream, uint64_t timeout_sec, Error* err) const { - GenericRequestHeader request_header{kOpcodeStreamInfo, 0, 0,0, "", substream}; + GenericRequestHeader request_header{kOpcodeStreamInfo, 0, 0, 0, "", substream}; std::unique_ptr<std::promise<StreamInfoResult>> promise {new std::promise<StreamInfoResult>}; - std::future<StreamInfoResult> promiseResult= promise->get_future(); + std::future<StreamInfoResult> promiseResult = promise->get_future(); *err = request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header), - nullptr, "", "", - unwrap_callback(ActivatePromise,std::move(promise)), true, - timeout_sec*1000} + nullptr, "", "", + unwrap_callback(ActivatePromise, std::move(promise)), true, + timeout_sec * 1000} }, true); if (*err) { return StreamInfo{}; } - return GetInfroFromCallback(&promiseResult,timeout_sec+2,err); // we give two more sec for request to exit by timeout + return GetInfroFromCallback(&promiseResult, timeout_sec + 2, + err); // we give two more sec for request to exit by timeout } StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_sec, Error* err) const { - return GetStreamInfo(kDefaultSubstream,timeout_sec, err); + return GetStreamInfo(kDefaultSubstream, timeout_sec, err); } } \ No newline at end of file diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index dc78bd75e..c5f0583f2 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -70,8 +70,8 @@ class ProducerImpl : public Producer { }; struct StreamInfoResult { - StreamInfo sinfo; - ErrorInterface* err; + StreamInfo sinfo; + ErrorInterface* err; }; diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp index 1e77c7de8..c548f70c7 100644 --- a/producer/api/cpp/src/request_handler_filesystem.cpp +++ b/producer/api/cpp/src/request_handler_filesystem.cpp @@ -31,7 +31,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, b err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)producer_request->data.get(), (size_t)request->header.data_size, true, true); if (producer_request->callback) { - producer_request->callback(RequestCallbackPayload{request->header,""}, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header, ""}, std::move(err)); } *retry = false; return true; diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp index f395d63e9..e0931b03e 100644 --- a/producer/api/cpp/src/request_handler_tcp.cpp +++ b/producer/api/cpp/src/request_handler_tcp.cpp @@ -75,7 +75,7 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) { return nullptr; } -Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_header,std::string* response) { +Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_header, std::string* response) { Error err; SendDataResponse sendDataResponse; io__->Receive(sd_, &sendDataResponse, sizeof(sendDataResponse), &err); @@ -115,13 +115,13 @@ Error RequestHandlerTcp::ReceiveResponse(const GenericRequestHeader& request_hea } } -Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request,std::string* response) { +Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request, std::string* response) { auto err = SendRequestContent(request); if (err) { return err; } - err = ReceiveResponse(request->header,response); + err = ReceiveResponse(request->header, response); if (err == nullptr || err == ProducerErrorTemplates::kServerWarning) { log__->Debug("successfully sent data, opcode: " + std::to_string(request->header.op_code) + ", id: " + std::to_string(request->header.data_id) + " to " + connected_receiver_uri_); @@ -214,9 +214,9 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error, } -void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request,std::string response, bool* retry) { +void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request, std::string response, bool* retry) { if (request->callback) { - request->callback(RequestCallbackPayload{request->header,std::move(response)}, std::move(err)); + request->callback(RequestCallbackPayload{request->header, std::move(response)}, std::move(err)); } *retry = false; } @@ -235,7 +235,7 @@ bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bo } std::string response; - auto err = TrySendToReceiver(request,&response); + auto err = TrySendToReceiver(request, &response); bool server_error_can_retry = ProcessErrorFromReceiver(err, request, receiver_uri); if (server_error_can_retry) { continue; @@ -295,13 +295,14 @@ void RequestHandlerTcp::TearDownProcessingRequestLocked(bool request_processed_s void RequestHandlerTcp::ProcessRequestTimeout(GenericRequest* request) { auto producer_request = static_cast<ProducerRequest*>(request); - auto err_string ="request id:" + std::to_string(request->header.data_id) + ", opcode: "+std::to_string(request->header.op_code) + " for " + request->header.substream + - " substream"; - log__->Error("timeout "+err_string); + auto err_string = "request id:" + std::to_string(request->header.data_id) + ", opcode: " + std::to_string( + request->header.op_code) + " for " + request->header.substream + + " substream"; + log__->Error("timeout " + err_string); auto err = ProducerErrorTemplates::kTimeout.Generate(err_string); if (producer_request->callback) { - producer_request->callback(RequestCallbackPayload{request->header,""}, std::move(err)); + producer_request->callback(RequestCallbackPayload{request->header, ""}, std::move(err)); } } diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h index 38841d6c8..822b75da2 100644 --- a/producer/api/cpp/src/request_handler_tcp.h +++ b/producer/api/cpp/src/request_handler_tcp.h @@ -34,8 +34,8 @@ class RequestHandlerTcp: public RequestHandler { Error ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address); bool SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry); Error SendRequestContent(const ProducerRequest* request); - Error ReceiveResponse(const GenericRequestHeader& request_header,std::string* response); - Error TrySendToReceiver(const ProducerRequest* request,std::string* response); + Error ReceiveResponse(const GenericRequestHeader& request_header, std::string* response); + Error TrySendToReceiver(const ProducerRequest* request, std::string* response); SocketDescriptor sd_{kDisconnectedSocketDescriptor}; void UpdateIfNewConnection(); bool UpdateReceiversList(); diff --git a/producer/api/cpp/unittests/mocking.h b/producer/api/cpp/unittests/mocking.h index 109fa2382..ac806babd 100644 --- a/producer/api/cpp/unittests/mocking.h +++ b/producer/api/cpp/unittests/mocking.h @@ -28,11 +28,11 @@ class MockRequestPull : public RequestPool { RequestPool{1, request_handler_factory, log} {}; asapo::Error AddRequest(std::unique_ptr<asapo::GenericRequest> request, bool top_priority = false) override { if (request == nullptr) { - return asapo::Error{AddRequest_t(nullptr,top_priority)}; + return asapo::Error{AddRequest_t(nullptr, top_priority)}; } - return asapo::Error{AddRequest_t(request.get(),top_priority)}; + return asapo::Error{AddRequest_t(request.get(), top_priority)}; } - MOCK_METHOD2(AddRequest_t, asapo::ErrorInterface * (GenericRequest*,bool)); + MOCK_METHOD2(AddRequest_t, asapo::ErrorInterface * (GenericRequest*, bool)); MOCK_METHOD0(NRequestsInPool, uint64_t ()); MOCK_METHOD1(WaitRequestsFinished_t, asapo::ErrorInterface * (uint64_t timeout_ms)); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 798a49c17..fdc8f4c04 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -95,8 +95,8 @@ class ProducerImplTests : public testing::Test { }; TEST_F(ProducerImplTests, SendReturnsError) { - EXPECT_CALL(mock_pull, AddRequest_t(_,false)).WillOnce(Return( - asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); + EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( + asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, "test"}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); @@ -176,7 +176,7 @@ TEST_F(ProducerImplTests, UsesDefaultStream) { asapo::kDefaultSubstream.c_str(), expected_ingest_mode, 0, - 0),false)).WillOnce(Return(nullptr)); + 0), false)).WillOnce(Return(nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; auto err = producer.SendData(event_header, nullptr, expected_ingest_mode, nullptr); @@ -197,7 +197,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { expected_ingest_mode, 0, 0 - ),false)).WillOnce(Return( + ), false)).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; @@ -219,7 +219,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequestWithSubstream) { expected_ingest_mode, 0, 0 - ),false)).WillOnce(Return( + ), false)).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata}; @@ -244,7 +244,7 @@ TEST_F(ProducerImplTests, OKSendingSubstreamFinish) { asapo::IngestModeFlags::kTransferMetaDataOnly, 0, 0 - ),false)).WillOnce(Return( + ), false)).WillOnce(Return( nullptr)); auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, expected_next_substream, nullptr); @@ -269,7 +269,7 @@ TEST_F(ProducerImplTests, OKSendingSubstreamFinishWithNoNextStream) { asapo::IngestModeFlags::kTransferMetaDataOnly, 0, 0 - ),false)).WillOnce(Return( + ), false)).WillOnce(Return( nullptr)); auto err = producer.SendSubstreamFinishedFlag(expected_substream, expected_id, "", nullptr); @@ -285,7 +285,7 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { expected_credentials_str, expected_metadata, expected_id, expected_size, expected_name, asapo::kDefaultSubstream.c_str(), expected_ingest_mode, - expected_subset_id, expected_subset_size),false)).WillOnce( + expected_subset_id, expected_subset_size), false)).WillOnce( Return( nullptr)); @@ -312,7 +312,7 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { "", expected_ingest_mode, 10, - 10),false)).WillOnce(Return( + 10), false)).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -324,7 +324,7 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(_,_)).Times(0); + EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, "", expected_ingest_mode, nullptr); @@ -337,7 +337,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(_,_)).Times(0); + EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::EventHeader event_header{expected_id, 0, ""}; auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); @@ -359,8 +359,8 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { asapo::kDefaultSubstream.c_str(), expected_ingest_mode, 0, - 0),false)).WillOnce(Return( - nullptr)); + 0), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, expected_fullpath, expected_ingest_mode, nullptr); @@ -380,8 +380,8 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithSubstream) { expected_substream, expected_ingest_mode, 0, - 0),false)).WillOnce(Return( - nullptr)); + 0), false)).WillOnce(Return( + nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto err = producer.SendFile(event_header, expected_substream, expected_fullpath, expected_ingest_mode, nullptr); @@ -411,7 +411,7 @@ TEST_F(ProducerImplTests, ErrorSettingSecondTime) { TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(_,_)).Times(0); + EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::EventHeader event_header{expected_id, 0, expected_name}; auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly | asapo::IngestModeFlags::kTransferData; @@ -445,23 +445,23 @@ TEST_F(ProducerImplTests, WaitRequestsFinished) { MATCHER_P3(M_CheckGetSubstreamInfoRequest, op_code, source_credentials, substream, - "Checks if a valid GenericRequestHeader was Send") { + "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code - && request->source_credentials == source_credentials - && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; + && request->source_credentials == source_credentials + && strcmp(((asapo::GenericRequestHeader) (arg->header)).substream, substream) == 0; } TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { producer.SetCredentials(expected_credentials); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetSubstreamInfoRequest(asapo::kOpcodeStreamInfo, - expected_credentials_str, - expected_substream),true)).WillOnce( - Return(nullptr)); + expected_credentials_str, + expected_substream), true)).WillOnce( + Return(nullptr)); asapo::Error err; - producer.GetStreamInfo(expected_substream,1, &err); + producer.GetStreamInfo(expected_substream, 1, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index 321b60e4d..120d79bca 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -57,9 +57,9 @@ class RequestHandlerFilesystemTests : public testing::Test { expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - std::string callback_response; + std::string callback_response; - asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::RequestCallbackPayload payload, asapo::Error err) { + asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::RequestCallbackPayload payload, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = payload.original_header; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index 784c25cc5..687b86c06 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -59,7 +59,7 @@ class RequestHandlerTcpTests : public testing::Test { std::string expected_warning = "warning"; std::string expected_response = "response"; - char expected_file_name[asapo::kMaxMessageSize] = "test_name"; + char expected_file_name[asapo::kMaxMessageSize] = "test_name"; char expected_beamtime_id[asapo::kMaxMessageSize] = "test_beamtime_id"; char expected_substream[asapo::kMaxMessageSize] = "test_substream"; @@ -851,7 +851,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) { ExpectOKSendHeader(true); ExpectOKSendMetaData(true); ExpectOKSendFile(true); - ExpectOKReceive(true,asapo::kNetErrorNoError,expected_response); + ExpectOKReceive(true, asapo::kNetErrorNoError, expected_response); request_handler.PrepareProcessingRequestLocked(); @@ -870,7 +870,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); ExpectOKSendAll(true); - ExpectOKReceive(true,asapo::kNetErrorNoError,expected_response); + ExpectOKReceive(true, asapo::kNetErrorNoError, expected_response); request_handler.PrepareProcessingRequestLocked(); diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h index 5c9929da5..794e97004 100644 --- a/producer/api/python/asapo_wrappers.h +++ b/producer/api/python/asapo_wrappers.h @@ -13,7 +13,7 @@ inline std::string GetErrorString(asapo::Error* err) { return ""; } -using RequestCallbackCython = void (*)(void*, void*, RequestCallbackPayload payload , Error err); +using RequestCallbackCython = void (*)(void*, void*, RequestCallbackPayload payload, Error err); using RequestCallbackCythonMemory = void (*)(void*, void*, void*, RequestCallbackPayload payload, Error err); RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) { diff --git a/receiver/src/request.h b/receiver/src/request.h index 22e80ecfe..6bfbcc38f 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -24,8 +24,8 @@ namespace asapo { using RequestHandlerList = std::vector<const ReceiverRequestHandler*>; enum class ResponseMessageType { - kWarning, - kInfo + kWarning, + kInfo }; class Request { diff --git a/receiver/src/request_handler/request_handler_db_stream_info.cpp b/receiver/src/request_handler/request_handler_db_stream_info.cpp index 1c2695f42..ec52adb5b 100644 --- a/receiver/src/request_handler/request_handler_db_stream_info.cpp +++ b/receiver/src/request_handler/request_handler_db_stream_info.cpp @@ -16,11 +16,11 @@ Error RequestHandlerDbStreamInfo::ProcessRequest(Request* request) const { auto col_name = collection_name_prefix_ + "_" + request->GetSubstream(); StreamInfo info; - auto err = db_client__->GetStreamInfo(col_name,&info); + auto err = db_client__->GetStreamInfo(col_name, &info); if (!err) { - log__->Debug(std::string{"get stream info from "} + col_name + " in " + - db_name_ + " at " + GetReceiverConfig()->database_uri); - request->SetResponseMessage(info.Json(),ResponseMessageType::kInfo); + log__->Debug(std::string{"get stream info from "} + col_name + " in " + + db_name_ + " at " + GetReceiverConfig()->database_uri); + request->SetResponseMessage(info.Json(), ResponseMessageType::kInfo); } return err; } diff --git a/receiver/src/request_handler/request_handler_db_stream_info.h b/receiver/src/request_handler/request_handler_db_stream_info.h index 5ec17c9dd..6562b6d79 100644 --- a/receiver/src/request_handler/request_handler_db_stream_info.h +++ b/receiver/src/request_handler/request_handler_db_stream_info.h @@ -7,9 +7,9 @@ namespace asapo { class RequestHandlerDbStreamInfo final: public RequestHandlerDb { - public: - RequestHandlerDbStreamInfo(std::string collection_name_prefix); - Error ProcessRequest(Request* request) const override; + public: + RequestHandlerDbStreamInfo(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; }; } diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp index 5570dad6a..b37232ed5 100644 --- a/receiver/src/request_handler/request_handler_db_write.cpp +++ b/receiver/src/request_handler/request_handler_db_write.cpp @@ -38,7 +38,7 @@ Error RequestHandlerDbWrite::ProcessDuplicateRecordSituation(Request* request) c auto check_err = request->CheckForDuplicates(); if (check_err == ReceiverErrorTemplates::kWarningDuplicatedRequest) { std::string warn_str = "ignoring duplicate record for id " + std::to_string(request->GetDataID()); - request->SetResponseMessage(warn_str,ResponseMessageType::kWarning); + request->SetResponseMessage(warn_str, ResponseMessageType::kWarning); log__->Warning(warn_str); return nullptr; } diff --git a/receiver/src/request_handler/request_handler_file_process.cpp b/receiver/src/request_handler/request_handler_file_process.cpp index 47bf3dd21..0d07f793c 100644 --- a/receiver/src/request_handler/request_handler_file_process.cpp +++ b/receiver/src/request_handler/request_handler_file_process.cpp @@ -20,14 +20,14 @@ Error RequestHandlerFileProcess::ProcessRequest(Request* request) const { Error RequestHandlerFileProcess::ProcessFileExistSituation(Request* request) const { auto err_duplicate = request->CheckForDuplicates(); if (err_duplicate == nullptr) { - request->SetResponseMessage("file has been overwritten",ResponseMessageType::kWarning); + request->SetResponseMessage("file has been overwritten", ResponseMessageType::kWarning); log__->Warning(std::string("overwriting file " ) + request->GetOfflinePath() + kPathSeparator + request->GetFileName()); return file_processor_->ProcessFile(request, true); } if (err_duplicate == ReceiverErrorTemplates::kWarningDuplicatedRequest) { request->SetAlreadyProcessedFlag(); - request->SetResponseMessage("duplicated request, ignored",ResponseMessageType::kWarning); + request->SetResponseMessage("duplicated request, ignored", ResponseMessageType::kWarning); log__->Warning("duplicated request, id: " + std::to_string(request->GetDataID())); return nullptr; } diff --git a/receiver/src/request_handler/requests_dispatcher.cpp b/receiver/src/request_handler/requests_dispatcher.cpp index 460e82861..0ce7e34bf 100644 --- a/receiver/src/request_handler/requests_dispatcher.cpp +++ b/receiver/src/request_handler/requests_dispatcher.cpp @@ -39,7 +39,7 @@ GenericNetworkResponse RequestsDispatcher::CreateResponseToRequest(const std::un strncpy(generic_response.message, handle_error->Explain().c_str(), kMaxMessageSize); } if (request->GetResponseMessage().size() > 0) { - if (request->GetResponseMessageType()==ResponseMessageType::kWarning) { + if (request->GetResponseMessageType() == ResponseMessageType::kWarning) { generic_response.error_code = kNetErrorWarning; } strncpy(generic_response.message, request->GetResponseMessage().c_str(), kMaxMessageSize); diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index 8aebb8a3e..c8bd31122 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -94,12 +94,12 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(WasAlreadyProcessed, bool()); MOCK_METHOD0(SetAlreadyProcessedFlag, void()); - MOCK_METHOD2(SetResponseMessage, void(std::string,ResponseMessageType)); + MOCK_METHOD2(SetResponseMessage, void(std::string, ResponseMessageType)); MOCK_CONST_METHOD0(GetResponseMessage, const std::string & ()); MOCK_CONST_METHOD0(GetResponseMessageType_t, ResponseMessageType ()); const ResponseMessageType GetResponseMessageType() const override { - return GetResponseMessageType_t(); + return GetResponseMessageType_t(); }; Error CheckForDuplicates() override { diff --git a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp index fa78df3cd..155ac6f90 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_stream_info.cpp @@ -67,7 +67,7 @@ class DbMetaStreamInfoTests : public Test { std::string info_str = R"({"lastId":10})"; const uint8_t* expected_info_str = reinterpret_cast<const uint8_t*>(info_str.c_str()); asapo::StreamInfo expected_stream_info; - void SetUp() override { + void SetUp() override { GenericRequestHeader request_header; expected_stream_info.last_id = 10; request_header.data_id = 0; @@ -93,20 +93,20 @@ TEST_F(DbMetaStreamInfoTests, CallsUpdate) { ; EXPECT_CALL(*mock_request, GetSubstream()) - .WillOnce(Return(expected_substream)) - ; + .WillOnce(Return(expected_substream)) + ; EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)). WillOnce(testing::Return(nullptr)); EXPECT_CALL(mock_db, GetStreamInfo_t(expected_collection_name, _)). - WillOnce(DoAll( - SetArgPointee<1>(expected_stream_info), - testing::Return(nullptr) - )); + WillOnce(DoAll( + SetArgPointee<1>(expected_stream_info), + testing::Return(nullptr) + )); - EXPECT_CALL(*mock_request, SetResponseMessage(info_str,asapo::ResponseMessageType::kInfo)); + EXPECT_CALL(*mock_request, SetResponseMessage(info_str, asapo::ResponseMessageType::kInfo)); EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("get stream info"), HasSubstr(config.database_uri), diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp index e48797967..a2536c22c 100644 --- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp +++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp @@ -254,7 +254,7 @@ TEST_F(DbWriterHandlerTests, SkipIfWasAlreadyProcessed) { TEST_F(DbWriterHandlerTests, DuplicatedRequest_SameRecord) { ExpectDuplicatedID(); - EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("duplicate record"),asapo::ResponseMessageType::kWarning)); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("duplicate record"), asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, CheckForDuplicates_t()) .WillOnce( Return(asapo::ReceiverErrorTemplates::kWarningDuplicatedRequest.Generate().release()) diff --git a/receiver/unittests/request_handler/test_request_handler_file_process.cpp b/receiver/unittests/request_handler/test_request_handler_file_process.cpp index 11e32e370..65e1d9df9 100644 --- a/receiver/unittests/request_handler/test_request_handler_file_process.cpp +++ b/receiver/unittests/request_handler/test_request_handler_file_process.cpp @@ -81,7 +81,7 @@ void FileWriteHandlerTests::ExpecFileProcess(const asapo::SimpleErrorTemplate* e } TEST_F(FileWriteHandlerTests, FileAlreadyExists_NoRecordInDb) { - EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("overwritten"),asapo::ResponseMessageType::kWarning)); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("overwritten"), asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, CheckForDuplicates_t()) .WillOnce( Return(nullptr) @@ -106,7 +106,7 @@ TEST_F(FileWriteHandlerTests, FileAlreadyExists_NoRecordInDb) { TEST_F(FileWriteHandlerTests, FileAlreadyExists_DuplicatedRecordInDb) { - EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("ignore"),asapo::ResponseMessageType::kWarning)); + EXPECT_CALL(*mock_request, SetResponseMessage(HasSubstr("ignore"), asapo::ResponseMessageType::kWarning)); EXPECT_CALL(*mock_request, SetAlreadyProcessedFlag()); EXPECT_CALL(mock_logger, Warning(HasSubstr("duplicated"))); EXPECT_CALL(*mock_request, GetDataID()).WillOnce(Return(1)); diff --git a/receiver/unittests/request_handler/test_requests_dispatcher.cpp b/receiver/unittests/request_handler/test_requests_dispatcher.cpp index f3513ddfb..eda6db4cd 100644 --- a/receiver/unittests/request_handler/test_requests_dispatcher.cpp +++ b/receiver/unittests/request_handler/test_requests_dispatcher.cpp @@ -262,7 +262,7 @@ TEST_F(RequestsDispatcherTests, OkProcessRequestSendOK) { TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithWarning) { MockHandleRequest(0); MockSendResponse(&response, false); - request->SetResponseMessage("duplicate",asapo::ResponseMessageType::kWarning); + request->SetResponseMessage("duplicate", asapo::ResponseMessageType::kWarning); auto err = dispatcher->ProcessRequest(request); @@ -274,7 +274,7 @@ TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithWarning) { TEST_F(RequestsDispatcherTests, ProcessRequestReturnsOkWithInfo) { MockHandleRequest(0); MockSendResponse(&response, false); - request->SetResponseMessage("some info",asapo::ResponseMessageType::kInfo); + request->SetResponseMessage("some info", asapo::ResponseMessageType::kInfo); auto err = dispatcher->ProcessRequest(request); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 2a25efbde..32784f11d 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -257,7 +257,7 @@ TEST_F(RequestTests, RequestTests_SetGetBeamtimeYear_Test) { } TEST_F(RequestTests, SetGetWarningMessage) { - request->SetResponseMessage("warn",asapo::ResponseMessageType::kWarning); + request->SetResponseMessage("warn", asapo::ResponseMessageType::kWarning); ASSERT_THAT(request->GetResponseMessage(), "warn"); ASSERT_THAT(request->GetResponseMessageType(), asapo::ResponseMessageType::kWarning); @@ -265,7 +265,7 @@ TEST_F(RequestTests, SetGetWarningMessage) { } TEST_F(RequestTests, SetGetInfossage) { - request->SetResponseMessage("info",asapo::ResponseMessageType::kInfo); + request->SetResponseMessage("info", asapo::ResponseMessageType::kInfo); ASSERT_THAT(request->GetResponseMessage(), "info"); ASSERT_THAT(request->GetResponseMessageType(), asapo::ResponseMessageType::kInfo); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 05c7d5a7a..bbab49bd7 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -149,24 +149,24 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(err == asapo::ConsumerErrorTemplates::kNoData, "last ack default stream no data"); M_AssertTrue(id == 0, "last ack default stream no data id = 0"); - auto nacks = broker->GetUnacknowledgedTupleIds(group_id,0,0,&err); + auto nacks = broker->GetUnacknowledgedTupleIds(group_id, 0, 0, &err); M_AssertTrue(err == nullptr, "nacks default stream all"); M_AssertTrue(nacks.size() == 10, "nacks default stream size = 10"); - err = broker->Acknowledge(group_id,1); + err = broker->Acknowledge(group_id, 1); M_AssertTrue(err == nullptr, "ack default stream no error"); - nacks = broker->GetUnacknowledgedTupleIds(group_id,0,0,&err); + nacks = broker->GetUnacknowledgedTupleIds(group_id, 0, 0, &err); M_AssertTrue(nacks.size() == 9, "nacks default stream size = 9 after ack"); id = broker->GetLastAcknowledgedTulpeId(group_id, &err); M_AssertTrue(err == nullptr, "last ack default stream no error"); M_AssertTrue(id == 1, "last ack default stream id = 1"); - err = broker->Acknowledge(group_id,1,"stream1"); + err = broker->Acknowledge(group_id, 1, "stream1"); M_AssertTrue(err == nullptr, "ack stream1 no error"); - nacks = broker->GetUnacknowledgedTupleIds(group_id,"stream1",0,0,&err); + nacks = broker->GetUnacknowledgedTupleIds(group_id, "stream1", 0, 0, &err); M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); } diff --git a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp index 289362011..e245e6c40 100644 --- a/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp +++ b/tests/automatic/mongo_db/insert_retrieve/insert_retrieve_mongodb.cpp @@ -66,9 +66,9 @@ int main(int argc, char* argv[]) { Assert(err, "No record"); asapo::StreamInfo info; - err = db.GetStreamInfo("test",&info); + err = db.GetStreamInfo("test", &info); M_AssertEq(nullptr, err); - M_AssertEq(fi.id,info.last_id); + M_AssertEq(fi.id, info.last_id); } return 0; -- GitLab