diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 79eced88a632c5457c602556152b373860cdfa30..f8086c0c3743445bad1c1828aaab115f9a44002d 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -200,6 +200,7 @@ Error ServerDataBroker::DiscoverService(const std::string &service_name, std::st bool ServerDataBroker::SwitchToGetByIdIfPartialData(Error* err, const std::string &response, + std::string* group_id, std::string* redirect_uri) { if (*err == ConsumerErrorTemplates::kPartialData) { auto error_data = static_cast<const PartialErrorData*>((*err)->GetCustomData()); @@ -208,12 +209,13 @@ bool ServerDataBroker::SwitchToGetByIdIfPartialData(Error* err, return false; } *redirect_uri = std::to_string(error_data->id); + *group_id = "0"; return true; } return false; } -bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string &response, std::string* redirect_uri) { +bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string &response,std::string* group_id, std::string* redirect_uri) { if (*err == ConsumerErrorTemplates::kNoData) { auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData()); if (error_data == nullptr) { @@ -221,6 +223,7 @@ bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string &re return false; } *redirect_uri = std::to_string(error_data->id); + *group_id = "0"; return true; } return false; @@ -241,16 +244,16 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g GetImageServerOperation op, bool dataset, uint64_t min_size) { std::string request_suffix = OpToUriCmd(op); + std::string request_group = OpToUriCmd(op); std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream - + "/" + std::move(substream) + - +"/" + std::move(group_id) + "/"; + + "/" + std::move(substream); uint64_t elapsed_ms = 0; Error no_data_error; while (true) { auto start = system_clock::now(); auto err = DiscoverService(kBrokerServiceName, ¤t_broker_uri_); if (err == nullptr) { - auto ri = PrepareRequestInfo(request_api + request_suffix, dataset, min_size); + auto ri = PrepareRequestInfo(request_api + "/" + group_id + "/" + request_suffix, dataset, min_size); if (request_suffix == "next" && resend_) { ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_sec=" + std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_); @@ -268,8 +271,8 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g } if (request_suffix == "next") { - auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix) - || SwitchToGetByIdIfPartialData(&err, *response, &request_suffix); + auto save_error = SwitchToGetByIdIfNoData(&err, *response, &group_id, &request_suffix) + || SwitchToGetByIdIfPartialData(&err, *response, &group_id, &request_suffix); if (err == ConsumerErrorTemplates::kInterruptedTransaction) { return err; } @@ -306,7 +309,7 @@ Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, FileData* Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::string substream, FileData* data) { return GetImageFromServer(GetImageServerOperation::GetLast, 0, - std::move(group_id), + "0", std::move(substream), info, data); @@ -559,7 +562,7 @@ Error ServerDataBroker::GetById(uint64_t id, std::string group_id, std::string substream, FileData* data) { - return GetImageFromServer(GetImageServerOperation::GetID, id, group_id, substream, info, data); + return GetImageFromServer(GetImageServerOperation::GetID, id, "0", substream, info, data); } Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id, @@ -626,7 +629,7 @@ DataSet ServerDataBroker::GetNextDataset(std::string group_id, std::string subst } DataSet ServerDataBroker::GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) { - return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream),min_size, err); + return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, "0", std::move(substream),min_size, err); } DataSet ServerDataBroker::GetLastDataset(std::string group_id, uint64_t min_size, Error* err) { @@ -656,7 +659,7 @@ DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, uint } DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) { - return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), std::move(substream), min_size, err); + return GetDatasetFromServer(GetImageServerOperation::GetID, id, "0", std::move(substream), min_size, err); } StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) { diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index 298201591730f52fe076757edfc1f779ef291fb9..c83ac84a86d7b5a8fac7057102c87cb9be9934e6 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -130,8 +130,8 @@ class ServerDataBroker final : public asapo::DataBroker { bool dataset = false, uint64_t min_size = 0); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error DiscoverService(const std::string& service_name, std::string* uri_to_set); - bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri); - bool SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* redirect_uri); + bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* group_id,std::string* redirect_uri); + bool SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* group_id,std::string* redirect_uri); Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri); Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream, FileInfo* info, FileData* data); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 512e339c069612960568967c200170e2a1f245f0..2fd04236f4aeb21a3d0beffda60b2965287a76e9 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -240,10 +240,10 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUriWithSubstream) { TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/last?token=" - + expected_token, _, - _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token=" + + expected_token, _, + _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); @@ -410,8 +410,8 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) + ",\"id_max\":2,\"next_substream\":\"""\"}"))); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::NotFound), SetArgPointee<2>(nullptr), @@ -704,10 +704,9 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id - + "/" + std::to_string( - expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string( + expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -724,8 +723,8 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), @@ -740,8 +739,8 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), @@ -756,8 +755,8 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) { MockGetBrokerUri(); data_broker->SetTimeout(10); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string(expected_dataset_id) + "?token=" + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::Conflict), SetArgPointee<2>(nullptr), @@ -1016,14 +1015,13 @@ TEST_F(ServerDataBrokerTests, GetDataSetByIdReturnsPartialFileInfos) { MockGet(json, asapo::HttpCode::PartialContent); - auto dataset = data_broker->GetDatasetById(1,expected_group_id, 0, &err); + auto dataset = data_broker->GetDatasetById(1, expected_group_id, 0, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData)); auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData()); ASSERT_THAT(err_data->expected_size, Eq(3)); ASSERT_THAT(err_data->id, Eq(1)); - ASSERT_THAT(dataset.id, Eq(1)); ASSERT_THAT(dataset.content.size(), Eq(2)); ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id)); @@ -1046,10 +1044,10 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) { TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + "/last?token=" - + expected_token + "&dataset=true&minsize=2", _, - _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/last?token=" + + expected_token + "&dataset=true&minsize=2", _, + _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), Return(""))); @@ -1061,8 +1059,7 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + - expected_substream + "/" + - expected_group_id + "/last?token=" + expected_substream + "/0/last?token=" + expected_token + "&dataset=true&minsize=1", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -1075,9 +1072,8 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) { TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" + - expected_group_id + - "/" + std::to_string(expected_dataset_id) + "?token=" + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/" + + std::to_string(expected_dataset_id) + "?token=" + expected_token + "&dataset=true" + "&minsize=0", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK),