diff --git a/CHANGELOG.md b/CHANGELOG.md index 816f111bda673a1dacccca92bcfbd0ac80c3ab9e..1aaba2da139ea282040cbbae4180284fd894e09b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 20.09.2 +## 20.09.2 (in progress) FEATURES * implemented possibility to send data without writing to database (no need of consecutive indexes, etc. but will not be able to consume such data) @@ -9,10 +9,9 @@ FEATURES * Producer API - return original data in callback payload. * Producer API - allow to set queue limits (number of pending requests and/or max memory), reject new requests if reached the limits - BREAKING CHANGES * Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple (id,content) as before - +* Consumer API - remove group_id argument from get_last/get_by_id/get_last_dataset/get_dataset_by_id functions ## 20.09.1 diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h index 51c624c23630732a69eb166775abab8aac514b7b..e82175856fd94de1775fa663daf3f2f22955d180 100644 --- a/consumer/api/cpp/include/consumer/data_broker.h +++ b/consumer/api/cpp/include/consumer/data_broker.h @@ -132,25 +132,23 @@ class DataBroker { //! Receive last available dataset which has min_size data tuples. /*! \param err - will be set to error data cannot be read, nullptr otherwise. - \param group_id - group id to use. \param substream - substream to use ("" for default). \param min_size - amount of data tuples in dataset (0 for maximum size) \return DataSet - information about the dataset */ - virtual DataSet GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) = 0; - virtual DataSet GetLastDataset(std::string group_id, uint64_t min_size, Error* err) = 0; + virtual DataSet GetLastDataset(std::string substream, uint64_t min_size, Error* err) = 0; + virtual DataSet GetLastDataset(uint64_t min_size, Error* err) = 0; //! Receive dataset by id. /*! \param id - dataset id \param err - will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise. - \param group_id - group id to use. \param substream - substream to use ("" for default). \param min_size - wait until dataset has min_size data tuples (0 for maximum size) \return DataSet - information about the dataset */ - virtual DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) = 0; - virtual DataSet GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) = 0; + virtual DataSet GetDatasetById(uint64_t id, std::string substream, uint64_t min_size, Error* err) = 0; + virtual DataSet GetDatasetById(uint64_t id, uint64_t min_size, Error* err) = 0; //! Receive single image by id. /*! @@ -159,8 +157,8 @@ class DataBroker { \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) = 0; - virtual Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0; + virtual Error GetById(uint64_t id, FileInfo* info, FileData* data) = 0; + virtual Error GetById(uint64_t id, FileInfo* info, std::string substream, FileData* data) = 0; //! Receive id of last acknowledged data tuple /*! @@ -175,12 +173,11 @@ class DataBroker { //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. - \param group_id - group id to use. \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ - virtual Error GetLast(FileInfo* info, std::string group_id, FileData* data) = 0; - virtual Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) = 0; + virtual Error GetLast(FileInfo* info, FileData* data) = 0; + virtual Error GetLast(FileInfo* info, std::string substream, FileData* data) = 0; //! Get all images matching the query. /*! diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index f8086c0c3743445bad1c1828aaab115f9a44002d..0d9519cf0719963a5e8c2c8d87c9d725b56f2c96 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -302,11 +302,11 @@ Error ServerDataBroker::GetNext(FileInfo* info, std::string group_id, std::strin data); } -Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* data) { - return GetLast(info, std::move(group_id), kDefaultSubstream, data); +Error ServerDataBroker::GetLast(FileInfo* info, FileData* data) { + return GetLast(info, kDefaultSubstream, data); } -Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) { +Error ServerDataBroker::GetLast(FileInfo* info, std::string substream, FileData* data) { return GetImageFromServer(GetImageServerOperation::GetLast, 0, "0", @@ -549,19 +549,15 @@ uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) { uint64_t ServerDataBroker::GetCurrentSize(Error* err) { return GetCurrentSize(kDefaultSubstream, err); } -Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) { +Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { if (id == 0) { return ConsumerErrorTemplates::kWrongInput.Generate("id should be positive"); } - return GetById(id, info, std::move(group_id), kDefaultSubstream, data); + return GetById(id, info, kDefaultSubstream, data); } -Error ServerDataBroker::GetById(uint64_t id, - FileInfo* info, - std::string group_id, - std::string substream, - FileData* data) { +Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string substream, FileData* data) { return GetImageFromServer(GetImageServerOperation::GetID, id, "0", substream, info, data); } @@ -628,12 +624,12 @@ DataSet ServerDataBroker::GetNextDataset(std::string group_id, std::string subst return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream),min_size, err); } -DataSet ServerDataBroker::GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) { +DataSet ServerDataBroker::GetLastDataset(std::string substream, uint64_t min_size, Error* err) { return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, "0", std::move(substream),min_size, err); } -DataSet ServerDataBroker::GetLastDataset(std::string group_id, uint64_t min_size, Error* err) { - return GetLastDataset(std::move(group_id), kDefaultSubstream, min_size, err); +DataSet ServerDataBroker::GetLastDataset(uint64_t min_size, Error* err) { + return GetLastDataset(kDefaultSubstream, min_size, err); } DataSet ServerDataBroker::GetDatasetFromServer(GetImageServerOperation op, @@ -654,11 +650,11 @@ DataSet ServerDataBroker::GetDatasetFromServer(GetImageServerOperation op, return DecodeDatasetFromResponse(response, err); } -DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) { - return GetDatasetById(id, std::move(group_id), kDefaultSubstream, min_size, err); +DataSet ServerDataBroker::GetDatasetById(uint64_t id, uint64_t min_size, Error* err) { + return GetDatasetById(id, kDefaultSubstream, min_size, err); } -DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) { +DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string substream, uint64_t min_size, Error* err) { return GetDatasetFromServer(GetImageServerOperation::GetID, id, "0", std::move(substream), min_size, err); } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index c83ac84a86d7b5a8fac7057102c87cb9be9934e6..53a938813ca74ecb9c3ec126f95bab20627389f1 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -78,8 +78,8 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetNext(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; - Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; - Error GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) override; + Error GetLast(FileInfo* info, FileData* data) override; + Error GetLast(FileInfo* info, std::string substream, FileData* data) override; std::string GenerateNewGroupId(Error* err) override; std::string GetBeamtimeMeta(Error* err) override; @@ -87,8 +87,8 @@ class ServerDataBroker final : public asapo::DataBroker { uint64_t GetCurrentSize(Error* err) override; uint64_t GetCurrentSize(std::string substream, Error* err) override; - Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; - Error GetById(uint64_t id, FileInfo* info, std::string group_id, std::string substream, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, std::string substream, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; @@ -102,11 +102,11 @@ class ServerDataBroker final : public asapo::DataBroker { DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) override; DataSet GetNextDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) override; - DataSet GetLastDataset(std::string group_id, uint64_t min_size, Error* err) override; - DataSet GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) override; + DataSet GetLastDataset(uint64_t min_size, Error* err) override; + DataSet GetLastDataset(std::string substream, uint64_t min_size, Error* err) override; - DataSet GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) override; - DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) override; + DataSet GetDatasetById(uint64_t id, uint64_t min_size, Error* err) override; + DataSet GetDatasetById(uint64_t id, std::string substream, uint64_t min_size, Error* err) override; Error RetrieveData(FileInfo* info, FileData* data) override; diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 2fd04236f4aeb21a3d0beffda60b2965287a76e9..d7b1f95368ee612af813b987badfaf36b42c2bfb 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -247,7 +247,7 @@ TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) { SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); - data_broker->GetLast(&info, expected_group_id, nullptr); + data_broker->GetLast(&info, nullptr); } TEST_F(ServerDataBrokerTests, GetImageReturnsEndOfStreamFromHttpClient) { @@ -713,7 +713,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(json))); - auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(info.name, Eq(to_send.name)); @@ -730,7 +730,7 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) { SetArgPointee<2>(nullptr), Return(""))); - auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } @@ -746,7 +746,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}"))); - auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -762,7 +762,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { SetArgPointee<2>(nullptr), Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_substream\":\"""\"}"))); - auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } @@ -1015,7 +1015,7 @@ TEST_F(ServerDataBrokerTests, GetDataSetByIdReturnsPartialFileInfos) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = data_broker->GetDatasetById(1, expected_group_id, 0, &err); + auto dataset = data_broker->GetDatasetById(1, 0, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData()); @@ -1052,7 +1052,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - data_broker->GetLastDataset(expected_group_id, 2, &err); + data_broker->GetLastDataset(2, &err); } TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { @@ -1066,7 +1066,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - data_broker->GetLastDataset(expected_group_id, expected_substream, 1, &err); + data_broker->GetLastDataset(expected_substream, 1, &err); } TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { @@ -1080,7 +1080,7 @@ TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { SetArgPointee<2>(nullptr), Return(""))); asapo::Error err; - data_broker->GetDatasetById(expected_dataset_id, expected_group_id, 0, &err); + data_broker->GetDatasetById(expected_dataset_id, 0, &err); } TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) { @@ -1330,7 +1330,7 @@ TEST_F(ServerDataBrokerTests, GetLastAcknowledgeReturnsNoData) { TEST_F(ServerDataBrokerTests, GetByIdErrorsForId0) { - auto err = data_broker->GetById(0, &info, expected_group_id, nullptr); + auto err = data_broker->GetById(0, &info, nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 78a6ccaaa2ae9f1f02c9586c1afd5531dc65aa37..34450e0e146d167b08f7e69c988ccb2e3aac3acd 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -63,8 +63,8 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: void ForceNoRdma() NetworkConnectionType CurrentConnectionType() Error GetNext(FileInfo* info, string group_id,string substream, FileData* data) - Error GetLast(FileInfo* info, string group_id,string substream, FileData* data) - Error GetById(uint64_t id, FileInfo* info, string group_id, string substream, FileData* data) + Error GetLast(FileInfo* info, string substream, FileData* data) + Error GetById(uint64_t id, FileInfo* info, string substream, FileData* data) uint64_t GetCurrentSize(string substream, Error* err) Error SetLastReadMarker(uint64_t value, string group_id, string substream) Error ResetLastReadMarker(string group_id, string substream) @@ -76,8 +76,8 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil: string GetBeamtimeMeta(Error* err) FileInfos QueryImages(string query, string substream, Error* err) DataSet GetNextDataset(string group_id, string substream, uint64_t min_size, Error* err) - DataSet GetLastDataset(string group_id, string substream, uint64_t min_size, Error* err) - DataSet GetDatasetById(uint64_t id, string group_id, string substream, uint64_t min_size, Error* err) + DataSet GetLastDataset(string substream, uint64_t min_size, Error* err) + DataSet GetDatasetById(uint64_t id, string substream, uint64_t min_size, Error* err) Error RetrieveData(FileInfo* info, FileData* data) vector[StreamInfo] GetSubstreamList(string from_substream, Error* err) void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index 6958b6436e73ecb794e872c4da78e461b7009e4b..67de7608c905b0403afb64eab15311a1d69cdf87 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -117,10 +117,10 @@ cdef class PyDataBroker: err = self.c_broker.GetNext(&info, b_group_id,b_substream, p_data) elif op == "last": with nogil: - err = self.c_broker.GetLast(&info, b_group_id,b_substream, p_data) + err = self.c_broker.GetLast(&info, b_substream, p_data) elif op == "id": with nogil: - err = self.c_broker.GetById(id, &info, b_group_id,b_substream, p_data) + err = self.c_broker.GetById(id, &info, b_substream, p_data) if err: throw_exception(err) info_str = _str(info.Json()) @@ -134,10 +134,10 @@ cdef class PyDataBroker: return arr,meta def get_next(self, group_id, substream = "default", meta_only = True): return self._op("next",group_id,substream,meta_only,0) - def get_last(self, group_id, substream = "default", meta_only = True): - return self._op("last",group_id,substream,meta_only,0) - def get_by_id(self,uint64_t id,group_id, substream = "default",meta_only = True): - return self._op("id",group_id,substream,meta_only,id) + def get_last(self, substream = "default", meta_only = True): + return self._op("last","",substream,meta_only,0) + def get_by_id(self,uint64_t id,substream = "default",meta_only = True): + return self._op("id","",substream,meta_only,id) def retrieve_data(self,meta): json_str = json.dumps(meta) cdef FileInfo info @@ -290,10 +290,10 @@ cdef class PyDataBroker: dataset = self.c_broker.GetNextDataset(b_group_id,b_substream, min_size, &err) elif op == "last": with nogil: - dataset = self.c_broker.GetLastDataset(b_group_id,b_substream, min_size, &err) + dataset = self.c_broker.GetLastDataset(b_substream, min_size, &err) elif op == "id": with nogil: - dataset = self.c_broker.GetDatasetById(id, b_group_id,b_substream, min_size, &err) + dataset = self.c_broker.GetDatasetById(id, b_substream, min_size, &err) json_list = [] for fi in dataset.content: json_list.append(json.loads(_str(fi.Json()))) @@ -303,10 +303,10 @@ cdef class PyDataBroker: return res def get_next_dataset(self, group_id, substream = "default", min_size = 0): return self._op_dataset("next",group_id,substream,min_size,0) - def get_last_dataset(self, group_id, substream = "default", min_size = 0): - return self._op_dataset("last",group_id,substream,min_size,0) - def get_dataset_by_id(self, uint64_t id, group_id, substream = "default", min_size = 0): - return self._op_dataset("id",group_id,substream,min_size,id) + def get_last_dataset(self, substream = "default", min_size = 0): + return self._op_dataset("last","0",substream,min_size,0) + def get_dataset_by_id(self, uint64_t id, substream = "default", min_size = 0): + return self._op_dataset("id","0",substream,min_size,id) def get_beamtime_meta(self): cdef Error err cdef string meta_str diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 166d563543083924a3cdd84e574a75d153a74e0f..3663163624f9206275efb9189e9009564eda3d7c 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -45,7 +45,7 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertEq("hello1", std::string(data.get(), data.get() + fi.size)); - err = broker->GetLast(&fi, group_id, nullptr); + err = broker->GetLast(&fi, nullptr); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(fi.name == "10", "GetLast filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetLast metadata"); @@ -59,7 +59,7 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(err == nullptr, "SetLastReadMarker no error"); - err = broker->GetById(8, &fi, group_id, nullptr); + err = broker->GetById(8, &fi, nullptr); M_AssertTrue(err == nullptr, "GetById error"); M_AssertTrue(fi.name == "8", "GetById filename"); @@ -68,7 +68,7 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(fi.name == "3", "GetNext After GetById filename"); - err = broker->GetLast(&fi, group_id, nullptr); + err = broker->GetLast(&fi, nullptr); M_AssertTrue(err == nullptr, "GetLast2 no error"); @@ -222,7 +222,7 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size)); - dataset = broker->GetLastDataset(group_id, 0, &err); + dataset = broker->GetLastDataset(0, &err); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); @@ -231,10 +231,10 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertTrue(err == nullptr, "GetNextDataset2 no error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetNextDataSet2 filename"); - dataset = broker->GetLastDataset(group_id,0, &err); + dataset = broker->GetLastDataset(0, &err); M_AssertTrue(err == nullptr, "GetLastDataset2 no error"); - dataset = broker->GetDatasetById(8, group_id, 0, &err); + dataset = broker->GetDatasetById(8, 0, &err); M_AssertTrue(err == nullptr, "GetDatasetById error"); M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename"); @@ -250,10 +250,10 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertTrue(dataset.expected_size == 3, "GetDatasetById expected size"); M_AssertTrue(dataset.id == 1, "GetDatasetById expected id"); - dataset = broker->GetLastDataset(group_id,"incomplete",0,&err); + dataset = broker->GetLastDataset("incomplete", 0, &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLastDataset incomplete no data"); - dataset = broker->GetDatasetById(2, group_id,"incomplete", 0, &err); + dataset = broker->GetDatasetById(2, "incomplete", 0, &err); M_AssertTrue(err == asapo::ConsumerErrorTemplates::kPartialData, "GetDatasetById incomplete error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetDatasetById incomplete filename"); @@ -263,11 +263,11 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st M_AssertTrue(err == nullptr, "GetNextDataset incomplete minsize error"); M_AssertTrue(dataset.id == 2, "GetDatasetById minsize id"); - dataset = broker->GetLastDataset(group_id,"incomplete",2,&err); + dataset = broker->GetLastDataset("incomplete", 2, &err); M_AssertTrue(err == nullptr, "GetNextDataset incomplete minsize error"); M_AssertTrue(dataset.id == 5, "GetLastDataset minsize id"); - dataset = broker->GetDatasetById(2, group_id,"incomplete", 2, &err); + dataset = broker->GetDatasetById(2, "incomplete", 2, &err); M_AssertTrue(err == nullptr, "GetDatasetById incomplete minsize error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetDatasetById incomplete minsize filename"); diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index e0ca82fe0940821c4408379419103c6fe9ccd73b..f830ac0313838eaf04935f33f9d212bc80c4f517 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -37,9 +37,9 @@ def assert_eq(val, expected, name): def check_file_transfer_service(broker, group_id): broker.set_timeout(1000) - data, meta = broker.get_by_id(1, group_id, meta_only=False) + data, meta = broker.get_by_id(1, meta_only=False) assert_eq(data.tostring().decode("utf-8"), "hello1", "check_file_transfer_service ok") - data, meta = broker.get_by_id(1, group_id, "streamfts", meta_only=False) + data, meta = broker.get_by_id(1, "streamfts", meta_only=False) assert_eq(data.tostring().decode("utf-8"), "hello1", "check_file_transfer_service with auto size ok") @@ -57,12 +57,12 @@ def check_single(broker, group_id): assert_metaname(meta, "2", "get next2") assert_usermetadata(meta, "get next2") - _, meta = broker.get_last(group_id, meta_only=True) + _, meta = broker.get_last(meta_only=True) assert_metaname(meta, "5", "get last1") assert_usermetadata(meta, "get last1") try: - broker.get_by_id(30, group_id, meta_only=True) + broker.get_by_id(30, meta_only=True) except asapo_consumer.AsapoEndOfStreamError: pass else: @@ -81,7 +81,7 @@ def check_single(broker, group_id): assert_metaname(meta, "1", "get next4") assert_usermetadata(meta, "get next4") - _, meta = broker.get_by_id(3, group_id, meta_only=True) + _, meta = broker.get_by_id(3, meta_only=True) assert_metaname(meta, "3", "get get_by_id") assert_usermetadata(meta, "get get_by_id") @@ -104,7 +104,7 @@ def check_single(broker, group_id): exit_on_noerr("should give wrong input error") try: - broker.get_last(group_id, meta_only=False) + broker.get_last(meta_only=False) except asapo_consumer.AsapoLocalIOError as err: print(err) pass @@ -195,7 +195,7 @@ def check_single(broker, group_id): broker = asapo_consumer.create_server_broker("bla", path, True, beamtime, "", token, 1000) try: - broker.get_last(group_id, meta_only=True) + broker.get_last(meta_only=True) except asapo_consumer.AsapoUnavailableServiceError as err: print(err) pass @@ -219,7 +219,7 @@ def check_dataset(broker, group_id): assert_eq(res['id'], 2, "get_next_dataset2") assert_metaname(res['content'][0], "2_1", "get nextdataset2 name1") - res = broker.get_last_dataset(group_id) + res = broker.get_last_dataset() assert_eq(res['id'], 10, "get_last_dataset1") assert_eq(res['expected_size'], 3, "get_last_dataset1 size ") assert_metaname(res['content'][2], "10_3", "get get_last_dataset1 name3") @@ -227,7 +227,7 @@ def check_dataset(broker, group_id): res = broker.get_next_dataset(group_id) assert_eq(res['id'], 3, "get_next_dataset3") - res = broker.get_dataset_by_id(8, group_id) + res = broker.get_dataset_by_id(8) assert_eq(res['id'], 8, "get_dataset_by_id1 id") assert_metaname(res['content'][2], "8_3", "get get_dataset_by_id1 name3") @@ -244,7 +244,7 @@ def check_dataset(broker, group_id): exit_on_noerr("get_next_dataset incomplete err") try: - broker.get_dataset_by_id(2, group_id, "incomplete") + broker.get_dataset_by_id(2, "incomplete") except asapo_consumer.AsapoPartialDataError as err: assert_eq(err.partial_data['expected_size'], 3, "get_next_dataset incomplete expected size") assert_eq(err.partial_data['id'], 2, "get_next_dataset incomplete id") @@ -255,7 +255,7 @@ def check_dataset(broker, group_id): exit_on_noerr("get_next_dataset incomplete err") try: - broker.get_last_dataset(group_id, "incomplete") + broker.get_last_dataset("incomplete") except asapo_consumer.AsapoEndOfStreamError as err: pass else: @@ -264,10 +264,10 @@ def check_dataset(broker, group_id): res = broker.get_next_dataset(group_id, "incomplete", min_size=2) assert_eq(res['id'], 2, "get_next_dataset incomplete with minsize") - res = broker.get_last_dataset(group_id, "incomplete", min_size=2) + res = broker.get_last_dataset("incomplete", min_size=2) assert_eq(res['id'], 5, "get_last_dataset incomplete with minsize") - res = broker.get_dataset_by_id(2, group_id, "incomplete", min_size=1) + res = broker.get_dataset_by_id(2, "incomplete", min_size=1) assert_eq(res['id'], 2, "get_dataset_by_id incomplete with minsize") diff --git a/tests/manual/performance_broker_receiver/getlast_broker.cpp b/tests/manual/performance_broker_receiver/getlast_broker.cpp index c3a285c54a0cdc6000e83ce1afe4483ccb351336..b73a0101671d7ace2adada85d23aa52fdbe1581d 100644 --- a/tests/manual/performance_broker_receiver/getlast_broker.cpp +++ b/tests/manual/performance_broker_receiver/getlast_broker.cpp @@ -81,7 +81,7 @@ std::vector<std::thread> StartThreads(const Args& params, while (std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now() - start).count() < params.timeout_ms) { if (params.datasets) { - auto dataset = broker->GetLastDataset(group_id, 0, &err); + auto dataset = broker->GetLastDataset(0, &err); if (err == nullptr) { for (auto& fi : dataset.content) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; @@ -89,7 +89,7 @@ std::vector<std::thread> StartThreads(const Args& params, } } } else { - err = broker->GetLast(&fi, group_id, params.read_data ? &data : nullptr); + err = broker->GetLast(&fi, params.read_data ? &data : nullptr); if (err == nullptr) { (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) {