diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp index 84eab91f1aa50b053052de2fedb0a954859a1cb7..3c79048900e0f00bcd4271492614be3c45b7c8f3 100644 --- a/common/cpp/src/json_parser/rapid_json.cpp +++ b/common/cpp/src/json_parser/rapid_json.cpp @@ -28,11 +28,13 @@ Error RapidJson::LazyInitialize() const noexcept { } } - if (doc_.Parse(str.c_str()).HasParseError()) { + ParseResult ok = doc_.Parse(str.c_str()); + if (!ok || !doc_.IsObject()) { return TextError("Cannot parse document"); } object_ = doc_.GetObject(); + object_p_ = &object_; initialized_ = true; return nullptr; diff --git a/common/cpp/unittests/json_parser/test_json_parser.cpp b/common/cpp/unittests/json_parser/test_json_parser.cpp index 863e56516d44a30e58b5055ce232666c7d6be695..36a52780ff520ab44f9e112d1e854b7db81933ff 100644 --- a/common/cpp/unittests/json_parser/test_json_parser.cpp +++ b/common/cpp/unittests/json_parser/test_json_parser.cpp @@ -141,6 +141,19 @@ TEST(ParseString, ErrorOnWrongDocument) { } +TEST(ParseString, ErrorOnWrongFormatt) { + std::string json = "10"; + + JsonStringParser parser{json}; + + uint64_t id; + auto err = parser.GetUInt64("_id", &id); + + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(err->Explain(), ::testing::HasSubstr("parse")); + +} + TEST(ParseString, IntArrayConvertToJson) { std::string json = R"({"array":[1,2,3]})"; diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 50d6b90cea1a9377f5a96da2e2757a2a1d57f28a..17731f0fdb234d1dc627d1ed789ebf6916e44f41 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -31,24 +31,56 @@ class DataBroker { //! Connect to the data source - will scan file folders or connect to the database. // TODO: do we need this? virtual Error Connect() = 0; - //! Set timeout for broker operations. Default - no timeout + //! Reset counter for the specific group. + /*! + \param group_id - group id to use. + \return nullptr of command was successful, otherwise error. + */ + virtual Error ResetCounter(std::string group_id) = 0; + + //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; - //! Receive next image. + + + //! Get current number of datasets /*! \param err - return nullptr of operation succeed, error otherwise. - \return group id if OK, "" otherwise. + \return number of datasets. */ + virtual uint64_t GetNDataSets(Error* err) = 0; + + //! Generate new GroupID. + /*! + \param err - return nullptr of operation succeed, error otherwise. + \return group ID. + */ virtual std::string GenerateNewGroupId(Error* err) = 0; - //! Receive last available image. + + + //! Receive next available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. + \param group_id - group id to use. \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; - //! Receive last available image. + + + //! Receive dataset by id. + /*! + \param id - dataset id + \param info - where to store image metadata. Can be set to nullptr only image data is needed. + \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. + */ + virtual Error GetById(uint64_t id,FileInfo* info, FileData* data) = 0; + + + //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. + \param group_id - group id to use. \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index d2a47b65e3a2fa6040e8143186fe5ad784ddd2c9..67a549da9b16c9a3886dcd1371cce268e99ac509 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -79,5 +79,18 @@ std::string FolderDataBroker::GenerateNewGroupId(Error* err) { *err = nullptr; return ""; } +Error FolderDataBroker::ResetCounter(std::string group_id) { + std::lock_guard<std::mutex> lock{mutex_}; + current_file_ = -1; + return nullptr; +} +uint64_t FolderDataBroker::GetNDataSets(Error* err) { + std::lock_guard<std::mutex> lock{mutex_}; + return filelist_.size(); +} + +Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { + return GetFileByIndex(id -1 , info, data); +} } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index 78a2591f848124b7644364a1ebacb71f61491089..b360c285eda6b86a0679734d124b548640055f31 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -14,12 +14,13 @@ class FolderDataBroker final : public asapo::DataBroker { public: explicit FolderDataBroker(const std::string& source_name); Error Connect() override; + Error ResetCounter(std::string group_id) override; Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case - std::string GenerateNewGroupId(Error* err) - override; // return "0" always and no error - no group ids for folder datra broker - + std::string GenerateNewGroupId(Error* err) override; // return "0" always and no error - no group ids for folder datra broker + uint64_t GetNDataSets(Error* err) override; + Error GetById(uint64_t id,FileInfo* info, FileData* data) override; std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch private: std::string base_path_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index db61b1a8c8f18d9bbcdbf7e5e8da9db36d3f8505..70167e66caf36df7a4a9e7a9eb15e76bd4724e1c 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -207,24 +207,63 @@ Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* dat return net_client__->GetData(info, data); } + std::string ServerDataBroker::GenerateNewGroupId(Error* err) { + return BrokerRequestWithTimeout("creategroup",true,err); +} + +std::string ServerDataBroker::BrokerRequestWithTimeout(std::string request_string, bool post_request, Error* err) { uint64_t elapsed_ms = 0; std::string response; while (elapsed_ms <= timeout_ms_) { *err = GetBrokerUri(); if (*err == nullptr) { - std::string request = current_broker_uri_ + "/creategroup"; - *err = ProcessRequest(&response, request, true); - if (*err == nullptr) { + *err = ProcessRequest(&response, current_broker_uri_ + "/" + request_string, post_request); + if (*err == nullptr || (*err)->GetErrorType() == ErrorType::kEndOfFile) { return response; } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); elapsed_ms += 100; } - *err = TextErrorWithType("exit on timeout, last error: " + (*err)->Explain(), asapo::ErrorType::kTimeOut); return ""; } +Error ServerDataBroker::ResetCounter(std::string group_id) { + std::string request_string = "database/" + source_name_+"/"+std::move(group_id) + "/resetcounter"; + Error err; + BrokerRequestWithTimeout(request_string,true,&err); + return err; +} + +uint64_t ServerDataBroker::GetNDataSets(Error* err) { + std::string request_string = "database/" + source_name_+"/size"; + auto responce = BrokerRequestWithTimeout(request_string,false,err); + if (*err) { + return 0; + } + JsonStringParser parser(responce); + uint64_t size; + if ((*err = parser.GetUInt64("size", &size)) != nullptr) { + return 0; + } + return size; + +} +Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { + std::string request_string = "database/" + source_name_+"/"+std::to_string(id); + Error err; + auto responce = BrokerRequestWithTimeout(request_string,false,&err); + if (err) { + return err; + } + + if (!info->SetFromJson(responce)) { + return TextError(WorkerErrorMessage::kErrorReadingSource); + } + + return nullptr; +} + } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 80dfe0d7c5199daaa194ee396fd1a99991acbfee..b823b19840ff5491aef017a4becc46efd7682fa4 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -19,10 +19,12 @@ class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, std::string source_name, std::string token); Error Connect() override; + Error ResetCounter(std::string group_id) override; Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; std::string GenerateNewGroupId(Error* err) override; - + uint64_t GetNDataSets(Error* err) override; + Error GetById(uint64_t id,FileInfo* info, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; @@ -37,6 +39,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, FileData* data); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); + std::string BrokerRequestWithTimeout(std::string request_string, bool post_request, Error* err); std::string OpToUriCmd(GetImageServerOperation op); std::string server_uri_; std::string current_broker_uri_; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 6d8beba9697d9be7c19c4e3a0654e7c191df891f..49477921af0d5c33188da0b0083b9ee78265fa2a 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -160,6 +160,15 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) { } +TEST_F(FolderDataBrokerTests, GetNDataSets) { + data_broker->Connect(); + Error err; + auto n = data_broker->GetNDataSets(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(n, Eq(3)); +} + + TEST_F(FolderDataBrokerTests, GetLastReturnsFileInfo) { data_broker->Connect(); FileInfo fi; @@ -199,6 +208,20 @@ TEST_F(FolderDataBrokerTests, SecondNextReturnsAnotherFileInfo) { ASSERT_THAT(fi.name, Eq("2")); } +TEST_F(FolderDataBrokerTests, SecondNextReturnsSameFileInfoIfReset) { + data_broker->Connect(); + FileInfo fi; + data_broker->GetNext(&fi, "", nullptr); + + auto err = data_broker->ResetCounter(""); + ASSERT_THAT(err, Eq(nullptr)); + + err = data_broker->GetNext(&fi, "", nullptr); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(fi.name, Eq("1")); +} + TEST_F(FolderDataBrokerTests, GetNextFromEmptyFolderReturnsError) { data_broker->io__ = std::unique_ptr<IO> {new IOEmptyFolder()}; data_broker->Connect(); @@ -280,4 +303,28 @@ TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotAllocateData) { } +TEST_F(FolderDataBrokerTests, GetByIdReturnsFileInfo) { + data_broker->Connect(); + FileInfo fi; + + auto err = data_broker->GetById(1, &fi,nullptr); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(fi.name, Eq("1")); + ASSERT_THAT(fi.size, Eq(100)); + +} + +TEST_F(FolderDataBrokerTests, GetByIdReturnsError) { + data_broker->Connect(); + FileInfo fi; + + auto err1 = data_broker->GetById(0, &fi,nullptr); + auto err2 = data_broker->GetById(10, &fi,nullptr); + + ASSERT_THAT(err1, Ne(nullptr)); + ASSERT_THAT(err2, Ne(nullptr)); +} + + } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 7c7216e33bba4f5b45c56cad457e71e2ee378f79..047a5ea71391890e0fff9dd30bef01b719a38eb8 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -62,7 +62,7 @@ class ServerDataBrokerTests : public Test { std::string expected_filename = "filename"; std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename; std::string expected_group_id = "groupid"; - + uint64_t expected_dataset_id = 1; static const uint64_t expected_buf_id = 123; void SetUp() override { data_broker = std::unique_ptr<ServerDataBroker> { @@ -373,7 +373,7 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) { TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), "", _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri+"/creategroup?token="+expected_token, "", _, _)).WillOnce(DoAll( SetArgPointee<2>(HttpCode::OK), SetArgPointee<3>(nullptr), Return(expected_group_id))); @@ -385,5 +385,106 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { ASSERT_THAT(groupid, Eq(expected_group_id)); } +TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/resetcounter?token=" + + expected_token, _,_,_)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(""))); + auto err = data_broker->ResetCounter(expected_group_id); + ASSERT_THAT(err, Eq(nullptr)); +} + + +TEST_F(ServerDataBrokerTests, GetNDataSetsUsesCorrectUri) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" + + expected_token, _,_)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"size\":10}"))); + asapo::Error err; + auto size = data_broker->GetNDataSets(&err); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(size, Eq(10)); +} + + +TEST_F(ServerDataBrokerTests, GetNDataSetsErrorOnWrongResponce) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" + + expected_token, _,_)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::Unauthorized), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + auto size = data_broker->GetNDataSets(&err); + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(size, Eq(0)); +} + + +TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" + + expected_token, _,_)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"siz\":10}"))); + asapo::Error err; + auto size = data_broker->GetNDataSets(&err); + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(size, Eq(0)); +} + +TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + auto to_send = CreateFI(); + auto json = to_send.Json(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(json))); + + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(info.name, Eq(to_send.name)); + +} + +TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + auto to_send = CreateFI(); + auto json = to_send.Json(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"id\":1}"))); + + auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); + + ASSERT_THAT(err->GetErrorType(),Eq(asapo::ErrorType::kEndOfFile)); + +} + + }