diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 64dc01e2e9827cf9a15dd46d3887d4897e357530..9346521258eac9cc8610d4d7e0d8ed9ff2d840cd 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -344,13 +344,17 @@ func (db *Mongodb) GetSize(db_name string) ([]byte, error) { return json.Marshal(&rec) } -func (db *Mongodb) ResetCounter(db_name string, group_id string) ([]byte, error) { +func (db *Mongodb) ResetCounter(db_name string, group_id string, id_str string) ([]byte, error) { + id, err := strconv.Atoi(id_str) + if err != nil { + return nil, err + } if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err } - err := db.setCounter(db_name, group_id, 0) + err = db.setCounter(db_name, group_id, id) return []byte(""), err } @@ -422,7 +426,7 @@ func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, ex case "last": return db.GetLastRecord(db_name, group_id, dataset) case "resetcounter": - return db.ResetCounter(db_name, group_id) + return db.ResetCounter(db_name, group_id, extra_param) case "size": return db.GetSize(db_name) case "meta": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 9038f6e28b05b1c242287b70c6e04532bc46b97b..5b9bf9e20b884f20767a53cb5a0871f87569c9aa 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -308,14 +308,13 @@ func TestMongoDBResetCounter(t *testing.T) { assert.Nil(t, err1) assert.Equal(t, string(rec1_expect), string(res1)) - _, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "0") + _, err_reset := db.ProcessRequest(dbname, groupId, "resetcounter", "1") assert.Nil(t, err_reset) res2, err2 := db.ProcessRequest(dbname, groupId, "next", "0") assert.Nil(t, err2) - assert.Equal(t, string(rec1_expect), string(res2)) - + assert.Equal(t, string(rec2_expect), string(res2)) } func TestMongoDBGetMetaOK(t *testing.T) { diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 6925d89e6500fd7e4fdaf22820b91547eae1f043..76595e4073f186237e83d33ead6f45f50d10dc7e 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -48,7 +48,7 @@ var listRoutes = utils.Routes{ routeQueryImages, }, utils.Route{ - "ResetCounter", + "ResetConter", "Post", "/database/{dbname}/{stream}/{groupid}/resetcounter", routeResetCounter, diff --git a/broker/src/asapo_broker/server/post_reset_counter.go b/broker/src/asapo_broker/server/post_reset_counter.go index fd881f72f1fb81596077ae08f80f69888c9ad9d3..b67934d4e7dde2eea0ca198f8f3368f4eea29179 100644 --- a/broker/src/asapo_broker/server/post_reset_counter.go +++ b/broker/src/asapo_broker/server/post_reset_counter.go @@ -4,6 +4,15 @@ import ( "net/http" ) +func extractRequestParametersValue(r *http.Request) string { + val := r.URL.Query().Get("value") + if len(val) == 0 { + return "0" + } + return val +} + func routeResetCounter(w http.ResponseWriter, r *http.Request) { - processRequest(w, r, "resetcounter", "0", true) + val := extractRequestParametersValue(r) + processRequest(w, r, "resetcounter", val, true) } diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go index 4cff3551b0f540cc73b54e3866f4c52a00e0fb6b..e0b67f29a1092ce9da74a28c5686581688fc0430 100644 --- a/broker/src/asapo_broker/server/post_reset_counter_test.go +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -33,10 +33,10 @@ func TestResetCounterTestSuite(t *testing.T) { } func (suite *ResetCounterTestSuite) TestResetCounterOK() { - suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "0").Return([]byte(""), nil) + suite.mock_db.On("ProcessRequest", expectedDBName, expectedGroupID, "resetcounter", "10").Return([]byte(""), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix, "POST") + w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedStream+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix+"&value=10", "POST") suite.Equal(http.StatusOK, w.Code, "ResetCounter OK") } diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index d53a67d592cfe4b7788302af4cbfc5d805249e46..2790ce453ad94f025345781c140a8bcfca48c3cf 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -15,7 +15,6 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, func ValueTrue(r *http.Request, key string) bool { val := r.URL.Query().Get(key) - if len(val) == 0 { return false } @@ -23,9 +22,7 @@ func ValueTrue(r *http.Request, key string) bool { if val == "true" { return true } - return false - } func resetRequested(r *http.Request) bool { diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp index 82683d07ba4238ba9dd7ccb07b32afe7729ad45e..7b42092ffe377666fd64e08f2255cf5cfec1765a 100644 --- a/tests/automatic/worker/worker_api/worker_api.cpp +++ b/tests/automatic/worker/worker_api/worker_api.cpp @@ -56,13 +56,27 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str err = broker->GetNext(&fi, group_id, nullptr); M_AssertTrue(err != nullptr, "GetNext2 error"); - err = broker->GetLast(&fi, group_id, nullptr); - M_AssertTrue(err == nullptr, "GetLast2 no error"); + err = broker->SetLastReadMarker(2, group_id); + M_AssertTrue(err == nullptr, "SetLastReadMarker no error"); + err = broker->GetById(8, &fi, group_id, nullptr); M_AssertTrue(err == nullptr, "GetById error"); M_AssertTrue(fi.name == "8", "GetById filename"); + err = broker->GetNext(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetNext After GetById no error"); + M_AssertTrue(fi.name == "3", "GetNext After GetById filename"); + + + err = broker->GetLast(&fi, group_id, nullptr); + M_AssertTrue(err == nullptr, "GetLast2 no error"); + + + err = broker->SetLastReadMarker(8, group_id); + M_AssertTrue(err == nullptr, "SetLastReadMarker 2 no error"); + + err = broker->GetNext(&fi, group_id, nullptr); M_AssertTrue(err == nullptr, "GetNext3 no error"); M_AssertTrue(fi.name == "9", "GetNext3 filename"); @@ -71,8 +85,8 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str M_AssertTrue(err == nullptr, "GetNDataSets no error"); M_AssertTrue(size == 10, "GetNDataSets size"); - err = broker->ResetCounter(group_id); - M_AssertTrue(err == nullptr, "ResetCounter"); + err = broker->ResetLastReadMarker(group_id); + M_AssertTrue(err == nullptr, "SetLastReadMarker"); err = broker->GetNext(&fi, group_id, nullptr); M_AssertTrue(err == nullptr, "GetNext4 no error"); diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py index c7086e9b3db9c20737597e56ba782ea6aabc6cd0..9badee46776890e5fb32ab97ab668186b3c855bf 100644 --- a/tests/automatic/worker/worker_api_python/worker_api.py +++ b/tests/automatic/worker/worker_api_python/worker_api.py @@ -63,8 +63,8 @@ def check_single(broker,group_id_new): assert_eq(size,5,"get_ndatasets") - err = broker.reset_counter(group_id_new) - assert_noterr(err, "reset_counter") + err = broker.reset_lastread_marker(group_id_new) + assert_noterr(err, "reset_lastread_marker") _, meta, err = broker.get_next(group_id_new, meta_only=True) assert_noterr(err, "get_next4") @@ -79,10 +79,19 @@ def check_single(broker,group_id_new): _, meta, err = broker.get_next(group_id_new, meta_only=True) assert_noterr(err, "get_next5") - assert_metaname(meta,"4","get next5") + assert_metaname(meta,"2","get next5") assert_usermetadata(meta,"get next5") + err = broker.set_lastread_marker(4, group_id_new) + assert_noterr(err, "set_lastread_marker") + + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_noterr(err, "get_next6") + assert_metaname(meta,"5","get next6") + assert_usermetadata(meta,"get next6") + + images,err = broker.query_images("meta.test = 10") assert_noterr(err, "query1") assert_eq(len(images),5,"size of query answer 1") @@ -138,7 +147,7 @@ def check_dataset(broker,group_id_new): assert_metaname(metas[2],"8_3","get get_dataset_by_id1 name3") id, metas, err = broker.get_next_dataset(group_id_new) - assert_eq(id,9,"get_next_dataset4 id") + assert_eq(id,None,"get_next_dataset4 id") source, path, beamtime, token, mode = sys.argv[1:] diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 9629b33898a5a8df3662c42068d01870331fea4b..e82086357c1cef00714f298795ed583218487196 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -19,7 +19,8 @@ class DataBroker { \param group_id - group id to use. \return nullptr of command was successful, otherwise error. */ - virtual Error ResetCounter(std::string group_id) = 0; + virtual Error ResetLastReadMarker(std::string group_id) = 0; + virtual Error SetLastReadMarker(uint64_t value, std::string group_id) = 0; //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index cdc0fb1fa518392d5463e3f9632e6b5b85960089..24f574a7b916463201817b22b2e5cad792fffe86 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -90,7 +90,7 @@ std::string FolderDataBroker::GenerateNewGroupId(Error* err) { *err = nullptr; return ""; } -Error FolderDataBroker::ResetCounter(std::string group_id) { +Error FolderDataBroker::ResetLastReadMarker(std::string group_id) { std::lock_guard<std::mutex> lock{mutex_}; current_file_ = -1; return nullptr; @@ -125,6 +125,10 @@ DataSet FolderDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro *err = TextError("Not supported for folder data broker"); return {0, FileInfos{}}; } - +Error FolderDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) { + std::lock_guard<std::mutex> lock{mutex_}; + current_file_ = value - 1; + return nullptr; +} } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index 372ef2d514535e7bad5a0c82c348abdd403895dc..662800180aef366c5ee630b48d58346bf646ca9c 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -14,7 +14,8 @@ class FolderDataBroker final : public asapo::DataBroker { public: explicit FolderDataBroker(const std::string& source_name); Error Connect() override; - Error ResetCounter(std::string group_id) override; + Error ResetLastReadMarker(std::string group_id) override; + Error SetLastReadMarker(uint64_t value, std::string group_id) override; Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 6f00f55290af55dad743db0cd0c6c8816600f47a..4dcc2492a019a8400e2bcceac135fd65bc10591f 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -255,7 +255,7 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro if (*err == nullptr) { request.host = current_broker_uri_; *err = ProcessRequest(&response, request); - if (*err == nullptr || (*err)->GetErrorType() == ErrorType::kEndOfFile || (*err) == WorkerErrorTemplates::kWrongInput) { + if (*err == nullptr || (*err) == WorkerErrorTemplates::kWrongInput) { return response; } } @@ -266,10 +266,12 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro return ""; } -Error ServerDataBroker::ResetCounter(std::string group_id) { + +Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move( group_id) + "/resetcounter"; + ri.extra_params = "&value=" + std::to_string(value); ri.post = true; Error err; @@ -277,6 +279,11 @@ Error ServerDataBroker::ResetCounter(std::string group_id) { return err; } + +Error ServerDataBroker::ResetLastReadMarker(std::string group_id) { + return SetLastReadMarker(0, group_id); +} + uint64_t ServerDataBroker::GetNDataSets(Error* err) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/size"; @@ -303,7 +310,6 @@ Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* respon RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/" + std::move( group_id) + "/" + std::to_string(id); - ri.extra_params = "&reset=true"; if (dataset) { ri.extra_params += "&dataset=true"; } @@ -391,5 +397,4 @@ DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Erro return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err); } - } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index e87bb996875ca26d55db715d43da7ae3776f7250..3144e77854f194d794048ed1cfff202780265641 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -29,7 +29,8 @@ class ServerDataBroker final : public asapo::DataBroker { public: explicit ServerDataBroker(std::string server_uri, std::string source_path, SourceCredentials source); Error Connect() override; - Error ResetCounter(std::string group_id) override; + Error ResetLastReadMarker(std::string group_id) override; + Error SetLastReadMarker(uint64_t value, std::string group_id) override; Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; std::string GenerateNewGroupId(Error* err) override; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index d72d46166bfd0b9e79c7328f9e49054c5898c927..664d34b117ac4f72d91f1f28895646d5d58a2cdc 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -209,7 +209,7 @@ TEST_F(FolderDataBrokerTests, SecondNextReturnsSameFileInfoIfReset) { FileInfo fi; data_broker->GetNext(&fi, "", nullptr); - auto err = data_broker->ResetCounter(""); + auto err = data_broker->ResetLastReadMarker(""); ASSERT_THAT(err, Eq(nullptr)); err = data_broker->GetNext(&fi, "", nullptr); diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 092aa793be10cfb1de36506bfbd3677eb223e4c4..514d6eec079c0aab8fa6b90660bdcdf0a4494b15 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -421,18 +421,31 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { ASSERT_THAT(groupid, Eq(expected_group_id)); } +TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) { + MockGetBrokerUri(); + data_broker->SetTimeout(100); + + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_group_id + + "/resetcounter?token=" + expected_token + "&value=0", _, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(""))); + auto err = data_broker->ResetLastReadMarker(expected_group_id); + ASSERT_THAT(err, Eq(nullptr)); +} + TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) { MockGetBrokerUri(); data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + expected_group_id + - "/resetcounter?token=" - + expected_token, _, _, _)).WillOnce(DoAll( + "/resetcounter?token=" + expected_token + "&value=10", _, _, _)).WillOnce(DoAll( SetArgPointee<2>(HttpCode::OK), SetArgPointee<3>(nullptr), Return(""))); - auto err = data_broker->ResetCounter(expected_group_id); + auto err = data_broker->SetLastReadMarker(10, expected_group_id); ASSERT_THAT(err, Eq(nullptr)); } @@ -494,7 +507,7 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { expected_group_id + "/" + std::to_string( expected_dataset_id) + "?token=" - + expected_token + "&reset=true", _, + + expected_token, _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -504,7 +517,23 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(info.name, Eq(to_send.name)); +} + +TEST_F(ServerDataBrokerTests, GetByIdTimeouts) { + MockGetBrokerUri(); + data_broker->SetTimeout(10); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return(""))); + + auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr); + + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kTimeout)); } TEST_F(ServerDataBrokerTests, GetMetaDataOK) { @@ -719,7 +748,7 @@ TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" + expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token + "&reset=true&dataset=true", _, + + expected_token + "&dataset=true", _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 3146e51f797daf4e841d3956bdd2d39cefe36261..eea83b8c84cf3e65cabbf46e3f7dd7f0e2009a9b 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -43,7 +43,8 @@ cdef extern from "asapo_worker.h" namespace "asapo" nogil: Error GetLast(FileInfo* info, string group_id, FileData* data) Error GetById(uint64_t id, FileInfo* info, string group_id, FileData* data) uint64_t GetNDataSets(Error* err) - Error ResetCounter(string group_id) + Error SetLastReadMarker(uint64_t value, string group_id) + Error ResetLastReadMarker(string group_id) string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) FileInfos QueryImages(string query, Error* err) diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index cd19e16d5304de8fceef47e88edced7c12041bbb..8ea255691fffb1505fdc541454172a3af121e857 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -93,11 +93,22 @@ cdef class PyDataBroker: return None,err_str else: return size,None - def reset_counter(self,group_id): + def set_lastread_marker(self,value,group_id): cdef string b_group_id = _bytes(group_id) cdef Error err + cdef uint64_t id = value with nogil: - err = self.c_broker.ResetCounter(b_group_id) + err = self.c_broker.SetLastReadMarker(id,b_group_id) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return err_str + else: + return None + def reset_lastread_marker(self,group_id): + cdef string b_group_id = _bytes(group_id) + cdef Error err + with nogil: + err = self.c_broker.ResetLastReadMarker(b_group_id) err_str = _str(GetErrorString(&err)) if err_str.strip(): return err_str