Skip to content
Snippets Groups Projects
Commit 2f81c50a authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

datasets implemented in worker C++ api

parent 007b1505
No related branches found
No related tags found
No related merge requests found
......@@ -54,8 +54,13 @@ class DataBroker {
\return Error if both pointers are nullptr or data cannot be read, nullptr otherwise.
*/
virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0;
//! Receive next available image.
/*!
\param err - will be set to error data cannot be read, nullptr otherwise.
\param group_id - group id to use.
\return FileInfos - vector of dataset images
*/
virtual FileInfos GetNextDataset(std::string group_id,Error* err) = 0;
//! Receive dataset by id.
/*!
\param id - dataset id
......
......@@ -104,4 +104,9 @@ FileInfos FolderDataBroker::QueryImages(std::string query, Error* err) {
return FileInfos{};
}
FileInfos FolderDataBroker::GetNextDataset(std::string group_id,Error* err) {
*err = TextError("Not supported for folder data broker");
return FileInfos{};
}
}
......@@ -25,6 +25,7 @@ class FolderDataBroker final : public asapo::DataBroker {
Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override;
std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch
FileInfos QueryImages(std::string query, Error* err) override;
FileInfos GetNextDataset(std::string group_id,Error* err) override;
private:
std::string base_path_;
bool is_connected_;
......
......@@ -114,24 +114,26 @@ Error ServerDataBroker::GetBrokerUri() {
}
Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op) {
Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op,bool dataset) {
std::string request_suffix = OpToUriCmd(op);
std::string request_api = "/database/" + source_name_ + "/" + std::move(group_id) + "/";
uint64_t elapsed_ms = 0;
std::string response;
while (true) {
auto err = GetBrokerUri();
if (err == nullptr) {
RequestInfo ri;
ri.host = current_broker_uri_;
ri.api = request_api + request_suffix;
err = ProcessRequest(&response, ri);
if (dataset) {
ri.extra_params = "&dataset=true";
}
err = ProcessRequest(response, ri);
if (err == nullptr) {
break;
}
}
ProcessServerError(&err, response, &request_suffix);
ProcessServerError(&err, *response, &request_suffix);
if (elapsed_ms >= timeout_ms_) {
err = IOErrorTemplates::kTimeout.Generate( ", last error: " + err->Explain());
......@@ -141,9 +143,6 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_
elapsed_ms += 100;
}
if (!info->SetFromJson(response)) {
return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response);
}
return nullptr;
}
......@@ -174,16 +173,20 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t
}
Error err;
std::string response;
if (op == GetImageServerOperation::GetID) {
err = GetFileInfoFromServerById(id, info, std::move(group_id));
err = GetRecordFromServerById(id, &response, std::move(group_id));
} else {
err = GetFileInfoFromServer(info, std::move(group_id), op);
err = GetRecordFromServer(&response, std::move(group_id), op);
}
if (err != nullptr) {
return err;
}
if (!info->SetFromJson(response)) {
return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response);
}
return GetDataIfNeeded(info, data);
}
......@@ -278,24 +281,17 @@ Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_i
}
Error ServerDataBroker::GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id) {
Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id,bool dataset) {
RequestInfo ri;
ri.api = "/database/" + source_name_ + "/" + std::move(group_id) + "/" + std::to_string(id);
ri.extra_params = "&reset=true";
Error err;
auto responce = BrokerRequestWithTimeout(ri, &err);
if (err) {
return err;
if (dataset) {
ri.extra_params+="&dataset=true";
}
if (!info->SetFromJson(responce)) {
return WorkerErrorTemplates::kErrorReadingSource.Generate();
}
return nullptr;
Error err;
*response = BrokerRequestWithTimeout(ri, &err);
return err;
}
std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
......@@ -306,8 +302,8 @@ std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
}
FileInfos ServerDataBroker::DecodeFromResponse(std::string response, Error* err) {
auto parser = JsonStringParser("{ \"images\":" + response + "}");
FileInfos ServerDataBroker::DecodeFileInfosFromResponse(std::string response, Error* err) {
auto parser = JsonStringParser(std::move(response));
std::vector<std::string> vec_fi_endcoded;
auto parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded);
......@@ -340,7 +336,28 @@ FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
return FileInfos{};
}
return DecodeFromResponse(response, err);
return DecodeFileInfosFromResponse("{ \"images\":" + response + "}", err);
}
FileInfos ServerDataBroker::GetNextDataset(std::string group_id,Error* err) {
return GetFileInfosFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), err);
}
FileInfos ServerDataBroker::GetFileInfosFromServer(GetImageServerOperation op,
uint64_t id,
std::string group_id,
Error* err) {
FileInfos infos;
std::string response;
if (op == GetImageServerOperation::GetID) {
*err = GetRecordFromServerById(id, &response, std::move(group_id),true);
} else {
*err = GetRecordFromServer(&response, std::move(group_id), op,true);
}
if (*err != nullptr) {
return FileInfos{};
}
return DecodeFileInfosFromResponse(response, err);
}
}
......@@ -38,24 +38,26 @@ class ServerDataBroker final : public asapo::DataBroker {
Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override;
void SetTimeout(uint64_t timeout_ms) override;
FileInfos QueryImages(std::string query, Error* err) override;
FileInfos GetNextDataset(std::string group_id,Error* err) override;
std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
std::unique_ptr<HttpClient> httpclient__;
std::unique_ptr<NetClient> net_client__;
private:
std::string RequestWithToken(std::string uri);
Error GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op);
Error GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id);
Error GetRecordFromServer(std::string* info, std::string group_id, GetImageServerOperation op,bool dataset=false);
Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id,bool dataset=false);
Error GetDataIfNeeded(FileInfo* info, FileData* data);
Error GetBrokerUri();
void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri);
Error ProcessRequest(std::string* response, const RequestInfo& request);
Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data);
FileInfos GetFileInfosFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err);
bool DataCanBeInBuffer(const FileInfo* info);
Error TryGetDataFromBuffer(const FileInfo* info, FileData* data);
std::string BrokerRequestWithTimeout(RequestInfo request, Error* err);
std::string AppendUri(std::string request_string);
FileInfos DecodeFromResponse(std::string response, Error* err);
FileInfos DecodeFileInfosFromResponse(std::string response, Error* err);
std::string OpToUriCmd(GetImageServerOperation op);
std::string server_uri_;
......
......@@ -354,5 +354,16 @@ TEST(FolderDataBroker, QueryImages) {
}
TEST(FolderDataBroker, NextDataset) {
auto data_broker = std::unique_ptr<FolderDataBroker> {new FolderDataBroker("test")};
Error err;
auto infos = data_broker->GetNextDataset("bla", &err);
ASSERT_THAT(err, Ne(nullptr));
ASSERT_THAT(infos.size(), Eq(0));
}
}
......@@ -299,7 +299,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) {
ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kErrorReadingSource));
}
TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDtataNeeded) {
TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDataNeeded) {
MockGetBrokerUri();
MockGet("error_response");
......@@ -604,6 +604,63 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
ASSERT_THAT(images[1], Eq(rec2));
}
TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) {
MockGetBrokerUri();
EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token="
+ expected_token+"&dataset=true", _,
_)).WillOnce(DoAll(
SetArgPointee<1>(HttpCode::OK),
SetArgPointee<2>(nullptr),
Return("")));
asapo::Error err;
data_broker->GetNextDataset(expected_group_id,&err);
}
TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) {
asapo::Error err;
MockGetBrokerUri();
auto to_send1 = CreateFI();
auto json1 = to_send1.Json();
auto to_send2 = CreateFI();
to_send2.id = 2;
auto json2 = to_send2.Json();
auto to_send3 = CreateFI();
to_send3.id = 3;
auto json3 = to_send3.Json();
auto json=std::string("{")+
"\"_id\":1,"+
"\"size\":3,"+
"\"images\":["+json1+","+json2+","+json3+"]"+
"}";
MockGet(json);
auto infos = data_broker->GetNextDataset(expected_group_id,&err);
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(infos.size(), Eq(3));
ASSERT_THAT(infos[0].id, Eq(to_send1.id));
ASSERT_THAT(infos[1].id, Eq(to_send2.id));
ASSERT_THAT(infos[2].id, Eq(to_send3.id));
}
TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) {
MockGetBrokerUri();
MockGet("error_response");
asapo::Error err;
auto infos = data_broker->GetNextDataset(expected_group_id,&err);
ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kInternalError));
ASSERT_THAT(infos.size(), Eq(0));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment