diff --git a/CHANGELOG.md b/CHANGELOG.md index 16b8d7c4b50e4d1e58795854c69dd7b7f00b3907..1a5ec00419269b0849b8be9e0285775a73271113 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,11 @@ -## 20.12.0 (in progress) +## 20.12.0 FEATURES * implemented possibility to send data without writing to database (no need of consecutive indexes, etc. but will not be able to consume such data) * allow to return incomplete datasets (wihout error if one sets minimum dataset size, otherwise with "partial data" error) IMPROVEMENTS -* Consumer API - change behavior of GetLast/get_last - do not set current pointer after call to the last image +* Consumer API - change behavior of GetLast/get_last - do not change current pointer after call * Consumer API - add interrupt_current_operation to allow interrupting (from a separate thread) long consumer operation * Producer API - return original data in callback payload. * Producer API - allow to set queue limits (number of pending requests and/or max memory), reject new requests if reached the limits @@ -14,14 +14,15 @@ FEATURES BREAKING CHANGES * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before * Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions -* Producer API - changed meaning of subsets (subset_id replaced with id_in_subset and this means now id of the image within a subset (e.g. module number for multi-module detector)), message_id is now a global id of a multi-set data (i.g. multi-image id) +* Producer API - changed meaning of subsets (subset_id replaced with dataset_substream and this means now id of the image within a subset (e.g. module number for multi-module detector)), message_id is now a global id of a multi-set data (i.g. multi-image id) #### renaming - general * stream -> data_source, substream -> stream * use millisecond everywhere for timeout/delay * use term `message` for blob of information we send around, rename related structs, parameters, ... -* C++ - get rid of duplicate functions with default stream +* C++ - get rid of duplicate functions with default stream #### renaming - Producer API * SendData/send_data -> Send/send +* SendXX/send_xx -> swap parameters (stream to the end) * id_in_subset -> dataset_substream * subset_size -> dataset_size (and in general replace subset with dataset) #### renaming - Consumer API @@ -29,7 +30,7 @@ BREAKING CHANGES * SetLastReadMarker/set_lastread_marker -> swap arguments * GetUnacknowledgedTupleIds/get_unacknowledged_tuple_ids -> GetUnacknowledgedMessages/get_unacknowledged_messages * GetLastAcknowledgedTulpeId/get_last_acknowledged_tuple_id -> GetLastAcknowledgedMessage/get_last_acknowledged_message - +* GetUnacknowledgedMessages, -> swap parameters (stream to the end) BUG FIXES * fix memory leak bug in Python consumer library (lead to problems when creating many consumer instances) diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index f7523180bf7de6547a4974296f3bf73347074cfe..678c7179b73a53cdace52021d4e2488a951e52a4 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -53,7 +53,10 @@ class Consumer { \param stream - stream to use \return nullptr if operation succeed, error otherwise. */ - virtual IdList GetUnacknowledgedMessages(std::string group_id, std::string stream, uint64_t from_id, uint64_t to_id, + virtual IdList GetUnacknowledgedMessages(std::string group_id, + uint64_t from_id, + uint64_t to_id, + std::string stream, Error* error) = 0; //! Set timeout for consumer operations. Default - no timeout @@ -99,11 +102,11 @@ class Consumer { /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. \param group_id - group id to use - \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. + \param stream - stream to use \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) = 0; + virtual Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; //! Retrieves message using message meta. /*! @@ -118,40 +121,40 @@ class Consumer { /*! \param err - will be set to error data cannot be read, nullptr otherwise. \param group_id - group id to use. - \param stream - stream to use \param min_size - wait until dataset has min_size messages (0 for maximum size) + \param stream - stream to use \return DataSet - information about the dataset */ - virtual DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) = 0; + virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) = 0; //! Receive last available dataset which has min_size messages. /*! \param err - will be set to error data cannot be read, nullptr otherwise. - \param stream - stream to use \param min_size - amount of messages in dataset (0 for maximum size) + \param stream - stream to use \return DataSet - information about the dataset */ - virtual DataSet GetLastDataset(std::string stream, uint64_t min_size, Error* err) = 0; + virtual DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) = 0; //! Receive dataset by id. /*! \param id - dataset id \param err - will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise. - \param stream - stream to use \param min_size - wait until dataset has min_size messages (0 for maximum size) + \param stream - stream to use \return DataSet - information about the dataset */ - virtual DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) = 0; + virtual DataSet GetDatasetById(uint64_t id, uint64_t min_size, std::string stream, Error* err) = 0; //! Receive single message by id. /*! \param id - message id \param info - where to store message metadata. Can be set to nullptr only message data is needed. - \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. + \param stream - stream to use \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) = 0; + virtual Error GetById(uint64_t id, MessageMeta* info, MessageData* data, std::string stream) = 0; //! Receive id of last acknowledged message /*! @@ -165,11 +168,11 @@ class Consumer { //! Receive last available message. /*! \param info - where to store message metadata. Can be set to nullptr only message data is needed. - \param stream - stream to use \param data - where to store message data. Can be set to nullptr only message metadata is needed. + \param stream - stream to use \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetLast(MessageMeta* info, std::string stream, MessageData* data) = 0; + virtual Error GetLast(MessageMeta* info, MessageData* data, std::string stream) = 0; //! Get all messages matching the query. /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 45c45076b294c787f782bcd26a4bd240fe4b82ee..7a94ecf6e028e1f0444eb5d18e298cd3421ad62b 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -299,7 +299,7 @@ Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group return nullptr; } -Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) { +Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetNext, 0, std::move(group_id), @@ -308,7 +308,7 @@ Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, std::string data); } -Error ConsumerImpl::GetLast(MessageMeta* info, std::string stream, MessageData* data) { +Error ConsumerImpl::GetLast(MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetLast, 0, "0", @@ -545,7 +545,7 @@ uint64_t ConsumerImpl::GetCurrentSize(std::string stream, Error* err) { return size; } -Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) { +Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, MessageData* data, std::string stream) { if (id == 0) { return ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); } @@ -612,11 +612,11 @@ MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, return dataset.content; } -DataSet ConsumerImpl::GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) { +DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) { return GetDatasetFromServer(GetMessageServerOperation::GetNext, 0, std::move(group_id), std::move(stream),min_size, err); } -DataSet ConsumerImpl::GetLastDataset(std::string stream, uint64_t min_size, Error* err) { +DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, std::string stream, Error* err) { return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream),min_size, err); } @@ -638,7 +638,7 @@ DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op, return DecodeDatasetFromResponse(response, err); } -DataSet ConsumerImpl::GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) { +DataSet ConsumerImpl::GetDatasetById(uint64_t id, uint64_t min_size, std::string stream, Error* err) { if (id == 0) { *err = ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); return {}; @@ -755,9 +755,9 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s } IdList ConsumerImpl::GetUnacknowledgedMessages(std::string group_id, - std::string stream, uint64_t from_id, uint64_t to_id, + std::string stream, Error* error) { if (stream.empty()) { *error = ConsumerErrorTemplates::kWrongInput.Generate("empty stream"); diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 166039868019d39859902725ec8e5e19d4a2bdca..0697b5f96067597f58fa6bb43ff6461c8b01d109 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -61,9 +61,9 @@ class ConsumerImpl final : public asapo::Consumer { std::string stream) override; IdList GetUnacknowledgedMessages(std::string group_id, - std::string stream, uint64_t from_id, uint64_t to_id, + std::string stream, Error* error) override; uint64_t GetLastAcknowledgedMessage(std::string group_id, std::string stream, Error* error) override; @@ -72,16 +72,16 @@ class ConsumerImpl final : public asapo::Consumer { Error SetLastReadMarker(std::string group_id, uint64_t value, std::string stream) override; - Error GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) override; + Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; - Error GetLast(MessageMeta* info, std::string stream, MessageData* data) override; + Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override; std::string GenerateNewGroupId(Error* err) override; std::string GetBeamtimeMeta(Error* err) override; uint64_t GetCurrentSize(std::string stream, Error* err) override; - Error GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) override; + Error GetById(uint64_t id, MessageMeta* info, MessageData* data, std::string stream) override; void SetTimeout(uint64_t timeout_ms) override; @@ -91,11 +91,11 @@ class ConsumerImpl final : public asapo::Consumer { MessageMetas QueryMessages(std::string query, std::string stream, Error* err) override; - DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) override; + DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override; - DataSet GetLastDataset(std::string stream, uint64_t min_size, Error* err) override; + DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) override; - DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) override; + DataSet GetDatasetById(uint64_t id, uint64_t min_size, std::string stream, Error* err) override; Error RetrieveData(MessageMeta* info, MessageData* data) override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index fad9678b9b490791b586a0392eaabb810d09709d..26020e46e30a5551f3bf1eb818fbadc8fd8eaaf0 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -178,7 +178,7 @@ class ConsumerImplTests : public Test { }; TEST_F(ConsumerImplTests, GetMessageReturnsErrorOnWrongInput) { - auto err = consumer->GetNext(nullptr, "", expected_stream, nullptr); + auto err = consumer->GetNext("", nullptr, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -209,7 +209,7 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { SetArgPointee<2>(nullptr), Return(""))); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { @@ -222,7 +222,7 @@ TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { @@ -235,7 +235,7 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - consumer->GetLast(&info, expected_stream, nullptr); + consumer->GetLast(&info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) { @@ -246,7 +246,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); @@ -265,7 +265,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsStreamFinishedFromHttpClient) { Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"" + expected_next_stream + "\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); @@ -283,7 +283,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataFromHttpClient) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_stream\":\"""\"}"))); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData()); ASSERT_THAT(err_data->id, Eq(1)); @@ -301,7 +301,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNotAuthorized) { SetArgPointee<2>(nullptr), Return(""))); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -315,7 +315,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsWrongResponseFromHttpClient) { SetArgPointee<2>(nullptr), Return("id"))); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("malformed")); @@ -329,7 +329,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerAddressNotFound) { Return(""))); consumer->SetTimeout(100); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable"))); } @@ -342,7 +342,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) { Return(""))); consumer->SetTimeout(100); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable"))); } @@ -352,13 +352,13 @@ TEST_F(ConsumerImplTests, GetDoNotCallBrokerUriIfAlreadyFound) { MockGet("error_response"); consumer->SetTimeout(100); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); Mock::VerifyAndClearExpectations(&mock_http_client); EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asap-broker"), _, _)).Times(0); MockGet("error_response"); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) { @@ -366,12 +366,12 @@ TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) { MockGetError(); consumer->SetTimeout(0); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); Mock::VerifyAndClearExpectations(&mock_http_client); MockGetBrokerUri(); MockGet("error_response"); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) { @@ -383,7 +383,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}"))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -406,7 +406,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOcc Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -420,7 +420,7 @@ TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnTransferError) { Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(err->Explain(), HasSubstr("sss")); @@ -443,7 +443,7 @@ TEST_F(ConsumerImplTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) Return(""))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService)); } @@ -457,7 +457,7 @@ TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnFinshedStream) { Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next\"}"))); consumer->SetTimeout(300); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished)); } @@ -470,7 +470,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsMessageMeta) { MockGet(json); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(nullptr)); @@ -484,7 +484,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsParseError) { MockGetBrokerUri(); MockGet("error_response"); - auto err = consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + auto err = consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); } @@ -496,7 +496,7 @@ TEST_F(ConsumerImplTests, GetMessageReturnsIfNoDataNeeded) { EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0); EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) { @@ -509,7 +509,7 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) { EXPECT_CALL(mock_netclient, GetData_t(&info, &data)).WillOnce(Return(nullptr)); MockReadDataFromFile(0); - consumer->GetNext(&info, expected_group_id, expected_stream, &data); + consumer->GetNext(expected_group_id, &info, &data, expected_stream); ASSERT_THAT(info.buf_id, Eq(expected_buf_id)); @@ -527,7 +527,7 @@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfCannotReadFromCache) { &data)).WillOnce(Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release())); MockReadDataFromFile(); - consumer->GetNext(&info, expected_group_id, expected_stream, &data); + consumer->GetNext(expected_group_id, &info, &data, expected_stream); ASSERT_THAT(info.buf_id, Eq(0)); } @@ -543,7 +543,7 @@ TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) { MockReadDataFromFile(); - consumer->GetNext(&info, expected_group_id, expected_stream, &data); + consumer->GetNext(expected_group_id, &info, &data, expected_stream); } TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) { @@ -670,7 +670,7 @@ TEST_F(ConsumerImplTests, GetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(json))); - auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(info.name, Eq(to_send.name)); @@ -687,7 +687,7 @@ TEST_F(ConsumerImplTests, GetByIdTimeouts) { SetArgPointee<2>(nullptr), Return(""))); - auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -703,7 +703,7 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStream) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"""\"}"))); - auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -719,7 +719,7 @@ TEST_F(ConsumerImplTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_stream\":\"""\"}"))); - auto err = consumer->GetById(expected_dataset_id, &info, expected_stream, nullptr); + auto err = consumer->GetById(expected_dataset_id, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -862,12 +862,12 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); + consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err); } TEST_F(ConsumerImplTests, GetNextErrorOnEmptyStream) { MessageData data; - auto err = consumer->GetNext(&info, expected_group_id, "", &data); + auto err = consumer->GetNext(expected_group_id, &info, &data, ""); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -893,7 +893,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) { MockGet(json); - auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); @@ -925,7 +925,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsPartialMessageMetas) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); @@ -960,7 +960,7 @@ TEST_F(ConsumerImplTests, GetDataSetByIdReturnsPartialMessageMetas) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = consumer->GetDatasetById(1, expected_stream, 0, &err); + auto dataset = consumer->GetDatasetById(1, 0, expected_stream, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData()); @@ -978,7 +978,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) { MockGet("error_response"); asapo::Error err; - auto dataset = consumer->GetNextDataset(expected_group_id, expected_stream, 0, &err); + auto dataset = consumer->GetNextDataset(expected_group_id, 0, expected_stream, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); ASSERT_THAT(dataset.content.size(), Eq(0)); @@ -997,7 +997,7 @@ TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - consumer->GetLastDataset(expected_stream, 1, &err); + consumer->GetLastDataset(1, expected_stream, &err); } TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { @@ -1011,7 +1011,7 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - consumer->GetDatasetById(expected_dataset_id, expected_stream, 0, &err); + consumer->GetDatasetById(expected_dataset_id, 0, expected_stream, &err); } TEST_F(ConsumerImplTests, GetStreamListUsesCorrectUri) { @@ -1117,7 +1117,7 @@ void ConsumerImplTests::AssertSingleFileTransfer() { MockGetFTSUri(); ExpectFileTransfer(nullptr); - fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); + fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream); ASSERT_THAT(data[0], Eq(expected_value)); Mock::VerifyAndClearExpectations(&mock_http_client); @@ -1163,7 +1163,7 @@ TEST_F(ConsumerImplTests, GetMessageReusesTokenAndUri) { MockBeforeFTS(&data); ExpectFileTransfer(nullptr); - auto err = fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); + auto err = fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream); } TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { @@ -1174,7 +1174,7 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { ExpectRepeatedFileTransfer(); ExpectFolderToken(); - auto err = fts_consumer->GetNext(&info, expected_group_id, expected_stream, &data); + auto err = fts_consumer->GetNext(expected_group_id, &info, &data, expected_stream); } TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { @@ -1207,7 +1207,7 @@ void ConsumerImplTests::ExpectIdList(bool error) { TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) { ExpectIdList(false); asapo::Error err; - auto list = consumer->GetUnacknowledgedMessages(expected_group_id, expected_stream, 1, 0, &err); + auto list = consumer->GetUnacknowledgedMessages(expected_group_id, 1, 0, expected_stream, &err); ASSERT_THAT(list, ElementsAre(1, 2, 3)); ASSERT_THAT(err, Eq(nullptr)); @@ -1244,7 +1244,7 @@ TEST_F(ConsumerImplTests, GetLastAcknowledgeReturnsNoData) { TEST_F(ConsumerImplTests, GetByIdErrorsForId0) { - auto err = consumer->GetById(0, &info, expected_stream, nullptr); + auto err = consumer->GetById(0, &info, nullptr, expected_stream); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -1261,7 +1261,7 @@ TEST_F(ConsumerImplTests, ResendNacks) { Return(""))); consumer->SetResendNacs(true, 10000, 3); - consumer->GetNext(&info, expected_group_id, expected_stream, nullptr); + consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { @@ -1292,7 +1292,7 @@ TEST_F(ConsumerImplTests, CanInterruptOperation) { asapo::Error err; auto exec = [this,&err]() { consumer->SetTimeout(10000); - err = consumer->GetNext(&info, "", expected_stream, nullptr); + err = consumer->GetNext("", &info, nullptr, expected_stream); }; auto thread = std::thread(exec); std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 0aac9634a3607fb73566ce23d70cc436935330d1..e309bd22dfdf54ff4270dc90896971e2a80fd1cb 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -62,22 +62,22 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void SetTimeout(uint64_t timeout_ms) void ForceNoRdma() NetworkConnectionType CurrentConnectionType() - Error GetNext(MessageMeta* info, string group_id,string stream, MessageData* data) - Error GetLast(MessageMeta* info, string stream, MessageData* data) - Error GetById(uint64_t id, MessageMeta* info, string stream, MessageData* data) + Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) + Error GetLast(MessageMeta* info, MessageData* data, string stream) + Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream) uint64_t GetCurrentSize(string stream, Error* err) Error SetLastReadMarker(string group_id, uint64_t value, string stream) Error ResetLastReadMarker(string group_id, string stream) Error Acknowledge(string group_id, uint64_t id, string stream) Error NegativeAcknowledge(string group_id, uint64_t id, uint64_t delay_ms, string stream) uint64_t GetLastAcknowledgedMessage(string group_id, string stream, Error* error) - IdList GetUnacknowledgedMessages(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) + IdList GetUnacknowledgedMessages(string group_id, uint64_t from_id, uint64_t to_id, string stream, Error* error) string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) MessageMetas QueryMessages(string query, string stream, Error* err) - DataSet GetNextDataset(string group_id, string stream, uint64_t min_size, Error* err) - DataSet GetLastDataset(string stream, uint64_t min_size, Error* err) - DataSet GetDatasetById(uint64_t id, string stream, uint64_t min_size, Error* err) + DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err) + DataSet GetLastDataset(uint64_t min_size, string stream, Error* err) + DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) vector[StreamInfo] GetStreamList(string from_stream, Error* err) void SetResendNacs(bool resend, uint64_t delay_ms, uint64_t resend_attempts) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index d3e0ea51de15ad46ced86b7737fb96753940418c..399b510ed75bf03734ad8435b9c93f11b33770ab 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -114,13 +114,13 @@ cdef class PyConsumer: cdef np.npy_intp dims[1] if op == "next": with nogil: - err = self.c_consumer.get().GetNext(&info, b_group_id,b_stream, p_data) + err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream) elif op == "last": with nogil: - err = self.c_consumer.get().GetLast(&info, b_stream, p_data) + err = self.c_consumer.get().GetLast(&info, p_data, b_stream) elif op == "id": with nogil: - err = self.c_consumer.get().GetById(id, &info, b_stream, p_data) + err = self.c_consumer.get().GetById(id, &info, p_data, b_stream) if err: throw_exception(err) info_str = _str(info.Json()) @@ -132,11 +132,11 @@ cdef class PyConsumer: arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) PyArray_ENABLEFLAGS(arr,np.NPY_OWNDATA) return arr,meta - def get_next(self, group_id, stream = "default", meta_only = True): + def get_next(self, group_id, meta_only = True, stream = "default"): return self._op("next",group_id,stream,meta_only,0) - def get_last(self, stream = "default", meta_only = True): + def get_last(self, meta_only = True, stream = "default"): return self._op("last","",stream,meta_only,0) - def get_by_id(self,uint64_t id,stream = "default",meta_only = True): + def get_by_id(self,uint64_t id,meta_only = True, stream = "default"): return self._op("id","",stream,meta_only,id) def retrieve_data(self,meta): json_str = json.dumps(meta) @@ -252,13 +252,13 @@ cdef class PyConsumer: throw_exception(err) return id - def get_unacknowledged_messages(self, group_id, stream = "default", uint64_t from_id = 0, uint64_t to_id = 0): + def get_unacknowledged_messages(self, group_id, uint64_t from_id = 0, uint64_t to_id = 0, stream = "default"): cdef Error err cdef string b_group_id = _bytes(group_id) cdef string b_stream = _bytes(stream) cdef IdList ids with nogil: - ids = self.c_consumer.get().GetUnacknowledgedMessages(b_group_id, b_stream, from_id, to_id, &err) + ids = self.c_consumer.get().GetUnacknowledgedMessages(b_group_id, from_id, to_id, b_stream, &err) if err: throw_exception(err) list = [] @@ -287,13 +287,13 @@ cdef class PyConsumer: cdef Error err if op == "next": with nogil: - dataset = self.c_consumer.get().GetNextDataset(b_group_id,b_stream, min_size, &err) + dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream, &err) elif op == "last": with nogil: - dataset = self.c_consumer.get().GetLastDataset(b_stream, min_size, &err) + dataset = self.c_consumer.get().GetLastDataset(min_size,b_stream, &err) elif op == "id": with nogil: - dataset = self.c_consumer.get().GetDatasetById(id, b_stream, min_size, &err) + dataset = self.c_consumer.get().GetDatasetById(id, min_size,b_stream, &err) json_list = [] for fi in dataset.content: json_list.append(json.loads(_str(fi.Json()))) @@ -301,11 +301,11 @@ cdef class PyConsumer: if err: throw_exception(err,res) return res - def get_next_dataset(self, group_id, stream = "default", min_size = 0): + def get_next_dataset(self, group_id, min_size = 0, stream = "default"): return self._op_dataset("next",group_id,stream,min_size,0) - def get_last_dataset(self, stream = "default", min_size = 0): + def get_last_dataset(self, min_size = 0, stream = "default"): return self._op_dataset("last","0",stream,min_size,0) - def get_dataset_by_id(self, uint64_t id, stream = "default", min_size = 0): + def get_dataset_by_id(self, uint64_t id, min_size = 0, stream = "default"): return self._op_dataset("id","0",stream,min_size,id) def get_beamtime_meta(self): cdef Error err diff --git a/examples/consumer/getnext/getnext.cpp b/examples/consumer/getnext/getnext.cpp index ca310858c5ae139ae0c5c2bc200b624026cc3836..5e90a3885be448262e12bbd099278deb62eb6f8b 100644 --- a/examples/consumer/getnext/getnext.cpp +++ b/examples/consumer/getnext/getnext.cpp @@ -137,7 +137,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err bool isFirstFile = true; while (true) { if (params.datasets) { - auto dataset = consumer->GetNextDataset(group_id,"default", 0, &err); + auto dataset = consumer->GetNextDataset(group_id, 0, "default", &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; @@ -145,7 +145,7 @@ StartThreads(const Args& params, std::vector<int>* nfiles, std::vector<int>* err } } } else { - err = consumer->GetNext(&fi, group_id,"default", params.read_data ? &data : nullptr); + err = consumer->GetNext(group_id, &fi, params.read_data ? &data : nullptr, "default"); if (isFirstFile) { isFirstFile = false; timer->count_down_and_wait(); diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 27c4500950ae9dca56c6bc5b4dacbd28121e8b85..801d9f32a0653776f4a2093a2a3d545bcad19dc3 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -104,11 +104,11 @@ void SendDownstreamThePipeline(const Args &args, const asapo::MessageMeta &fi, a Error err_send; if (args.transfer_data) { header.file_name += "_" + args.stream_out; - err_send = producer->Send(header, "default", std::move(data), asapo::kDefaultIngestMode, ProcessAfterSend); + err_send = producer->Send(header, std::move(data), asapo::kDefaultIngestMode, "default", ProcessAfterSend); } else { header.file_name = args.file_path + asapo::kPathSeparator + header.file_name; err_send = - producer->Send(header, "default", nullptr, asapo::IngestModeFlags::kTransferMetaDataOnly, ProcessAfterSend); + producer->Send(header, nullptr, asapo::IngestModeFlags::kTransferMetaDataOnly, "default", ProcessAfterSend); std::cout << err_send << std::endl; } @@ -128,7 +128,7 @@ Error ProcessNextEvent(const Args &args, const ConsumerPtr &consumer, const Prod asapo::MessageData data; asapo::MessageMeta fi; - auto err = consumer->GetNext(&fi, group_id, "default", args.transfer_data ? &data : nullptr); + auto err = consumer->GetNext(group_id, &fi, args.transfer_data ? &data : nullptr, "default"); if (err) { return err; } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index b05ecc0a975c8ff7cd23e223e644bddcc5dc5ba3..d983f01e2d419a5b5ca0356e746d4a64d16b54be 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -154,8 +154,12 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it message_header.file_name = message_folder+message_header.file_name; message_header.user_metadata = std::move(meta); if (messages_in_set == 1) { - auto err = producer->Send(message_header,"default", std::move(buffer), write_files ? asapo::kDefaultIngestMode : - asapo::kTransferData, &ProcessAfterSend); + auto err = producer->Send(message_header, + std::move(buffer), + write_files ? asapo::kDefaultIngestMode : + asapo::kTransferData, + "default", + &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; @@ -173,8 +177,12 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it message_header.file_name = message_folder + message_header.file_name; message_header.user_metadata = meta; auto err = - producer->Send(message_header, "default", std::move(buffer), write_files ? asapo::kDefaultIngestMode : - asapo::kTransferData, &ProcessAfterSend); + producer->Send(message_header, + std::move(buffer), + write_files ? asapo::kDefaultIngestMode : + asapo::kTransferData, + "default", + &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 3b67335655fb408e5de12e890bca83d695b503f7..c8de7d637ff86f09f2f976e52f792e23a54cc816 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -44,13 +44,19 @@ class Producer { \param data - A smart pointer to the message data to send, can be nullptr \return Error - Will be nullptr on success */ - virtual Error Send(const MessageHeader& message_header, std::string stream, MessageData data, uint64_t ingest_mode, + virtual Error Send(const MessageHeader &message_header, + MessageData data, + uint64_t ingest_mode, + std::string stream, RequestCallback callback) = 0; //! Sends data to the receiver - same as Send - memory should not be freed until send is finished //! used e.g. for Python bindings - virtual Error Send__(const MessageHeader& message_header, std::string stream, void* data, uint64_t ingest_mode, + virtual Error Send__(const MessageHeader &message_header, + void* data, + uint64_t ingest_mode, + std::string stream, RequestCallback callback) = 0; //! Stop processing threads @@ -60,12 +66,14 @@ class Producer { //! Sends message from a file to a stream /*! \param message_header - A stucture with the meta information (file name, size is ignored). - \param full_path - A full path of the file to send + \param file_to_send - A full path of the file to send \return Error - Will be nullptr on success */ - virtual Error SendFile(const MessageHeader& message_header, std::string stream, std::string full_path, - uint64_t ingest_mode, - RequestCallback callback) = 0; + virtual Error SendFile(const MessageHeader &message_header, + std::string file_to_send, + uint64_t ingest_mode, + std::string stream, + RequestCallback callback) = 0; //! Marks stream finished /*! diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index fcb7db06c105c0963b7535d3faa667676e540218..0ed76a1b699e4a3335eba9812ed8019b7306c802 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -136,10 +136,10 @@ Error CheckData(uint64_t ingest_mode, const MessageHeader& message_header, const return nullptr; } -Error ProducerImpl::Send(const MessageHeader& message_header, - std::string stream, +Error ProducerImpl::Send(const MessageHeader &message_header, MessageData data, uint64_t ingest_mode, + std::string stream, RequestCallback callback) { if (auto err = CheckData(ingest_mode, message_header, &data)) { return err; @@ -219,11 +219,11 @@ Error ProducerImpl::SendMetadata(const std::string& metadata, RequestCallback ca }); } -Error ProducerImpl::Send__(const MessageHeader& message_header, - std::string stream, - void* data, - uint64_t ingest_mode, - RequestCallback callback) { +Error ProducerImpl::Send__(const MessageHeader &message_header, + void* data, + uint64_t ingest_mode, + std::string stream, + RequestCallback callback) { MessageData data_wrapped = MessageData{(uint8_t*)data}; if (auto err = CheckData(ingest_mode, message_header, &data_wrapped)) { @@ -249,11 +249,11 @@ Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) { void ProducerImpl::StopThreads__() { request_pool__->StopThreads(); } -Error ProducerImpl::SendFile(const MessageHeader& message_header, - std::string stream, - std::string full_path, - uint64_t ingest_mode, - RequestCallback callback) { +Error ProducerImpl::SendFile(const MessageHeader &message_header, + std::string full_path, + uint64_t ingest_mode, + std::string stream, + RequestCallback callback) { if (full_path.empty()) { return ProducerErrorTemplates::kWrongInput.Generate("empty filename"); } diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 5e075186c771d6428e54910b256d90c13f653df3..9108a20dad3bdbbc6f622c268a0a74e930dbe66f 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -38,13 +38,22 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(const MessageHeader &message_header, std::string stream, MessageData data, uint64_t ingest_mode, + Error Send(const MessageHeader &message_header, + MessageData data, + uint64_t ingest_mode, + std::string stream, RequestCallback callback) override; - Error Send__(const MessageHeader &message_header, std::string stream, void* data, uint64_t ingest_mode, + Error Send__(const MessageHeader &message_header, + void* data, + uint64_t ingest_mode, + std::string stream, RequestCallback callback) override; void StopThreads__() override; - Error SendFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode, - RequestCallback callback) override; + Error SendFile(const MessageHeader &message_header, + std::string full_path, + uint64_t ingest_mode, + std::string stream, + RequestCallback callback) override; Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) override; diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp index 9503631899fa91a2e451fc60edbef7021c6685b9..984f10225e06f008a3df5161d32206d85e446d15 100644 --- a/producer/api/cpp/unittests/test_producer.cpp +++ b/producer/api/cpp/unittests/test_producer.cpp @@ -63,7 +63,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::MessageHeader message_header{1, 1, "test"}; - auto err_send = producer->Send(message_header,"stream", nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err_send = producer->Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "stream", nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 595b2a833beeaf2be9fa035f68f09836a2121112..e9ea117ee49a9dbadcf1dff210de7655688d2c6d 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -100,20 +100,20 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_, false)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::MessageHeader message_header{1, 1, "test"}; - auto err = producer.Send(message_header,"default", nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, long_string}; - auto err = producer.Send(message_header,"default", nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfStreamEmpty) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send(message_header,"", nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } @@ -121,52 +121,52 @@ TEST_F(ProducerImplTests, ErrorIfStreamEmpty) { TEST_F(ProducerImplTests, ErrorIfFileEmpty) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::MessageHeader message_header{1, 1, ""}; - auto err = producer.Send(message_header, "default", nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfDatasetSizeNotDefined) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("dataset dimensions"))); asapo::MessageHeader message_header{1, 1000, "test", "", 1}; - auto err = producer.Send(message_header, "default", nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfZeroDataSize) { asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; asapo::MessageHeader message_header{1, 0, expected_fullpath}; - auto err = producer.Send(message_header, "default", std::move(data), asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send(message_header, std::move(data), asapo::kDefaultIngestMode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoData) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send(message_header, "default", nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send(message_header, nullptr, asapo::kDefaultIngestMode, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfNoDataSend_) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send__(message_header, expected_stream, nullptr, asapo::kDefaultIngestMode, nullptr); + auto err = producer.Send__(message_header, nullptr, asapo::kDefaultIngestMode, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, ErrorIfSendingDataWithZeroId) { asapo::MessageHeader message_header{0, 100, expected_fullpath}; - auto err = producer.Send(message_header, "default", nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "default", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } TEST_F(ProducerImplTests, OkIfNoDataWithTransferMetadataOnlyMode) { asapo::MessageHeader message_header{1, 100, expected_fullpath}; - auto err = producer.Send(message_header, "default", nullptr, asapo::kTransferMetaDataOnly, nullptr); + auto err = producer.Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "default", nullptr); ASSERT_THAT(err, Eq(nullptr)); } TEST_F(ProducerImplTests, OkIfZeroSizeWithTransferMetadataOnlyMode) { asapo::MessageData data = asapo::MessageData{new uint8_t[100]}; asapo::MessageHeader message_header{1, 0, expected_fullpath}; - auto err = producer.Send(message_header, "default", std::move(data), asapo::kTransferMetaDataOnly, nullptr); + auto err = producer.Send(message_header, std::move(data), asapo::kTransferMetaDataOnly, "default", nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -187,7 +187,7 @@ TEST_F(ProducerImplTests, OKSendingSendRequestWithStream) { nullptr)); asapo::MessageHeader message_header{expected_id, expected_size, expected_name, expected_metadata}; - auto err = producer.Send(message_header, expected_stream, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -264,7 +264,7 @@ TEST_F(ProducerImplTests, OKSendingSendDatasetDataRequest) { asapo::MessageHeader message_header {expected_id, expected_size, expected_name, expected_metadata, expected_dataset_id, expected_dataset_size}; - auto err = producer.Send(message_header, expected_stream, nullptr, expected_ingest_mode, nullptr); + auto err = producer.Send(message_header, nullptr, expected_ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -299,7 +299,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(message_header, expected_stream, "", expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, "", expected_ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -311,7 +311,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, ""}; - auto err = producer.SendFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -323,7 +323,7 @@ TEST_F(ProducerImplTests, ErrorSendingFileToEmptyStream) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); asapo::MessageHeader message_header{expected_id, 0, expected_name}; - auto err = producer.SendFile(message_header, "", expected_fullpath, expected_ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); @@ -346,7 +346,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithStream) { asapo::MessageHeader message_header{expected_id, 0, expected_name}; auto err = - producer.SendFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr); + producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(nullptr)); } @@ -384,7 +384,7 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) { EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0); for (auto ingest_mode : ingest_modes) { - auto err = producer.SendFile(message_header, expected_stream, expected_fullpath, ingest_mode, nullptr); + auto err = producer.SendFile(message_header, expected_fullpath, ingest_mode, expected_stream, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput)); } diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 78879b01d7022d2e786895f07b6c05bca4b176a8..c387fe773f2a4f277bcef12ce96dee4b4fa8dcd6 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -95,8 +95,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: cppclass Producer: @staticmethod unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_ms, Error* error) - Error SendFile(const MessageHeader& message_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback) - Error Send__(const MessageHeader& message_header, string stream, void* data, uint64_t ingest_mode,RequestCallback callback) + Error SendFile(const MessageHeader& message_header, string file_to_send, uint64_t ingest_mode, string stream, RequestCallback callback) + Error Send__(const MessageHeader& message_header, void* data, uint64_t ingest_mode, string stream, RequestCallback callback) void StopThreads__() void SetLogLevel(LogLevel level) uint64_t GetRequestsQueueSize() diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 6a7e9f64e6138b0f6b1009652f2bd46a381a0bdb..67bb53dc169f1dd38806a6860dd2b8e85feb7ba5 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -113,7 +113,7 @@ cdef class PyProducer: message_header.data_size = 0 else: message_header.data_size = data.nbytes - err = self.c_producer.get().Send__(message_header, _bytes(stream), data_pointer_nparray(data),ingest_mode, + err = self.c_producer.get().Send__(message_header, data_pointer_nparray(data),ingest_mode,_bytes(stream), unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_ndarr, <void*>self,<void*>callback, <void*>data)) if err: @@ -142,7 +142,7 @@ cdef class PyProducer: def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None): cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) message_header.data_size = len(data) - err = self.c_producer.get().Send__(message_header,_bytes(stream), data_pointer_bytes(data), ingest_mode, + err = self.c_producer.get().Send__(message_header, data_pointer_bytes(data), ingest_mode, _bytes(stream), unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr, <void*>self,<void*>callback, <void*>data)) if err: @@ -152,7 +152,7 @@ cdef class PyProducer: Py_XINCREF(<PyObject*>callback) return - def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): """ :param id: unique data id :type id: int @@ -164,10 +164,10 @@ cdef class PyProducer: :type user_meta: JSON string :param dataset: a tuple with two int values (dataset substream id, amount of dataset substreams), default None :type dataset: tuple - :param stream: stream name, default "default" - :type stream: string :param ingest_mode: ingest mode flag :type ingest_mode: int + :param stream: stream name, default "default" + :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None :raises: @@ -234,7 +234,7 @@ cdef class PyProducer: if err: throw_exception(err) return json.loads(_str(info.Json(True))) - def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None): + def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, ingest_mode = DEFAULT_INGEST_MODE, stream = "default", callback=None): """ :param id: unique data id :type id: int @@ -246,10 +246,10 @@ cdef class PyProducer: :type user_meta: JSON string :param dataset: a tuple with two int values (dataset id, dataset size), default None :type dataset: tuple - :param stream: stream name, default "default" - :type stream: string :param ingest_mode: ingest mode flag :type ingest_mode: int + :param stream: stream name, default "default" + :type stream: string :param callback: callback function, default None :type callback: callback(info,err), where info - json string with event header that was used to send data and response, err - error string or None :raises: @@ -260,7 +260,7 @@ cdef class PyProducer: cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode) message_header.data_size = 0 - err = self.c_producer.get().SendFile(message_header, _bytes(stream), _bytes(local_path), ingest_mode, + err = self.c_producer.get().SendFile(message_header, _bytes(local_path), ingest_mode, _bytes(stream), unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL)) if err: throw_exception(err) diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 4b188649f5134dab310faef24dfb8423c6238f5e..d30e479a8bf6e8af518b818ee0aa4cc8accaf040 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -137,8 +137,8 @@ int main (int argc, char* argv[]) { } message_header.message_id = ++i; HandleDatasets(&message_header); - producer->SendFile(message_header,"default", GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + - message_header.file_name, asapo::kDefaultIngestMode, ProcessAfterSend); + producer->SendFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + + message_header.file_name, asapo::kDefaultIngestMode, "default", ProcessAfterSend); } logger->Info("Producer exit. Processed " + std::to_string(i) + " files"); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 15e664b04f284055aa7ee1612b400b9301971f4b..e451289ed76e159ef3d4e8d7df80bfb54836fd9f 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -31,7 +31,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str asapo::MessageMeta fi; asapo::Error err; - err = consumer->GetNext(&fi, group_id, "default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); if (err) { std::cout << err->Explain() << std::endl; } @@ -45,12 +45,12 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertEq("hello1", std::string(data.get(), data.get() + fi.size)); - err = consumer->GetLast(&fi,"default", nullptr); + err = consumer->GetLast(&fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(fi.name == "10", "GetLast filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetLast metadata"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext2 no error"); M_AssertTrue(fi.name == "2", "GetNext2 filename"); @@ -59,16 +59,16 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "SetLastReadMarker no error"); - err = consumer->GetById(8, &fi,"default", nullptr); + err = consumer->GetById(8, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetById error"); M_AssertTrue(fi.name == "8", "GetById filename"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext After GetById no error"); M_AssertTrue(fi.name == "3", "GetNext After GetById filename"); - err = consumer->GetLast(&fi,"default", nullptr); + err = consumer->GetLast(&fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetLast2 no error"); @@ -76,7 +76,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "SetLastReadMarker 2 no error"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext3 no error"); M_AssertTrue(fi.name == "9", "GetNext3 filename"); @@ -87,12 +87,12 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str err = consumer->ResetLastReadMarker(group_id,"default"); M_AssertTrue(err == nullptr, "SetLastReadMarker"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext4 no error"); M_AssertTrue(fi.name == "1", "GetNext4 filename"); auto group_id2 = consumer->GenerateNewGroupId(&err); - err = consumer->GetNext(&fi, group_id2,"default", nullptr); + err = consumer->GetNext(group_id2, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext5 no error"); M_AssertTrue(fi.name == "1", "GetNext5 filename"); @@ -121,7 +121,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str //streams - err = consumer->GetNext(&fi, group_id, "stream1", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "stream1"); if (err) { std::cout << err->Explain() << std::endl; } @@ -129,7 +129,7 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "GetNext stream1 no error"); M_AssertTrue(fi.name == "11", "GetNext stream1 filename"); - err = consumer->GetNext(&fi, group_id, "stream2", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "stream2"); M_AssertTrue(err == nullptr, "GetNext stream2 no error"); M_AssertTrue(fi.name == "21", "GetNext stream2 filename"); @@ -152,14 +152,14 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == asapo::ConsumerErrorTemplates::kNoData, "last ack default stream no data"); M_AssertTrue(id == 0, "last ack default stream no data id = 0"); - auto nacks = consumer->GetUnacknowledgedMessages(group_id,"default", 0, 0, &err); + auto nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, "default", &err); M_AssertTrue(err == nullptr, "nacks default stream all"); M_AssertTrue(nacks.size() == 10, "nacks default stream size = 10"); err = consumer->Acknowledge(group_id, 1,"default"); M_AssertTrue(err == nullptr, "ack default stream no error"); - nacks = consumer->GetUnacknowledgedMessages(group_id,"default", 0, 0, &err); + nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, "default", &err); M_AssertTrue(nacks.size() == 9, "nacks default stream size = 9 after ack"); id = consumer->GetLastAcknowledgedMessage(group_id,"default", &err); @@ -169,33 +169,33 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str err = consumer->Acknowledge(group_id, 1, "stream1"); M_AssertTrue(err == nullptr, "ack stream1 no error"); - nacks = consumer->GetUnacknowledgedMessages(group_id, "stream1", 0, 0, &err); + nacks = consumer->GetUnacknowledgedMessages(group_id, 0, 0, "stream1", &err); M_AssertTrue(nacks.size() == 4, "nacks stream1 size = 4 after ack"); // negative acks consumer->ResetLastReadMarker(group_id,"default"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNextNegAckBeforeResend no error"); M_AssertTrue(fi.name == "1", "GetNextNegAckBeforeResend filename"); err = consumer->NegativeAcknowledge(group_id, 1, 0,"default"); M_AssertTrue(err == nullptr, "NegativeAcknowledge no error"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNextNegAckWithResend no error"); M_AssertTrue(fi.name == "1", "GetNextNegAckWithResend filename"); // automatic resend consumer->ResetLastReadMarker(group_id,"default"); consumer->SetResendNacs(true, 0, 1); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNextBeforeResend no error"); M_AssertTrue(fi.name == "1", "GetNextBeforeResend filename"); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNextWithResend no error"); M_AssertTrue(fi.name == "1", "GetNextWithResend filename"); consumer->SetResendNacs(false, 0, 1); - err = consumer->GetNext(&fi, group_id,"default", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNextAfterResend no error"); M_AssertTrue(fi.name == "2", "GetNextAfterResend filename"); @@ -206,7 +206,7 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st asapo::MessageMeta fi; asapo::Error err; - auto dataset = consumer->GetNextDataset(group_id,"default", 0, &err); + auto dataset = consumer->GetNextDataset(group_id, 0, "default", &err); if (err) { std::cout << err->Explain() << std::endl; } @@ -222,25 +222,25 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size)); - dataset = consumer->GetLastDataset("default", 0, &err); + dataset = consumer->GetLastDataset(0, "default", &err); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); - dataset = consumer->GetNextDataset(group_id, "default", 0, &err); + dataset = consumer->GetNextDataset(group_id, 0, "default", &err); M_AssertTrue(err == nullptr, "GetNextDataset2 no error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetNextDataSet2 filename"); - dataset = consumer->GetLastDataset("default", 0, &err); + dataset = consumer->GetLastDataset(0, "default", &err); M_AssertTrue(err == nullptr, "GetLastDataset2 no error"); - dataset = consumer->GetDatasetById(8,"default", 0, &err); + dataset = consumer->GetDatasetById(8, 0, "default", &err); M_AssertTrue(err == nullptr, "GetDatasetById error"); M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename"); // incomplete datasets without min_size - dataset = consumer->GetNextDataset(group_id,"incomplete",0,&err); + dataset = consumer->GetNextDataset(group_id, 0, "incomplete", &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kPartialData, "GetNextDataset incomplete error"); M_AssertTrue(dataset.content.size() == 2, "GetNextDataset incomplete size"); M_AssertTrue(dataset.content[0].name == "1_1", "GetNextDataset incomplete filename"); @@ -250,24 +250,24 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st M_AssertTrue(dataset.expected_size == 3, "GetDatasetById expected size"); M_AssertTrue(dataset.id == 1, "GetDatasetById expected id"); - dataset = consumer->GetLastDataset("incomplete", 0, &err); + dataset = consumer->GetLastDataset(0, "incomplete", &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLastDataset incomplete no data"); - dataset = consumer->GetDatasetById(2, "incomplete", 0, &err); + dataset = consumer->GetDatasetById(2, 0, "incomplete", &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kPartialData, "GetDatasetById incomplete error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetDatasetById incomplete filename"); // incomplete datasets with min_size - dataset = consumer->GetNextDataset(group_id,"incomplete",2,&err); + dataset = consumer->GetNextDataset(group_id, 2, "incomplete", &err); M_AssertTrue(err == nullptr, "GetNextDataset incomplete minsize error"); M_AssertTrue(dataset.id == 2, "GetDatasetById minsize id"); - dataset = consumer->GetLastDataset("incomplete", 2, &err); + dataset = consumer->GetLastDataset(2, "incomplete", &err); M_AssertTrue(err == nullptr, "GetNextDataset incomplete minsize error"); M_AssertTrue(dataset.id == 5, "GetLastDataset minsize id"); - dataset = consumer->GetDatasetById(2, "incomplete", 2, &err); + dataset = consumer->GetDatasetById(2, 2, "incomplete", &err); M_AssertTrue(err == nullptr, "GetDatasetById incomplete minsize error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetDatasetById incomplete minsize filename"); diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index fee6ba7765cc803d6301a246291b50a0300cdf29..013ce0516a8113aa1ee22b255dc37ddfe93034ef 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -42,7 +42,7 @@ def check_file_transfer_service(consumer, group_id): consumer.set_timeout(1000) data, meta = consumer.get_by_id(1, meta_only=False) assert_eq(data.tostring().decode("utf-8"), "hello1", "check_file_transfer_service ok") - data, meta = consumer.get_by_id(1, "streamfts", meta_only=False) + data, meta = consumer.get_by_id(1, meta_only=False, stream = "streamfts") assert_eq(data.tostring().decode("utf-8"), "hello1", "check_file_transfer_service with auto size ok") @@ -115,10 +115,10 @@ def check_single(consumer, group_id): else: exit_on_noerr("io error") - _, meta = consumer.get_next(group_id, "stream1", meta_only=True) + _, meta = consumer.get_next(group_id, meta_only=True, stream = "stream1") assert_metaname(meta, "11", "get next stream1") - _, meta = consumer.get_next(group_id, "stream2", meta_only=True) + _, meta = consumer.get_next(group_id, meta_only=True, stream = "stream2") assert_metaname(meta, "21", "get next stream2") streams = consumer.get_stream_list("") @@ -271,7 +271,7 @@ def check_dataset(consumer, group_id): # incomplete datesets without min_size given try: - consumer.get_next_dataset(group_id, "incomplete") + consumer.get_next_dataset(group_id, stream = "incomplete") except asapo_consumer.AsapoPartialDataError as err: assert_eq(err.partial_data['expected_size'], 3, "get_next_dataset incomplete expected size") assert_eq(err.partial_data['id'], 1, "get_next_dataset incomplete id") @@ -282,7 +282,7 @@ def check_dataset(consumer, group_id): exit_on_noerr("get_next_dataset incomplete err") try: - consumer.get_dataset_by_id(2, "incomplete") + consumer.get_dataset_by_id(2, stream = "incomplete") except asapo_consumer.AsapoPartialDataError as err: assert_eq(err.partial_data['expected_size'], 3, "get_next_dataset incomplete expected size") assert_eq(err.partial_data['id'], 2, "get_next_dataset incomplete id") @@ -293,19 +293,19 @@ def check_dataset(consumer, group_id): exit_on_noerr("get_next_dataset incomplete err") try: - consumer.get_last_dataset("incomplete") + consumer.get_last_dataset(stream = "incomplete") except asapo_consumer.AsapoEndOfStreamError as err: pass else: exit_on_noerr("get_last_dataset incomplete err") # incomplete with min_size given - res = consumer.get_next_dataset(group_id, "incomplete", min_size=2) + res = consumer.get_next_dataset(group_id, min_size=2, stream = "incomplete") assert_eq(res['id'], 2, "get_next_dataset incomplete with minsize") - res = consumer.get_last_dataset("incomplete", min_size=2) + res = consumer.get_last_dataset(min_size=2, stream = "incomplete") assert_eq(res['id'], 5, "get_last_dataset incomplete with minsize") - res = consumer.get_dataset_by_id(2, "incomplete", min_size=1) + res = consumer.get_dataset_by_id(2, min_size=1, stream = "incomplete") assert_eq(res['id'], 2, "get_dataset_by_id incomplete with minsize") diff --git a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp index 96a753089ea0055b88a2a968de18faf0b96a0a2d..d12de6de804c5884d55b371bccb58de80fabe000 100644 --- a/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/consumer/next_multithread_broker/next_multithread_broker.cpp @@ -69,7 +69,7 @@ void TestAll(const Args& args) { std::vector<asapo::MessageMetas>message_metas(args.nthreads); auto exec_next = [&](int i) { asapo::MessageMeta fi; - while ((err = consumer->GetNext(&fi, group_id,"default", nullptr)) == nullptr) { + while ((err = consumer->GetNext(group_id, &fi, nullptr, "default")) == nullptr) { message_metas[i].emplace_back(fi); } printf("%s\n", err->Explain().c_str()); diff --git a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp index c80d5ecfef5ee794743acc513a1843be9017fddd..5f025e5f434998f801316a0be33e0426d7937334 100644 --- a/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp +++ b/tests/automatic/full_chain/send_recv_streams/send_recv_streams.cpp @@ -91,7 +91,7 @@ int main(int argc, char* argv[]) { for (uint64_t i = 0; i < n; i++) { asapo::MessageHeader message_header{i + 1, 0, std::to_string(i + 1)}; - producer->Send(message_header, "stream1", nullptr, asapo::kTransferMetaDataOnly, ProcessAfterSend); + producer->Send(message_header, nullptr, asapo::kTransferMetaDataOnly, "stream1", ProcessAfterSend); } producer->SendStreamFinishedFlag("stream1", n, "stream2", ProcessAfterSend); producer->WaitRequestsFinished(10000); @@ -105,10 +105,10 @@ int main(int argc, char* argv[]) { asapo::MessageMeta fi; for (uint64_t i = 0; i < n; i++) { - consumer->GetNext(&fi, group_id, "stream1", nullptr); + consumer->GetNext(group_id, &fi, nullptr, "stream1"); } - err = consumer->GetNext(&fi, group_id, "stream1", nullptr); + err = consumer->GetNext(group_id, &fi, nullptr, "stream1"); if (err != asapo::ConsumerErrorTemplates::kStreamFinished) { return 1; } diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index db1f1ae9cc9c9f48bbb79a8dadaced7189de25bd..59011a35aeee20a613a19c85b37a8d264715e4fc 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -83,7 +83,7 @@ std::vector<std::thread> StartThreads(const Args& params, while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < params.timeout_ms) { if (params.datasets) { - auto dataset = consumer->GetLastDataset("default", 0, &err); + auto dataset = consumer->GetLastDataset(0, "default", &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; @@ -91,7 +91,7 @@ std::vector<std::thread> StartThreads(const Args& params, } } } else { - err = consumer->GetLast(&fi,"default", params.read_data ? &data : nullptr); + err = consumer->GetLast(&fi, params.read_data ? &data : nullptr, "default"); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { diff --git a/tests/manual/producer_cpp/producer.cpp b/tests/manual/producer_cpp/producer.cpp index 795d25e21cbc2909b2b34b065257c78760f66e38..c198a7506144518a54c0f81f5c15aea2935c2a49 100644 --- a/tests/manual/producer_cpp/producer.cpp +++ b/tests/manual/producer_cpp/producer.cpp @@ -83,8 +83,8 @@ int main(int argc, char* argv[]) { // err = producer->Send(message_header,stream, std::move(buffer), // asapo::kTransferMetaDataOnly, &ProcessAfterSend); - err = producer->Send(message_header, stream, std::move(buffer), - asapo::kDefaultIngestMode, &ProcessAfterSend); + err = producer->Send(message_header, std::move(buffer), + asapo::kDefaultIngestMode, stream, &ProcessAfterSend); exit_if_error("Cannot send file", err); err = producer->WaitRequestsFinished(1000);