From 4f4e0263e61de386b2c0ae75d665f687bd3a8177 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 23 Dec 2020 13:41:23 +0100 Subject: [PATCH] get rid of default stream in C++ --- CHANGELOG.md | 3 +- .../cpp/include/asapo/common/data_structs.h | 3 - .../api/cpp/include/asapo/consumer/consumer.h | 44 ++-- consumer/api/cpp/src/consumer_impl.cpp | 85 +++---- consumer/api/cpp/src/consumer_impl.h | 16 +- .../api/cpp/unittests/test_consumer_impl.cpp | 227 ++++++------------ examples/consumer/getnext/getnext.cpp | 4 +- examples/pipeline/in_to_out/in_to_out.cpp | 102 ++++---- .../dummy_data_producer.cpp | 4 +- .../api/cpp/include/asapo/producer/producer.h | 28 +-- producer/api/cpp/src/producer_impl.cpp | 35 +-- producer/api/cpp/src/producer_impl.h | 10 - producer/api/cpp/unittests/test_producer.cpp | 2 +- .../api/cpp/unittests/test_producer_impl.cpp | 114 ++++----- .../src/main_eventmon.cpp | 2 +- .../consumer/consumer_api/consumer_api.cpp | 72 +++--- .../next_multithread_broker.cpp | 2 +- .../getlast_broker.cpp | 4 +- 18 files changed, 275 insertions(+), 482 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff5c2a4fb..16b8d7c4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,8 @@ BREAKING CHANGES #### renaming - general * stream -> data_source, substream -> stream * use millisecond everywhere for timeout/delay -* use term `message` for blob of information we send around, rename related structs, parameters, ... +* use term `message` for blob of information we send around, rename related structs, parameters, ... +* C++ - get rid of duplicate functions with default stream #### renaming - Producer API * SendData/send_data -> Send/send * id_in_subset -> dataset_substream diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index ffbc2d681..35b5e49d5 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -110,8 +110,5 @@ enum IngestModeFlags : uint64_t { const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; -const std::string kDefaultStream = "default"; - - } #endif //ASAPO_message_meta_H diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 8af2ddfac..f7523180b 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -17,48 +17,44 @@ class Consumer { //! Reset counter for the specific group. /*! \param group_id - group id to use. + \param stream - stream to use \return nullptr of command was successful, otherwise error. */ - virtual Error ResetLastReadMarker(std::string group_id) = 0; virtual Error ResetLastReadMarker(std::string group_id, std::string stream) = 0; - virtual Error SetLastReadMarker(std::string group_id, uint64_t value) = 0; virtual Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) = 0; //! Acknowledge message for specific group id and stream. /*! \param group_id - group id to use. \param id - message id - \param stream (optional) - stream + \param stream - stream to use \return nullptr of command was successful, otherwise error. */ - virtual Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) = 0; + virtual Error Acknowledge(std::string group_id, uint64_t id, std::string stream) = 0; //! Negative acknowledge message for specific group id and stream. /*! \param group_id - group id to use. \param id - message id \param delay_ms - message will be redelivered after delay, 0 to redeliver immediately - \param stream (optional) - stream + \param stream - stream to use \return nullptr of command was successful, otherwise error. */ virtual Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, - std::string stream = kDefaultStream) = 0; + std::string stream) = 0; //! Get unacknowledged messages for specific group id and stream. /*! \param group_id - group id to use. - \param stream (optional) - stream \param from_id - return messages with ids greater or equal to from (use 0 disable limit) \param to_id - return messages with ids less or equal to to (use 0 to disable limit) - \param in (optional) - stream - \param err - set to nullptr of operation succeed, error otherwise. - \return vector of ids, might be empty + \param stream - stream to use + \return nullptr if operation succeed, error otherwise. */ virtual IdList GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, Error* error) = 0; - virtual IdList GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) = 0; //! Set timeout for consumer operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; @@ -79,10 +75,10 @@ class Consumer { //! Get current number of datasets /*! + \param stream - stream to use \param err - return nullptr of operation succeed, error otherwise. \return number of datasets. */ - virtual uint64_t GetCurrentSize(Error* err) = 0; virtual uint64_t GetCurrentSize(std::string stream, Error* err) = 0; //! Generate new GroupID. @@ -102,11 +98,11 @@ class Consumer { //! Receive next available message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. - \param group_id - group id to use. + \param group_id - group id to use + \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetNext(MessageMeta* info, std::string group_id, MessageData* data) = 0; virtual Error GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) = 0; //! Retrieves message using message meta. @@ -122,70 +118,66 @@ class Consumer { /*! \param err - will be set to error data cannot be read, nullptr otherwise. \param group_id - group id to use. - \param stream - stream to use ("" for default). + \param stream - stream to use \param min_size - wait until dataset has min_size messages (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) = 0; - virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) = 0; //! Receive last available dataset which has min_size messages. /*! \param err - will be set to error data cannot be read, nullptr otherwise. - \param stream - stream to use ("" for default). + \param stream - stream to use \param min_size - amount of messages in dataset (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetLastDataset(std::string stream, uint64_t min_size, Error* err) = 0; - virtual DataSet GetLastDataset(uint64_t min_size, Error* err) = 0; //! Receive dataset by id. /*! \param id - dataset id \param err - will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise. - \param stream - stream to use ("" for default). + \param stream - stream to use \param min_size - wait until dataset has min_size messages (0 for maximum size) \return DataSet - information about the dataset */ virtual DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) = 0; - virtual DataSet GetDatasetById(uint64_t id, uint64_t min_size, Error* err) = 0; //! Receive single message by id. /*! \param id - message id \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetById(uint64_t id, MessageMeta* info, MessageData* data) = 0; virtual Error GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) = 0; //! Receive id of last acknowledged message /*! \param group_id - group id to use. - \param stream (optional) - stream - \param err - will be set in case of error, nullptr otherwise. + \param stream - stream to use + \param error - will be set in case of error, nullptr otherwise. \return id of the last acknowledged message, 0 if error */ virtual uint64_t GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) = 0; - virtual uint64_t GetLastAcknowledgedMessage(std::string group_id, Error* error) = 0; //! Receive last available message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetLast(MessageMeta* info, MessageData* data) = 0; virtual Error GetLast(MessageMeta* info, std::string stream, MessageData* data) = 0; //! Get all messages matching the query. /*! \param sql_query - query string in SQL format. Limit dataset is supported + \param stream - stream to use \param err - will be set in case of error, nullptr otherwise \return vector of message metadata matchiing to specified query. Empty if nothing found or error */ - virtual MessageMetas QueryMessages(std::string query, Error* err) = 0; virtual MessageMetas QueryMessages(std::string query, std::string stream, Error* err) = 0; //! Configure resending unacknowledged data diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index a61911c80..45c45076b 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -243,6 +243,11 @@ RequestInfo ConsumerImpl::PrepareRequestInfo(std::string api_url, bool dataset, Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group_id, std::string stream, GetMessageServerOperation op, bool dataset, uint64_t min_size) { + + if (stream.empty()) { + return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + } + interrupt_flag_= false; std::string request_suffix = OpToUriCmd(op); std::string request_group = OpToUriCmd(op); @@ -294,10 +299,6 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group return nullptr; } -Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, MessageData* data) { - return GetNext(info, std::move(group_id), kDefaultStream, data); -} - Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) { return GetMessageFromServer(GetMessageServerOperation::GetNext, 0, @@ -307,10 +308,6 @@ Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, std::string data); } -Error ConsumerImpl::GetLast(MessageMeta* info, MessageData* data) { - return GetLast(info, kDefaultStream, data); -} - Error ConsumerImpl::GetLast(MessageMeta* info, std::string stream, MessageData* data) { return GetMessageFromServer(GetMessageServerOperation::GetLast, 0, @@ -515,14 +512,6 @@ std::string ConsumerImpl::BrokerRequestWithTimeout(RequestInfo request, Error* e return std::move(response.string_output); } -Error ConsumerImpl::SetLastReadMarker(std::string group_id, uint64_t value) { - return SetLastReadMarker(std::move(group_id), value, kDefaultStream); -} - -Error ConsumerImpl::ResetLastReadMarker(std::string group_id) { - return ResetLastReadMarker(std::move(group_id), kDefaultStream); -} - Error ConsumerImpl::ResetLastReadMarker(std::string group_id, std::string stream) { return SetLastReadMarker(group_id, 0, stream); } @@ -556,24 +545,20 @@ uint64_t ConsumerImpl::GetCurrentSize(std::string stream, Error* err) { return size; } -uint64_t ConsumerImpl::GetCurrentSize(Error* err) { - return GetCurrentSize(kDefaultStream, err); -} -Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, MessageData* data) { +Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) { if (id == 0) { return ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); } - - return GetById(id, info, kDefaultStream, data); -} - -Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) { return GetMessageFromServer(GetMessageServerOperation::GetID, id, "0", stream, info, data); } Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id, std::string stream, bool dataset, uint64_t min_size) { + if (stream.empty()) { + return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + } + RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + @@ -607,6 +592,11 @@ DataSet DecodeDatasetFromResponse(std::string response, Error* err) { } MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, Error* err) { + if (stream.empty()) { + *err = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + return {}; + } + RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + "/" + std::move(stream) + "/0/querymessages"; @@ -622,14 +612,6 @@ MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, return dataset.content; } -MessageMetas ConsumerImpl::QueryMessages(std::string query, Error* err) { - return QueryMessages(std::move(query), kDefaultStream, err); -} - -DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, Error* err) { - return GetNextDataset(std::move(group_id), kDefaultStream, min_size, err); -} - DataSet ConsumerImpl::GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) { return GetDatasetFromServer(GetMessageServerOperation::GetNext, 0, std::move(group_id), std::move(stream),min_size, err); } @@ -638,10 +620,6 @@ DataSet ConsumerImpl::GetLastDataset(std::string stream, uint64_t min_size, Erro return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream),min_size, err); } -DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, Error* err) { - return GetLastDataset(kDefaultStream, min_size, err); -} - DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, @@ -660,11 +638,11 @@ DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op, return DecodeDatasetFromResponse(response, err); } -DataSet ConsumerImpl::GetDatasetById(uint64_t id, uint64_t min_size, Error* err) { - return GetDatasetById(id, kDefaultStream, min_size, err); -} - DataSet ConsumerImpl::GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) { + if (id == 0) { + *err = ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); + return {}; + } return GetDatasetFromServer(GetMessageServerOperation::GetID, id, "0", std::move(stream), min_size, err); } @@ -761,6 +739,9 @@ Error ConsumerImpl::GetDataFromFileTransferService(MessageMeta* info, MessageDat } Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string stream) { + if (stream.empty()) { + return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + } RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + @@ -778,6 +759,10 @@ IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) { + if (stream.empty()) { + *error = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + return {}; + } RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + @@ -798,14 +783,11 @@ IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, return list; } -IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, - uint64_t from_id, - uint64_t to_id, - Error* error) { - return GetUnacknowledgedMessages(std::move(group_id), kDefaultStream, from_id, to_id, error); -} - uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) { + if (stream.empty()) { + *error = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + return 0; + } RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + @@ -828,10 +810,6 @@ uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, std::str return id; } -uint64_t ConsumerImpl::GetLastAcknowledgedMessage(std::string group_id, Error* error) { - return GetLastAcknowledgedMessage(std::move(group_id), kDefaultStream, error); -} - void ConsumerImpl::SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) { resend_ = resend; delay_ms_ = delay_ms; @@ -842,6 +820,9 @@ Error ConsumerImpl::NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, std::string stream) { + if (stream.empty()) { + return ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); + } RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + +"/" + std::move(stream) + diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 0193178c3..166039868 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -56,39 +56,31 @@ class ConsumerImpl final : public asapo::Consumer { explicit ConsumerImpl(std::string server_uri, std::string source_path, bool has_filesystem, SourceCredentials source); - Error Acknowledge(std::string group_id, uint64_t id, std::string stream = kDefaultStream) override; + Error Acknowledge(std::string group_id, uint64_t id, std::string) override; Error NegativeAcknowledge(std::string group_id, uint64_t id, uint64_t delay_ms, - std::string stream = kDefaultStream) override; + std::string stream) override; IdList GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, Error* error) override; - IdList GetUnacknowledgedMessages(std::string group_id, uint64_t from_id, uint64_t to_id, Error* error) override; uint64_t GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) override; - uint64_t GetLastAcknowledgedMessage(std::string group_id, Error* error) override; - Error ResetLastReadMarker(std::string group_id) override; Error ResetLastReadMarker(std::string group_id, std::string stream) override; - Error SetLastReadMarker(std::string group_id, uint64_t value) override; Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override; - Error GetNext(MessageMeta* info, std::string group_id, MessageData* data) override; Error GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) override; - Error GetLast(MessageMeta* info, MessageData* data) override; Error GetLast(MessageMeta* info, std::string stream, MessageData* data) override; std::string GenerateNewGroupId(Error* err) override; std::string GetBeamtimeMeta(Error* err) override; - uint64_t GetCurrentSize(Error* err) override; uint64_t GetCurrentSize(std::string stream, Error* err) override; - Error GetById(uint64_t id, MessageMeta* info, MessageData* data) override; Error GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) override; @@ -97,16 +89,12 @@ class ConsumerImpl final : public asapo::Consumer { NetworkConnectionType CurrentConnectionType() const override; - MessageMetas QueryMessages(std::string query, Error* err) override; MessageMetas QueryMessages(std::string query, std::string stream, Error* err) override; - DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) override; DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) override; - DataSet GetLastDataset(uint64_t min_size, Error* err) override; DataSet GetLastDataset(std::string stream, uint64_t min_size, Error* err) override; - DataSet GetDatasetById(uint64_t id, uint64_t min_size, Error* err) override; DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) override; Error RetrieveData(MessageMeta* info, MessageData* data) override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index c65bee5ac..fad9678b9 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -178,7 +178,7 @@ class ConsumerImplTests : public Test { }; TEST_F(ConsumerImplTests, GetMessageReturnsErrorOnWrongInput) { - auto err = consumer->GetNext(nullptr, "", nullptr); + auto err = consumer->GetNext(nullptr, "", expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -200,7 +200,7 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/detector/default/" + expected_group_id + Get_t(expected_broker_uri + "/database/beamtime_id/detector/stream/" + expected_group_id + "/next?token=" + expected_token, _, @@ -209,20 +209,7 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { SetArgPointee<2>(nullptr), Return(""))); - consumer->GetNext(&info, expected_group_id, nullptr); -} - -TEST_F(ConsumerImplTests, GetNextUsesCorrectUri) { - MockGetBrokerUri(); - - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" - + expected_group_id + "/next?token=" - + expected_token, _, - _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(""))); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); } TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { @@ -242,13 +229,13 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token=" + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/"+ expected_stream+"/0/last?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - consumer->GetLast(&info, nullptr); + consumer->GetLast(&info, expected_stream, nullptr); } TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) { @@ -259,7 +246,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); @@ -278,7 +265,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsStreamFinishedFromHttpClient) { Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"" + expected_next_stream + "\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); @@ -296,7 +283,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"""\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); ASSERT_THAT(err_data->id, Eq(1)); @@ -314,7 +301,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNotAuthorized) { SetArgPointee<2>(nullptr), Return(""))); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -328,7 +315,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsWrongResponseFromHttpClient) { SetArgPointee<2>(nullptr), Return("id"))); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("malformed")); @@ -342,7 +329,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerAddressNotFound) { Return(""))); consumer->SetTimeout(100); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable"))); } @@ -355,7 +342,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) { Return(""))); consumer->SetTimeout(100); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable"))); } @@ -365,13 +352,13 @@ TEST_F(ConsumerImplTests, GetDoNotCallBrokerUriIfAlreadyFound) { MockGet("error_response"); consumer->SetTimeout(100); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asap-broker"), _, _)).Times(0); MockGet("error_response"); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); } TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) { @@ -379,12 +366,12 @@ TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) { MockGetError(); consumer->SetTimeout(0); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); Mock::VerifyAndClearExpectations(&mock_http_client); MockGetBrokerUri(); MockGet("error_response"); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); } TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) { @@ -396,7 +383,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}"))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -411,7 +398,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2,\"next_stream\":\"""\"}"))); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::NotFound), @@ -419,7 +406,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -433,7 +420,7 @@ TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnTransferError) { Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("sss")); @@ -456,7 +443,7 @@ TEST_F(ConsumerImplTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService)); } @@ -470,7 +457,7 @@ TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnFinshedStream) { Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next\"}"))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished)); } @@ -483,7 +470,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsMessageMeta) { MockGet(json); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); @@ -497,7 +484,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsParseError) { MockGetBrokerUri(); MockGet("error_response"); - auto err = consumer->GetNext(&info, expected_group_id, nullptr); + auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); } @@ -509,7 +496,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfNoDataNeeded) { EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0); EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); } TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) { @@ -522,7 +509,7 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) { EXPECT_CALL(mock_netclient, GetData_t(&info, &data)).WillOnce(Return(nullptr)); MockReadDataFromFile(0); - consumer->GetNext(&info, expected_group_id, &data); + consumer->GetNext(&info, expected_group_id, expected_stream, &data); ASSERT_THAT(info.buf_id, Eq(expected_buf_id)); @@ -540,7 +527,7 @@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfCannotReadFromCache) { &data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); MockReadDataFromFile(); - consumer->GetNext(&info, expected_group_id, &data); + consumer->GetNext(&info, expected_group_id, expected_stream, &data); ASSERT_THAT(info.buf_id, Eq(0)); } @@ -556,7 +543,7 @@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) { MockReadDataFromFile(); - consumer->GetNext(&info, expected_group_id, &data); + consumer->GetNext(&info, expected_group_id, expected_stream, &data); } TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) { @@ -595,13 +582,13 @@ TEST_F(ConsumerImplTests, ResetCounterByDefaultUsesCorrectUri) { consumer->SetTimeout(100); EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/" + expected_group_id + "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return(""))); - auto err = consumer->ResetLastReadMarker(expected_group_id); + auto err = consumer->ResetLastReadMarker(expected_group_id, expected_stream); ASSERT_THAT(err, Eq(nullptr)); } @@ -609,21 +596,6 @@ TEST_F(ConsumerImplTests, ResetCounterUsesCorrectUri) { MockGetBrokerUri(); consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + - expected_group_id + - "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return(""))); - auto err = consumer->SetLastReadMarker(expected_group_id, 10); - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(ConsumerImplTests, ResetCounterUsesCorrectUriWithStream) { - MockGetBrokerUri(); - consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/" + expected_group_id + @@ -639,22 +611,6 @@ TEST_F(ConsumerImplTests, GetCurrentSizeUsesCorrectUri) { MockGetBrokerUri(); consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + - "/default/size?token=" - + expected_token, _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return("{\"size\":10}"))); - asapo::Error err; - auto size = consumer->GetCurrentSize(&err); - ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(size, Eq(10)); -} - -TEST_F(ConsumerImplTests, GetCurrentSizeUsesCorrectUriWithStream) { - MockGetBrokerUri(); - consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/size?token=" + expected_token, _, _)).WillOnce(DoAll( @@ -672,13 +628,13 @@ TEST_F(ConsumerImplTests, GetCurrentSizeErrorOnWrongResponce) { consumer->SetTimeout(100); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + - "/default/size?token=" + "/"+expected_stream+"/size?token=" + expected_token, _, _)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::Unauthorized), SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - auto size = consumer->GetCurrentSize(&err); + auto size = consumer->GetCurrentSize(expected_stream, &err); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(size, Eq(0)); } @@ -688,13 +644,13 @@ TEST_F(ConsumerImplTests, GetNDataErrorOnWrongParse) { consumer->SetTimeout(100); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + - "/default/size?token=" + "/stream/size?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return("{\"siz\":10}"))); asapo::Error err; - auto size = consumer->GetCurrentSize(&err); + auto size = consumer->GetCurrentSize(expected_stream,&err); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(size, Eq(0)); } @@ -705,7 +661,7 @@ TEST_F(ConsumerImplTests, GetByIdUsesCorrectUri) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string( expected_dataset_id) + "?token=" + expected_token, _, @@ -714,7 +670,7 @@ TEST_F(ConsumerImplTests, GetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(json))); - auto err = consumer->GetById(expected_dataset_id, &info, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(info.name, Eq(to_send.name)); @@ -724,14 +680,14 @@ TEST_F(ConsumerImplTests, GetByIdTimeouts) { MockGetBrokerUri(); consumer->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return(""))); - auto err = consumer->GetById(expected_dataset_id, &info, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -740,14 +696,14 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStream) { MockGetBrokerUri(); consumer->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}"))); - auto err = consumer->GetById(expected_dataset_id, &info, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -756,14 +712,14 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { MockGetBrokerUri(); consumer->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_stream\":\"""\"}"))); - auto err = consumer->GetById(expected_dataset_id, &info, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -798,7 +754,7 @@ TEST_F(ConsumerImplTests, QueryMessagesReturnError) { consumer->SetTimeout(1000); asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); ASSERT_THAT(err->Explain(), HasSubstr("query")); @@ -815,7 +771,7 @@ TEST_F(ConsumerImplTests, QueryMessagesReturnEmptyResults) { consumer->SetTimeout(100); asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(messages.size(), Eq(0)); @@ -839,7 +795,7 @@ TEST_F(ConsumerImplTests, QueryMessagesWrongResponseArray) { consumer->SetTimeout(100); asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(messages.size(), Eq(0)); @@ -859,7 +815,7 @@ TEST_F(ConsumerImplTests, QueryMessagesWrongResponseRecorsd) { consumer->SetTimeout(100); asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(messages.size(), Eq(0)); @@ -878,7 +834,7 @@ TEST_F(ConsumerImplTests, QueryMessagesReturnRecords) { auto responce_string = "[" + json1 + "," + json2 + "]"; EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0" + + Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0" + "/querymessages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), @@ -886,7 +842,7 @@ TEST_F(ConsumerImplTests, QueryMessagesReturnRecords) { consumer->SetTimeout(100); asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(messages.size(), Eq(2)); @@ -895,29 +851,10 @@ TEST_F(ConsumerImplTests, QueryMessagesReturnRecords) { ASSERT_THAT(messages[1].name, Eq(rec2.name)); } -TEST_F(ConsumerImplTests, QueryMessagesUsesCorrectUriWithStream) { - - MockGetBrokerUri(); - - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + - expected_stream + "/0" + - "/querymessages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return("[]"))); - - consumer->SetTimeout(100); - asapo::Error err; - auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); - - ASSERT_THAT(err, Eq(nullptr)); - -} - TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/" + expected_group_id + "/next?token=" + expected_token + "&dataset=true&minsize=0", _, _)).WillOnce(DoAll( @@ -925,9 +862,16 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - consumer->GetNextDataset(expected_group_id, 0, &err); + consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); } +TEST_F(ConsumerImplTests, GetNextErrorOnEmptyStream) { + MessageData data; + auto err = consumer->GetNext(&info, expected_group_id, "", &data); + ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); +} + + TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) { asapo::Error err; MockGetBrokerUri(); @@ -949,7 +893,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) { MockGet(json); - auto dataset = consumer->GetNextDataset(expected_group_id, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); ASSERT_THAT(err, Eq(nullptr)); @@ -981,7 +925,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsPartialMessageMetas) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = consumer->GetNextDataset(expected_group_id, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); @@ -1016,7 +960,7 @@ TEST_F(ConsumerImplTests, GetDataSetByIdReturnsPartialMessageMetas) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = consumer->GetDatasetById(1, 0, &err); + auto dataset = consumer->GetDatasetById(1, expected_stream, 0, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData()); @@ -1034,7 +978,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) { MockGet("error_response"); asapo::Error err; - auto dataset = consumer->GetNextDataset(expected_group_id, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(dataset.content.size(), Eq(0)); @@ -1045,20 +989,6 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) { TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, - Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/last?token=" - + expected_token + "&dataset=true&minsize=2", _, - _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(""))); - asapo::Error err; - consumer->GetLastDataset(2, &err); -} - -TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUriWithStream) { - MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/0/last?token=" + expected_token + "&dataset=true&minsize=1", _, @@ -1073,7 +1003,7 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUriWithStream) { TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/0/" + std::to_string(expected_dataset_id) + "?token=" + expected_token + "&dataset=true" + "&minsize=0", _, _)).WillOnce(DoAll( @@ -1081,7 +1011,7 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - consumer->GetDatasetById(expected_dataset_id, 0, &err); + consumer->GetDatasetById(expected_dataset_id, expected_stream, 0, &err); } TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { @@ -1187,7 +1117,7 @@ void ConsumerImplTests::AssertSingleFileTransfer() { MockGetFTSUri(); ExpectFileTransfer(nullptr); - fts_consumer->GetNext(&info, expected_group_id, &data); + fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); ASSERT_THAT(data[0], Eq(expected_value)); Mock::VerifyAndClearExpectations(&mock_http_client); @@ -1233,7 +1163,7 @@ TEST_F(ConsumerImplTests, GetMessageReusesTokenAndUri) { MockBeforeFTS(&data); ExpectFileTransfer(nullptr); - auto err = fts_consumer->GetNext(&info, expected_group_id, &data); + auto err = fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); } TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { @@ -1244,7 +1174,7 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { ExpectRepeatedFileTransfer(); ExpectFolderToken(); - auto err = fts_consumer->GetNext(&info, expected_group_id, &data); + auto err = fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); } TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { @@ -1264,23 +1194,6 @@ TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { ASSERT_THAT(err, Eq(nullptr)); } -TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUriWithDefaultStream) { - MockGetBrokerUri(); - auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}"; - EXPECT_CALL(mock_http_client, - Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + - expected_group_id - + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return(""))); - - auto err = consumer->Acknowledge(expected_group_id, expected_dataset_id); - - ASSERT_THAT(err, Eq(nullptr)); -} - void ConsumerImplTests::ExpectIdList(bool error) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + @@ -1331,7 +1244,7 @@ TEST_F(ConsumerImplTests, GetLastAcknowledgeReturnsNoData) { TEST_F(ConsumerImplTests, GetByIdErrorsForId0) { - auto err = consumer->GetById(0, &info, nullptr); + auto err = consumer->GetById(0, &info, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -1339,7 +1252,7 @@ TEST_F(ConsumerImplTests, GetByIdErrorsForId0) { TEST_F(ConsumerImplTests, ResendNacks) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/stream/" + expected_group_id + "/next?token=" + expected_token + "&resend_nacks=true&delay_ms=10000&resend_attempts=3", _, _)).WillOnce(DoAll( @@ -1348,7 +1261,7 @@ TEST_F(ConsumerImplTests, ResendNacks) { Return(""))); consumer->SetResendNacs(true, 10000, 3); - consumer->GetNext(&info, expected_group_id, nullptr); + consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); } TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { @@ -1379,7 +1292,7 @@ TEST_F(ConsumerImplTests, CanInterruptOperation) { asapo::Error err; auto exec = [this,&err]() { consumer->SetTimeout(10000); - err = consumer->GetNext(&info, "", nullptr); + err = consumer->GetNext(&info, "", expected_stream, nullptr); }; auto thread = std::thread(exec); std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp index 4d40d8bd5..ca310858c 100644 --- a/examples/consumer/getnext/getnext.cpp +++ b/examples/consumer/getnext/getnext.cpp @@ -137,7 +137,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err bool isFirstFile = true; while (true) { if (params.datasets) { - auto dataset = consumer->GetNextDataset(group_id, 0, &err); + auto dataset = consumer->GetNextDataset(group_id,"default", 0, &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; @@ -145,7 +145,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err } } } else { - err = consumer->GetNext(&fi, group_id, params.read_data ? &data : nullptr); + err = consumer->GetNext(&fi, group_id,"default", params.read_data ? &data : nullptr); if (isFirstFile) { isFirstFile = false; timer->count_down_and_wait(); diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 87becf1a9..27c450095 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -27,16 +27,16 @@ system_clock::time_point streamout_start; system_clock::time_point streamout_finish; struct Args { - std::string server; - std::string file_path; - std::string beamtime_id; - std::string stream_in; - std::string stream_out; - std::string token; - int timeout_ms; - int timeout_ms_producer; - int nthreads; - bool transfer_data; + std::string server; + std::string file_path; + std::string beamtime_id; + std::string stream_in; + std::string stream_out; + std::string token; + int timeout_ms; + int timeout_ms_producer; + int nthreads; + bool transfer_data; }; void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { @@ -51,24 +51,24 @@ void ProcessAfterSend(asapo::RequestCallbackPayload payload, asapo::Error err) { } - void WaitConsumerThreadsFinished(std::vector<std::thread>* threads) { - for (auto& thread : *threads) { + for (auto &thread : *threads) { thread.join(); } } -int ProcessError(const Error& err) { +int ProcessError(const Error &err) { if (err == nullptr) return 0; std::cout << err->Explain() << std::endl; return err == asapo::ConsumerErrorTemplates::kEndOfStream ? 0 : 1; } -ConsumerPtr CreateConsumerAndGroup(const Args& args, Error* err) { +ConsumerPtr CreateConsumerAndGroup(const Args &args, Error* err) { auto consumer = asapo::ConsumerFactory::CreateConsumer(args.server, args.file_path, true, - asapo::SourceCredentials{asapo::SourceType::kProcessed, - args.beamtime_id, "", args.stream_in, - args.token}, err); + asapo::SourceCredentials{asapo::SourceType::kProcessed, + args.beamtime_id, "", + args.stream_in, + args.token}, err); if (*err) { return nullptr; } @@ -88,7 +88,7 @@ ConsumerPtr CreateConsumerAndGroup(const Args& args, Error* err) { return consumer; } -void GetBeamtimeMeta(const ConsumerPtr& consumer) { +void GetBeamtimeMeta(const ConsumerPtr &consumer) { Error err; auto meta = consumer->GetBeamtimeMeta(&err); if (err == nullptr) { @@ -98,17 +98,17 @@ void GetBeamtimeMeta(const ConsumerPtr& consumer) { } } -void SendDownstreamThePipeline(const Args& args, const asapo::MessageMeta& fi, asapo::MessageData data, - const ProducerPtr& producer) { +void SendDownstreamThePipeline(const Args &args, const asapo::MessageMeta &fi, asapo::MessageData data, + const ProducerPtr &producer) { asapo::MessageHeader header{fi.id, fi.size, fi.name, fi.metadata}; Error err_send; if (args.transfer_data) { header.file_name += "_" + args.stream_out; - err_send = producer->Send(header, std::move(data), asapo::kDefaultIngestMode, ProcessAfterSend); + err_send = producer->Send(header, "default", std::move(data), asapo::kDefaultIngestMode, ProcessAfterSend); } else { header.file_name = args.file_path + asapo::kPathSeparator + header.file_name; err_send = - producer->Send(header, nullptr, asapo::IngestModeFlags::kTransferMetaDataOnly, ProcessAfterSend); + producer->Send(header, "default", nullptr, asapo::IngestModeFlags::kTransferMetaDataOnly, ProcessAfterSend); std::cout << err_send << std::endl; } @@ -124,11 +124,11 @@ void SendDownstreamThePipeline(const Args& args, const asapo::MessageMeta& fi, a } } -Error ProcessNextEvent(const Args& args, const ConsumerPtr& consumer, const ProducerPtr& producer) { +Error ProcessNextEvent(const Args &args, const ConsumerPtr &consumer, const ProducerPtr &producer) { asapo::MessageData data; asapo::MessageMeta fi; - auto err = consumer->GetNext(&fi, group_id, args.transfer_data ? &data : nullptr); + auto err = consumer->GetNext(&fi, group_id, "default", args.transfer_data ? &data : nullptr); if (err) { return err; } @@ -138,28 +138,29 @@ Error ProcessNextEvent(const Args& args, const ConsumerPtr& consumer, const Prod return nullptr; } -std::vector<std::thread> StartConsumerThreads(const Args& args, const ProducerPtr& producer, +std::vector<std::thread> StartConsumerThreads(const Args &args, const ProducerPtr &producer, std::vector<int>* nfiles, std::vector<int>* errors) { - auto exec_next = [&args, nfiles, errors, &producer ](int i) { - asapo::MessageMeta fi; - Error err; - auto consumer = CreateConsumerAndGroup(args, &err); - if (err) { - (*errors)[i] += ProcessError(err); - return; - } - - while (true) { - auto err = ProcessNextEvent(args, consumer, producer); - if (err) { - (*errors)[i] += ProcessError(err); - if (err == asapo::ConsumerErrorTemplates::kEndOfStream || err == asapo::ConsumerErrorTemplates::kWrongInput) { - break; - } - } - (*nfiles)[i]++; - } + auto exec_next = [&args, nfiles, errors, &producer](int i) { + asapo::MessageMeta fi; + Error err; + auto consumer = CreateConsumerAndGroup(args, &err); + if (err) { + (*errors)[i] += ProcessError(err); + return; + } + + while (true) { + auto err = ProcessNextEvent(args, consumer, producer); + if (err) { + (*errors)[i] += ProcessError(err); + if (err == asapo::ConsumerErrorTemplates::kEndOfStream + || err == asapo::ConsumerErrorTemplates::kWrongInput) { + break; + } + } + (*nfiles)[i]++; + } }; std::vector<std::thread> threads; @@ -169,7 +170,7 @@ std::vector<std::thread> StartConsumerThreads(const Args& args, const ProducerPt return threads; } -int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* duration_ms, int* nerrors) { +int ProcessAllData(const Args &args, const ProducerPtr &producer, uint64_t* duration_ms, int* nerrors) { asapo::MessageMeta fi; system_clock::time_point t1 = system_clock::now(); @@ -188,12 +189,13 @@ int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* dura return n_total; } -std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { +std::unique_ptr<asapo::Producer> CreateProducer(const Args &args) { asapo::Error err; auto producer = asapo::Producer::Create(args.server, args.nthreads, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,args.beamtime_id, "", args.stream_out, args.token }, 60000, &err); - if(err) { + asapo::SourceCredentials{asapo::SourceType::kProcessed, args.beamtime_id, + "", args.stream_out, args.token}, 60000, &err); + if (err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); } @@ -208,7 +210,7 @@ int main(int argc, char* argv[]) { Args args; if (argc != 11) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <timeout ms producer> <transfer data>" + + " <server> <files_path> <beamtime_id> <stream_in> <stream_out> <nthreads> <token> <timeout ms> <timeout ms producer> <transfer data>" << std::endl; exit(EXIT_FAILURE); @@ -249,9 +251,7 @@ int main(int argc, char* argv[]) { std::cout << " Elapsed : " << duration_streamout.count() << "ms" << std::endl; std::cout << " Rate : " << 1000.0f * files_sent / (duration_streamout.count()) << std::endl; - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - return (nerrors == 0) && (files_sent == nfiles) ? 0 : 1; } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index a23c2ed08..b05ecc0a9 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -154,7 +154,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it message_header.file_name = message_folder+message_header.file_name; message_header.user_metadata = std::move(meta); if (messages_in_set == 1) { - auto err = producer->Send(message_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode : + auto err = producer->Send(message_header,"default", std::move(buffer), write_files ? asapo::kDefaultIngestMode : asapo::kTransferData, &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; @@ -173,7 +173,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it message_header.file_name = message_folder + message_header.file_name; message_header.user_metadata = meta; auto err = - producer->Send(message_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode : + producer->Send(message_header, "default", std::move(buffer), write_files ? asapo::kDefaultIngestMode : asapo::kTransferData, &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 66b0a5b15..3b6733565 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -25,12 +25,11 @@ class Producer { //! Get stream information from receiver /*! - \param stream (optional) - stream + \param stream - stream to send messages to \param timeout_ms - operation timeout in milliseconds \return StreamInfo - a structure with stream information */ virtual StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const = 0; - virtual StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const = 0; //! Get stream that has the newest ingested data /*! @@ -39,22 +38,6 @@ class Producer { */ virtual StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const = 0; - - //! Sends message to the receiver - /*! - \param message_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)). - \param data - A smart pointer to the message data to send - \return Error - Will be nullptr on success - */ - virtual Error Send(const MessageHeader& message_header, MessageData data, uint64_t ingest_mode, - RequestCallback callback) = 0; - - - //! Sends message to the receiver - same as Send - memory should not be freed until send is finished - //! used e.g. for Python bindings - virtual Error Send__(const MessageHeader& message_header, void* data, uint64_t ingest_mode, - RequestCallback callback) = 0; - //! Sends message to the receiver /*! \param message_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)). @@ -74,15 +57,6 @@ class Producer { //! used e.g. for Python bindings virtual void StopThreads__() = 0; - //! Sends message from a file to the default stream - /*! - \param message_header - A stucture with the meta information (file name, size is ignored). - \param full_path - A full path of the file to send - \return Error - Will be nullptr on success - */ - virtual Error SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, - RequestCallback callback) = 0; - //! Sends message from a file to a stream /*! \param message_header - A stucture with the meta information (file name, size is ignored). diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index 6fc10904f..fcb7db06c 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -71,7 +71,12 @@ Error CheckIngestMode(uint64_t ingest_mode) { return nullptr; } -Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_mode) { +Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_mode, const std::string& stream) { + + if (stream.empty()) { + return ProducerErrorTemplates::kWrongInput.Generate("stream empty"); + } + if (message_header.file_name.size() > kMaxMessageSize) { return ProducerErrorTemplates::kWrongInput.Generate("too long filename"); } @@ -98,7 +103,7 @@ Error ProducerImpl::Send(const MessageHeader& message_header, uint64_t ingest_mode, RequestCallback callback, bool manage_data_memory) { - auto err = CheckProducerRequest(message_header, ingest_mode); + auto err = CheckProducerRequest(message_header, ingest_mode, stream); if (err) { if (!manage_data_memory) { data.release(); @@ -131,11 +136,6 @@ Error CheckData(uint64_t ingest_mode, const MessageHeader& message_header, const return nullptr; } -Error ProducerImpl::Send(const MessageHeader& message_header, MessageData data, - uint64_t ingest_mode, RequestCallback callback) { - return Send(message_header, kDefaultStream, std::move(data), ingest_mode, callback); -} - Error ProducerImpl::Send(const MessageHeader& message_header, std::string stream, MessageData data, @@ -161,12 +161,6 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id, return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true); } -Error ProducerImpl::SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode, - RequestCallback callback) { - return SendFile(message_header, kDefaultStream, std::move(full_path), ingest_mode, callback); -} - - void ProducerImpl::SetLogLevel(LogLevel level) { log__->SetLogLevel(level); } @@ -240,13 +234,6 @@ Error ProducerImpl::Send__(const MessageHeader& message_header, return Send(std::move(message_header), std::move(stream), std::move(data_wrapped), "", ingest_mode, callback, false); } -Error ProducerImpl::Send__(const MessageHeader& message_header, - void* data, - uint64_t ingest_mode, - RequestCallback callback) { - return Send__(message_header, kDefaultStream, data, ingest_mode, callback); -} - uint64_t ProducerImpl::GetRequestsQueueSize() { return request_pool__->NRequestsInPool(); }; @@ -348,13 +335,13 @@ StreamInfo ProducerImpl::StreamRequest(StreamRequestOp op,std::string stream, ui } StreamInfo ProducerImpl::GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const { + if (stream.empty()) { + *err = ProducerErrorTemplates::kWrongInput.Generate("stream empty"); + return {}; + } return StreamRequest(StreamRequestOp::kStreamInfo,stream,timeout_ms,err); } -StreamInfo ProducerImpl::GetStreamInfo(uint64_t timeout_ms, Error* err) const { - return GetStreamInfo(kDefaultStream, timeout_ms, err); -} - StreamInfo ProducerImpl::GetLastStream(uint64_t timeout_ms, Error* err) const { return StreamRequest(StreamRequestOp::kLastStream,"",timeout_ms,err); } diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 3e04b58e2..5e075186c 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -33,28 +33,18 @@ class ProducerImpl : public Producer { ProducerImpl &operator=(const ProducerImpl &) = delete; StreamInfo GetStreamInfo(std::string stream, uint64_t timeout_ms, Error* err) const override; - StreamInfo GetStreamInfo(uint64_t timeout_ms, Error* err) const override; StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) const override; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(const MessageHeader &message_header, - MessageData data, - uint64_t ingest_mode, - RequestCallback callback) override; - Error Send__(const MessageHeader &message_header, void* data, uint64_t ingest_mode, - RequestCallback callback) override; Error Send(const MessageHeader &message_header, std::string stream, MessageData data, uint64_t ingest_mode, RequestCallback callback) override; Error Send__(const MessageHeader &message_header, std::string stream, void* data, uint64_t ingest_mode, RequestCallback callback) override; void StopThreads__() override; - Error SendFile(const MessageHeader &message_header, std::string full_path, uint64_t ingest_mode, - RequestCallback callback) override; Error SendFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode, RequestCallback callback) override; - Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) override; diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 5fd627a86..950363189 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -63,7 +63,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::MessageHeader message_header{1, 1, "test"}; - auto err_send = producer->Send(message_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err_send = producer->Send(message_header,"stream", nullptr, asapo::kTransferMetaDataOnly, nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 089dae54b..595b2a833 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -100,108 +100,73 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::MessageHeader message_header{1, 1, "test"}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header,"default", nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, long_string}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header,"default", nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } +TEST_F(ProducerImplTests, ErrorIfStreamEmpty) { + asapo::MessageHeader message_header{1, 100, expected_fullpath}; + auto err = producer.Send(message_header,"", nullptr, expected_ingest_mode, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); +} + + TEST_F(ProducerImplTests, ErrorIfFileEmpty) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, ""}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, "default", nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfDatasetSizeNotDefined) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("dataset dimensions"))); asapo::MessageHeader message_header{1, 1000, "test", "", 1}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, "default", nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; asapo::MessageHeader message_header{1, 0, expected_fullpath}; - auto err = producer.Send(message_header, std::move(data), asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send(message_header, "default", std::move(data), asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoData) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send(message_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send(message_header, "default", nullptr, asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send__(message_header, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send__(message_header, expected_stream, nullptr, asapo::kDefaultIngestMode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfSendingDataWithZeroId) { asapo::MessageHeader message_header{0, 100, expected_fullpath}; - auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err = producer.Send(message_header, "default", nullptr, asapo::kTransferMetaDataOnly, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, OkIfNoDataWithTransferMetadataOnlyMode) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err = producer.Send(message_header, "default", nullptr, asapo::kTransferMetaDataOnly, nullptr); ASSERT_THAT(err, Eq(nullptr)); } TEST_F(ProducerImplTests, OkIfZeroSizeWithTransferMetadataOnlyMode) { asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; asapo::MessageHeader message_header{1, 0, expected_fullpath}; - auto err = producer.Send(message_header, std::move(data), asapo::kTransferMetaDataOnly, nullptr); - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(ProducerImplTests, UsesDefaultStream) { - producer.SetCredentials(expected_default_credentials); - - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferData, - expected_default_credentials_str, - expected_metadata, - expected_id, - expected_size, - expected_name, - asapo::kDefaultStream.c_str(), - expected_ingest_mode, - 0, - 0), false)).WillOnce(Return(nullptr)); - - asapo::MessageHeader message_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); - - ASSERT_THAT(err, Eq(nullptr)); -} - -TEST_F(ProducerImplTests, OKSendingSendRequest) { - producer.SetCredentials(expected_credentials); - - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - expected_metadata, - expected_id, - expected_size, - expected_name, - asapo::kDefaultStream.c_str(), - expected_ingest_mode, - 0, - 0 - ), false)).WillOnce(Return( - nullptr)); - - asapo::MessageHeader message_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); - + auto err = producer.Send(message_header, "default", std::move(data), asapo::kTransferMetaDataOnly, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -250,6 +215,13 @@ TEST_F(ProducerImplTests, OKSendingStreamFinish) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(ProducerImplTests, ErrorSendingStreamFinishWithemptyStream) { + producer.SetCredentials(expected_credentials); + auto err = producer.SendStreamFinishedFlag("", expected_id, expected_next_stream, nullptr); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); +} + TEST_F(ProducerImplTests, OKSendingStreamFinishWithNoNextStream) { producer.SetCredentials(expected_credentials); @@ -283,7 +255,7 @@ TEST_F(ProducerImplTests, OKSendingSendDatasetDataRequest) { expected_id, expected_size, expected_name, - asapo::kDefaultStream.c_str(), + expected_stream, expected_ingest_mode, expected_dataset_id, expected_dataset_size), false)).WillOnce( @@ -292,7 +264,7 @@ TEST_F(ProducerImplTests, OKSendingSendDatasetDataRequest) { asapo::MessageHeader message_header {expected_id, expected_size, expected_name, expected_metadata, expected_dataset_id, expected_dataset_size}; - auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, expected_stream, nullptr, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -327,7 +299,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(message_header, "", expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_stream, "", expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -339,31 +311,22 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, ""}; - auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } -TEST_F(ProducerImplTests, OKSendingSendFileRequest) { +TEST_F(ProducerImplTests, ErrorSendingFileToEmptyStream) { producer.SetCredentials(expected_credentials); - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferData, - expected_credentials_str, - "", - expected_id, - 0, - expected_name, - asapo::kDefaultStream.c_str(), - expected_ingest_mode, - 0, - 0), false)).WillOnce(Return( - nullptr)); + EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, "", expected_fullpath, expected_ingest_mode, nullptr); + + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); - ASSERT_THAT(err, Eq(nullptr)); } TEST_F(ProducerImplTests, OKSendingSendFileRequestWithStream) { @@ -421,7 +384,7 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); for (auto ingest_mode : ingest_modes) { - auto err = producer.SendFile(message_header, expected_fullpath, ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_stream, expected_fullpath, ingest_mode, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -481,10 +444,17 @@ TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); } +TEST_F(ProducerImplTests, GetStreamInfoErrorOnEmptyStream) { + producer.SetCredentials(expected_credentials); + asapo::Error err; + producer.GetStreamInfo("", 1000, &err); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); +} + TEST(GetStreamInfoTest, GetStreamInfoTimeout) { asapo::ProducerImpl producer1{"", 1, 10000, asapo::RequestHandlerType::kTcp}; asapo::Error err; - auto sinfo = producer1.GetStreamInfo(5000, &err); + auto sinfo = producer1.GetStreamInfo("stream", 5000, &err); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout)); ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4")); diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 3450ded0b..4b188649f 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -137,7 +137,7 @@ int main (int argc, char* argv[]) { } message_header.message_id = ++i; HandleDatasets(&message_header); - producer->SendFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + + producer->SendFile(message_header,"default", GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + message_header.file_name, asapo::kDefaultIngestMode, ProcessAfterSend); } diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 0cd7c3e4d..15e664b04 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -31,7 +31,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str asapo::MessageMeta fi; asapo::Error err; - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id, "default", nullptr); if (err) { std::cout << err->Explain() << std::endl; } @@ -45,76 +45,76 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertEq("hello1", std::string(data.get(), data.get() + fi.size)); - err = consumer->GetLast(&fi, nullptr); + err = consumer->GetLast(&fi,"default", nullptr); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(fi.name == "10", "GetLast filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetLast metadata"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNext2 no error"); M_AssertTrue(fi.name == "2", "GetNext2 filename"); - err = consumer->SetLastReadMarker(group_id, 2); + err = consumer->SetLastReadMarker(group_id, 2,"default"); M_AssertTrue(err == nullptr, "SetLastReadMarker no error"); - err = consumer->GetById(8, &fi, nullptr); + err = consumer->GetById(8, &fi,"default", nullptr); M_AssertTrue(err == nullptr, "GetById error"); M_AssertTrue(fi.name == "8", "GetById filename"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNext After GetById no error"); M_AssertTrue(fi.name == "3", "GetNext After GetById filename"); - err = consumer->GetLast(&fi, nullptr); + err = consumer->GetLast(&fi,"default", nullptr); M_AssertTrue(err == nullptr, "GetLast2 no error"); - err = consumer->SetLastReadMarker(group_id, 8); + err = consumer->SetLastReadMarker(group_id, 8,"default"); M_AssertTrue(err == nullptr, "SetLastReadMarker 2 no error"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNext3 no error"); M_AssertTrue(fi.name == "9", "GetNext3 filename"); - auto size = consumer->GetCurrentSize(&err); + auto size = consumer->GetCurrentSize("default", &err); M_AssertTrue(err == nullptr, "GetCurrentSize no error"); M_AssertTrue(size == 10, "GetCurrentSize size"); - err = consumer->ResetLastReadMarker(group_id); + err = consumer->ResetLastReadMarker(group_id,"default"); M_AssertTrue(err == nullptr, "SetLastReadMarker"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNext4 no error"); M_AssertTrue(fi.name == "1", "GetNext4 filename"); auto group_id2 = consumer->GenerateNewGroupId(&err); - err = consumer->GetNext(&fi, group_id2, nullptr); + err = consumer->GetNext(&fi, group_id2,"default", nullptr); M_AssertTrue(err == nullptr, "GetNext5 no error"); M_AssertTrue(fi.name == "1", "GetNext5 filename"); - auto messages = consumer->QueryMessages("meta.test = 10", &err); + auto messages = consumer->QueryMessages("meta.test = 10","default", &err); M_AssertTrue(err == nullptr, "query1"); M_AssertTrue(messages.size() == 10, "size of query answer 1"); - messages = consumer->QueryMessages("meta.test = 10 AND name='1'", &err); + messages = consumer->QueryMessages("meta.test = 10 AND name='1'","default", &err); M_AssertTrue(err == nullptr, "query2"); M_AssertTrue(messages.size() == 1, "size of query answer 2"); M_AssertTrue(fi.name == "1", "GetNext5 filename"); - messages = consumer->QueryMessages("meta.test = 11", &err); + messages = consumer->QueryMessages("meta.test = 11","default", &err); M_AssertTrue(err == nullptr, "query3"); M_AssertTrue(messages.size() == 0, "size of query answer 3"); - messages = consumer->QueryMessages("meta.test = 18", &err); + messages = consumer->QueryMessages("meta.test = 18","default", &err); M_AssertTrue(err == nullptr, "query4"); M_AssertTrue(messages.size() == 0, "size of query answer 4"); - messages = consumer->QueryMessages("bla", &err); + messages = consumer->QueryMessages("bla","default", &err); M_AssertTrue(err != nullptr, "query5"); M_AssertTrue(messages.size() == 0, "size of query answer 5"); @@ -148,21 +148,21 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(asapo::NanosecsEpochFromTimePoint(streams[2].timestamp_created) == 2000, "streams2.timestamp"); // acknowledges - auto id = consumer->GetLastAcknowledgedMessage(group_id, &err); + auto id = consumer->GetLastAcknowledgedMessage(group_id,"default", &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kNoData, "last ack default stream no data"); M_AssertTrue(id == 0, "last ack default stream no data id = 0"); - auto nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, &err); + auto nacks = consumer->GetUnacknowledgedMessages(group_id,"default", 0, 0, &err); M_AssertTrue(err == nullptr, "nacks default stream all"); M_AssertTrue(nacks.size() == 10, "nacks default stream size = 10"); - err = consumer->Acknowledge(group_id, 1); + err = consumer->Acknowledge(group_id, 1,"default"); M_AssertTrue(err == nullptr, "ack default stream no error"); - nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, &err); + nacks = consumer->GetUnacknowledgedMessages(group_id,"default", 0, 0, &err); M_AssertTrue(nacks.size() == 9, "nacks default stream size = 9 after ack"); - id = consumer->GetLastAcknowledgedMessage(group_id, &err); + id = consumer->GetLastAcknowledgedMessage(group_id,"default", &err); M_AssertTrue(err == nullptr, "last ack default stream no error"); M_AssertTrue(id == 1, "last ack default stream id = 1"); @@ -173,29 +173,29 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); // negative acks - consumer->ResetLastReadMarker(group_id); - err = consumer->GetNext(&fi, group_id, nullptr); + consumer->ResetLastReadMarker(group_id,"default"); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNextNegAckBeforeResend no error"); M_AssertTrue(fi.name == "1", "GetNextNegAckBeforeResend filename"); - err = consumer->NegativeAcknowledge(group_id, 1, 0); + err = consumer->NegativeAcknowledge(group_id, 1, 0,"default"); M_AssertTrue(err == nullptr, "NegativeAcknowledge no error"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNextNegAckWithResend no error"); M_AssertTrue(fi.name == "1", "GetNextNegAckWithResend filename"); // automatic resend - consumer->ResetLastReadMarker(group_id); + consumer->ResetLastReadMarker(group_id,"default"); consumer->SetResendNacs(true, 0, 1); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNextBeforeResend no error"); M_AssertTrue(fi.name == "1", "GetNextBeforeResend filename"); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNextWithResend no error"); M_AssertTrue(fi.name == "1", "GetNextWithResend filename"); consumer->SetResendNacs(false, 0, 1); - err = consumer->GetNext(&fi, group_id, nullptr); + err = consumer->GetNext(&fi, group_id,"default", nullptr); M_AssertTrue(err == nullptr, "GetNextAfterResend no error"); M_AssertTrue(fi.name == "2", "GetNextAfterResend filename"); @@ -206,7 +206,7 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st asapo::MessageMeta fi; asapo::Error err; - auto dataset = consumer->GetNextDataset(group_id, 0, &err); + auto dataset = consumer->GetNextDataset(group_id,"default", 0, &err); if (err) { std::cout << err->Explain() << std::endl; } @@ -222,19 +222,19 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size)); - dataset = consumer->GetLastDataset(0, &err); + dataset = consumer->GetLastDataset("default", 0, &err); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); - dataset = consumer->GetNextDataset(group_id, 0, &err); + dataset = consumer->GetNextDataset(group_id, "default", 0, &err); M_AssertTrue(err == nullptr, "GetNextDataset2 no error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetNextDataSet2 filename"); - dataset = consumer->GetLastDataset(0, &err); + dataset = consumer->GetLastDataset("default", 0, &err); M_AssertTrue(err == nullptr, "GetLastDataset2 no error"); - dataset = consumer->GetDatasetById(8, 0, &err); + dataset = consumer->GetDatasetById(8,"default", 0, &err); M_AssertTrue(err == nullptr, "GetDatasetById error"); M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename"); diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index bc89b9660..96a753089 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -69,7 +69,7 @@ void TestAll(const Args& args) { std::vector<asapo::MessageMetas>message_metas(args.nthreads); auto exec_next = [&](int i) { asapo::MessageMeta fi; - while ((err = consumer->GetNext(&fi, group_id, nullptr)) == nullptr) { + while ((err = consumer->GetNext(&fi, group_id,"default", nullptr)) == nullptr) { message_metas[i].emplace_back(fi); } printf("%s\n", err->Explain().c_str()); diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index a3ee8ba05..db1f1ae9c 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -83,7 +83,7 @@ std::vector<std::thread> StartThreads(const Args& params, while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < params.timeout_ms) { if (params.datasets) { - auto dataset = consumer->GetLastDataset(0, &err); + auto dataset = consumer->GetLastDataset("default", 0, &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; @@ -91,7 +91,7 @@ std::vector<std::thread> StartThreads(const Args& params, } } } else { - err = consumer->GetLast(&fi, params.read_data ? &data : nullptr); + err = consumer->GetLast(&fi,"default", params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { -- GitLab