diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 5f2358a52326a55e6502e31ce68f2fb98fd49438..a8678621d8dfa4bd0fd7775d2bf76b2b057c245c 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -54,8 +54,13 @@ class DataBroker { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; - - + //! Receive next available image. + /*! + \param err - will be set to error data cannot be read, nullptr otherwise. + \param group_id - group id to use. + \return FileInfos - vector of dataset images + */ + virtual FileInfos GetNextDataset(std::string group_id,Error* err) = 0; //! Receive dataset by id. /*! \param id - dataset id diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index f64f140bbc15986971c48b86c10ad0c2ad33ef71..25063cc995b0842e68bc065b34454339c0e78baa 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -104,4 +104,9 @@ FileInfos FolderDataBroker::QueryImages(std::string query, Error* err) { return FileInfos{}; } +FileInfos FolderDataBroker::GetNextDataset(std::string group_id,Error* err) { + *err = TextError("Not supported for folder data broker"); + return FileInfos{}; +} + } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index 9ffb04d3b04beeebd4339fa09250e89c40c9dc25..326e828f3b1f276b45664afefdbf68ba642353de 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -25,6 +25,7 @@ class FolderDataBroker final : public asapo::DataBroker { Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch FileInfos QueryImages(std::string query, Error* err) override; + FileInfos GetNextDataset(std::string group_id,Error* err) override; private: std::string base_path_; bool is_connected_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index f20d00482245086eeea3616648c785349128cebb..7e1ea657a9c980e43740534061082666aa7b26a4 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -114,24 +114,26 @@ Error ServerDataBroker::GetBrokerUri() { } -Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op) { +Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,bool dataset) { std::string request_suffix = OpToUriCmd(op); std::string request_api = "/database/" + source_name_ + "/" + std::move(group_id) + "/"; uint64_t elapsed_ms = 0; - std::string response; while (true) { auto err = GetBrokerUri(); if (err == nullptr) { RequestInfo ri; ri.host = current_broker_uri_; ri.api = request_api + request_suffix; - err = ProcessRequest(&response, ri); + if (dataset) { + ri.extra_params = "&dataset=true"; + } + err = ProcessRequest(response, ri); if (err == nullptr) { break; } } - ProcessServerError(&err, response, &request_suffix); + ProcessServerError(&err, *response, &request_suffix); if (elapsed_ms >= timeout_ms_) { err = IOErrorTemplates::kTimeout.Generate( ", last error: " + err->Explain()); @@ -141,9 +143,6 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_ elapsed_ms += 100; } - if (!info->SetFromJson(response)) { - return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response); - } return nullptr; } @@ -174,16 +173,20 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t } Error err; + std::string response; if (op == GetImageServerOperation::GetID) { - err = GetFileInfoFromServerById(id, info, std::move(group_id)); + err = GetRecordFromServerById(id, &response, std::move(group_id)); } else { - err = GetFileInfoFromServer(info, std::move(group_id), op); + err = GetRecordFromServer(&response, std::move(group_id), op); } - if (err != nullptr) { return err; } + if (!info->SetFromJson(response)) { + return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response); + } + return GetDataIfNeeded(info, data); } @@ -278,24 +281,17 @@ Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_i } -Error ServerDataBroker::GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id) { - +Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id,bool dataset) { RequestInfo ri; ri.api = "/database/" + source_name_ + "/" + std::move(group_id) + "/" + std::to_string(id); ri.extra_params = "&reset=true"; - - - Error err; - auto responce = BrokerRequestWithTimeout(ri, &err); - if (err) { - return err; + if (dataset) { + ri.extra_params+="&dataset=true"; } - if (!info->SetFromJson(responce)) { - return WorkerErrorTemplates::kErrorReadingSource.Generate(); - } - - return nullptr; + Error err; + *response = BrokerRequestWithTimeout(ri, &err); + return err; } std::string ServerDataBroker::GetBeamtimeMeta(Error* err) { @@ -306,8 +302,8 @@ std::string ServerDataBroker::GetBeamtimeMeta(Error* err) { } -FileInfos ServerDataBroker::DecodeFromResponse(std::string response, Error* err) { - auto parser = JsonStringParser("{ \"images\":" + response + "}"); +FileInfos ServerDataBroker::DecodeFileInfosFromResponse(std::string response, Error* err) { + auto parser = JsonStringParser(std::move(response)); std::vector<std::string> vec_fi_endcoded; auto parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded); @@ -340,7 +336,28 @@ FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) { return FileInfos{}; } - return DecodeFromResponse(response, err); + return DecodeFileInfosFromResponse("{ \"images\":" + response + "}", err); +} + +FileInfos ServerDataBroker::GetNextDataset(std::string group_id,Error* err) { + return GetFileInfosFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), err); +} + +FileInfos ServerDataBroker::GetFileInfosFromServer(GetImageServerOperation op, + uint64_t id, + std::string group_id, + Error* err) { + FileInfos infos; + std::string response; + if (op == GetImageServerOperation::GetID) { + *err = GetRecordFromServerById(id, &response, std::move(group_id),true); + } else { + *err = GetRecordFromServer(&response, std::move(group_id), op,true); + } + if (*err != nullptr) { + return FileInfos{}; + } + return DecodeFileInfosFromResponse(response, err); } } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 4f90f37ec4675dee3a6785a931a77b8112f280ab..80a99a2c793d2c0daf56d0c014b9afd7b97fa38c 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -38,24 +38,26 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; FileInfos QueryImages(std::string query, Error* err) override; + FileInfos GetNextDataset(std::string group_id,Error* err) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; private: std::string RequestWithToken(std::string uri); - Error GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op); - Error GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id); + Error GetRecordFromServer(std::string* info, std::string group_id, GetImageServerOperation op,bool dataset=false); + Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id,bool dataset=false); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); Error ProcessRequest(std::string* response, const RequestInfo& request); Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data); + FileInfos GetFileInfosFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); std::string AppendUri(std::string request_string); - FileInfos DecodeFromResponse(std::string response, Error* err); + FileInfos DecodeFileInfosFromResponse(std::string response, Error* err); std::string OpToUriCmd(GetImageServerOperation op); std::string server_uri_; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 9b9c254938b009c44fab8ae43723c40ae941d6ef..f7a38fa138e9a402686acd718b466e6644ce1275 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -354,5 +354,16 @@ TEST(FolderDataBroker, QueryImages) { } +TEST(FolderDataBroker, NextDataset) { + auto data_broker = std::unique_ptr<FolderDataBroker> {new FolderDataBroker("test")}; + + Error err; + auto infos = data_broker->GetNextDataset("bla", &err); + + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(infos.size(), Eq(0)); +} + + } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 4c0f85fea4bc05065ba321f8fef7f394989d0375..453bcccbe4a7dd02255714af9813b447e6e7b423 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -299,7 +299,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) { ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kErrorReadingSource)); } -TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDtataNeeded) { +TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDataNeeded) { MockGetBrokerUri(); MockGet("error_response"); @@ -604,6 +604,63 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) { ASSERT_THAT(images[1], Eq(rec2)); } +TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token=" + + expected_token+"&dataset=true", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + data_broker->GetNextDataset(expected_group_id,&err); +} + + +TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) { + asapo::Error err; + MockGetBrokerUri(); + + auto to_send1 = CreateFI(); + auto json1 = to_send1.Json(); + auto to_send2 = CreateFI(); + to_send2.id = 2; + auto json2 = to_send2.Json(); + auto to_send3 = CreateFI(); + to_send3.id = 3; + auto json3 = to_send3.Json(); + + auto json=std::string("{")+ + "\"_id\":1,"+ + "\"size\":3,"+ + "\"images\":["+json1+","+json2+","+json3+"]"+ + "}"; + + MockGet(json); + + auto infos = data_broker->GetNextDataset(expected_group_id,&err); + + ASSERT_THAT(err, Eq(nullptr)); + + ASSERT_THAT(infos.size(), Eq(3)); + ASSERT_THAT(infos[0].id, Eq(to_send1.id)); + ASSERT_THAT(infos[1].id, Eq(to_send2.id)); + ASSERT_THAT(infos[2].id, Eq(to_send3.id)); +} + +TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) { + MockGetBrokerUri(); + MockGet("error_response"); + + asapo::Error err; + auto infos = data_broker->GetNextDataset(expected_group_id,&err); + + ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kInternalError)); + ASSERT_THAT(infos.size(), Eq(0)); + +} + }