From a7a1c5501e2042386df313ee3790323d74e697eb Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Thu, 4 Apr 2019 13:52:57 +0200 Subject: [PATCH] more work --- broker/src/asapo_broker/database/mongodb.go | 58 +++++++++++++--- .../src/asapo_broker/database/mongodb_test.go | 60 +++++++++++++--- broker/src/asapo_broker/server/get_id.go | 2 +- broker/src/asapo_broker/server/get_id_test.go | 14 +++- broker/src/asapo_broker/server/listroutes.go | 8 ++- .../asapo_broker/server/post_reset_counter.go | 9 +++ .../server/post_reset_counter_test.go | 42 ++++++++++++ .../asapo_broker/server/process_request.go | 4 ++ .../src/asapo_broker/server/request_common.go | 14 ++++ common/cpp/src/system_io/system_io_mac.cpp | 6 +- worker/api/cpp/include/worker/data_broker.h | 44 ++++++------ worker/api/cpp/src/folder_data_broker.cpp | 2 +- worker/api/cpp/src/folder_data_broker.h | 5 +- worker/api/cpp/src/server_data_broker.cpp | 30 ++++---- worker/api/cpp/src/server_data_broker.h | 6 +- .../api/cpp/unittests/test_folder_broker.cpp | 6 +- .../api/cpp/unittests/test_server_broker.cpp | 68 ++++++++++--------- 17 files changed, 274 insertions(+), 104 deletions(-) create mode 100644 broker/src/asapo_broker/server/post_reset_counter.go create mode 100644 broker/src/asapo_broker/server/post_reset_counter_test.go diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 1890dbeb8..951f77ee6 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -29,6 +29,10 @@ const already_connected_msg = "already connected" var dbListLock sync.RWMutex var dbPointersLock sync.RWMutex +type SizeRecord struct { + Size int `bson:"size" json:"size"` +} + type Mongodb struct { session *mgo.Session timeout time.Duration @@ -161,7 +165,8 @@ func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, r return err } -func (db *Mongodb) GetRecordByID(dbname string, id int, returnID bool) ([]byte, error) { +func (db *Mongodb) GetRecordByIDRow(dbname string, id int, returnID bool) ([]byte, error) { + var res map[string]interface{} q := bson.M{"_id": id} c := db.session.DB(dbname).C(data_collection_name) @@ -170,22 +175,35 @@ func (db *Mongodb) GetRecordByID(dbname string, id int, returnID bool) ([]byte, var r = struct { Id int `json:"id""` }{id} - res, _ := json.Marshal(&r) + answer, _ := json.Marshal(&r) log_str := "error getting record id " + strconv.Itoa(id) + " for " + dbname + " : " + err.Error() logger.Debug(log_str) if returnID { - return nil, &DBError{utils.StatusNoData, string(res)} + return nil, &DBError{utils.StatusNoData, string(answer)} } else { return nil, &DBError{utils.StatusNoData, err.Error()} } } - log_str := "got record id " + strconv.Itoa(id) + " for " + dbname logger.Debug(log_str) return utils.MapToJson(&res) } +func (db *Mongodb) GetRecordByID(dbname string, group_id string, id int, returnID bool, reset bool) ([]byte, error) { + + if err := db.checkDatabaseOperationPrerequisites(dbname, group_id); err != nil { + return nil, err + } + res, err := db.GetRecordByIDRow(dbname, id, returnID) + + if reset { + db.setCounter(dbname, group_id, id) + } + + return res, err +} + func (db *Mongodb) needCreateLocationPointersInDb(group_id string) bool { dbPointersLock.RLock() needCreate := !db.db_pointers_created[group_id] @@ -226,8 +244,9 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id return &DBError{utils.StatusWrongInput, err.Error()} } - db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, group_id) - + if len(group_id) > 0 { + db.getParentDB().generateLocationPointersInDbIfNeeded(db_name, group_id) + } return nil } @@ -259,7 +278,7 @@ func (db *Mongodb) GetNextRecord(db_name string, group_id string) ([]byte, error } log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id logger.Debug(log_str) - return db.GetRecordByID(db_name, curPointer.Value, true) + return db.GetRecordByIDRow(db_name, curPointer.Value, true) } @@ -275,21 +294,42 @@ func (db *Mongodb) GetLastRecord(db_name string, group_id string) ([]byte, error logger.Debug(log_str) return nil, err } - res, err := db.GetRecordByID(db_name, max_ind, false) + res, err := db.GetRecordByIDRow(db_name, max_ind, false) db.setCounter(db_name, group_id, max_ind) return res, err } +func (db *Mongodb) GetSize(db_name string) ([]byte, error) { + + if err := db.checkDatabaseOperationPrerequisites(db_name, ""); err != nil { + return nil, err + } + + c := db.session.DB(db_name).C(data_collection_name) + var rec SizeRecord + var err error + rec.Size, err = c.Count() + if err != nil { + return nil, err + } + return json.Marshal(&rec) +} + func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, id int) (answer []byte, err error) { switch op { case "next": return db.GetNextRecord(db_name, group_id) case "id": - return db.GetRecordByID(db_name, id, true) + return db.GetRecordByID(db_name, group_id, id, true, false) + case "idreset": + return db.GetRecordByID(db_name, group_id, id, true, true) case "last": return db.GetLastRecord(db_name, group_id) + case "size": + return db.GetSize(db_name) } + return nil, errors.New("Wrong db operation: " + op) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index b74c37ba7..19a5db2bf 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -15,11 +15,6 @@ type TestRecord struct { FName string `bson:"fname" json:"fname"` } -type TestSizeRecord struct { - Size int `bson:"size" json:"size"` -} - - var db Mongodb const dbname = "run1" @@ -33,9 +28,10 @@ var rec1_expect, _ = json.Marshal(rec1) var rec2_expect, _ = json.Marshal(rec2) var rec3_expect, _ = json.Marshal(rec3) -var recs1 = TestSizeRecord{3} +var recs1 = SizeRecord{3} var recs1_expect, _ = json.Marshal(recs1) - +var recs2 = SizeRecord{0} +var recs2_expect, _ = json.Marshal(recs2) func cleanup() { db.DeleteAllRecords(dbname) @@ -169,7 +165,7 @@ func TestMongoDBGetRecordByID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetRecordByID(dbname, 1, true) + res, err := db.GetRecordByID(dbname, "", 1, true, false) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -178,7 +174,7 @@ func TestMongoDBGetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - _, err := db.GetRecordByID(dbname, 2, true) + _, err := db.GetRecordByID(dbname, "", 2, true, false) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"id\":2}", err.Error()) } @@ -238,12 +234,54 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { } -/* func TestMongoDBGetSize(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) + db.InsertRecord(dbname, &rec2) + db.InsertRecord(dbname, &rec3) + res, err := db.ProcessRequest(dbname, "", "size", 0) assert.Nil(t, err) assert.Equal(t, string(recs1_expect), string(res)) -}*/ \ No newline at end of file +} + +func TestMongoDBGetSizeNoRecords(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + // to have empty collection + db.InsertRecord(dbname, &rec1) + db.session.DB(dbname).C(data_collection_name).RemoveId(1) + + res, err := db.ProcessRequest(dbname, "", "size", 0) + assert.Nil(t, err) + assert.Equal(t, string(recs2_expect), string(res)) +} + +func TestMongoDBGetSizeNoDatabase(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + _, err := db.ProcessRequest(dbname, "", "size", 0) + assert.NotNil(t, err) +} + +func TestMongoDBGetRecordIDWithReset(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec1) + db.InsertRecord(dbname, &rec2) + + res1, err1 := db.ProcessRequest(dbname, groupId, "idreset", 1) + res2, err2 := db.ProcessRequest(dbname, groupId, "next", 0) + + assert.Nil(t, err1) + assert.Equal(t, string(rec1_expect), string(res1)) + assert.Nil(t, err2) + assert.Equal(t, string(rec2_expect), string(res2)) + +} + +func TestMongoDBGetRecordByIDNotConnected(t *testing.T) { + _, err := db.GetRecordByID(dbname, "", 2, true, false) + assert.Equal(t, utils.StatusError, err.(*DBError).Code) +} diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go index d5b888dd5..da4fd7ee4 100644 --- a/broker/src/asapo_broker/server/get_id.go +++ b/broker/src/asapo_broker/server/get_id.go @@ -22,5 +22,5 @@ func routeGetByID(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - processRequest(w, r, "id", id, false) + processRequest(w, r, "id", id, true) } diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go index 984f1f3f1..3330c8184 100644 --- a/broker/src/asapo_broker/server/get_id_test.go +++ b/broker/src/asapo_broker/server/get_id_test.go @@ -45,11 +45,21 @@ func TestGetIDTestSuite(t *testing.T) { } func (suite *GetIDTestSuite) TestGetIdCallsCorrectRoutine() { - suite.mock_db.On("ProcessRequest", expectedBeamtimeId, "", "id", 1).Return([]byte("Hello"), nil) + suite.mock_db.On("ProcessRequest", expectedBeamtimeId, expectedGroupID, "id", 1).Return([]byte("Hello"), nil) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request"))) ExpectCopyClose(suite.mock_db) - w := doRequest("/database/" + expectedBeamtimeId + "/1" + correctTokenSuffix) + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/1" + correctTokenSuffix) + suite.Equal(http.StatusOK, w.Code, "GetImage OK") + suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data") +} + +func (suite *GetIDTestSuite) TestGetIdWithResetCallsCorrectRoutine() { + suite.mock_db.On("ProcessRequest", expectedBeamtimeId, expectedGroupID, "idreset", 1).Return([]byte("Hello"), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request"))) + ExpectCopyClose(suite.mock_db) + + w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/1" + correctTokenSuffix + "&reset=true") suite.Equal(http.StatusOK, w.Code, "GetImage OK") suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data") } diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index cfd0a4b86..750ee8c40 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -26,7 +26,7 @@ var listRoutes = utils.Routes{ utils.Route{ "GetID", "Get", - "/database/{dbname}/{id}", + "/database/{dbname}/{groupid}/{id}", routeGetByID, }, utils.Route{ @@ -35,6 +35,12 @@ var listRoutes = utils.Routes{ "/creategroup", routeCreateGroupID, }, + utils.Route{ + "ResetCounter", + "Post", + "/database/{dbname}/{groupid}/resetcounter", + routeResetCounter, + }, utils.Route{ "Health", "Get", diff --git a/broker/src/asapo_broker/server/post_reset_counter.go b/broker/src/asapo_broker/server/post_reset_counter.go new file mode 100644 index 000000000..d93b600ad --- /dev/null +++ b/broker/src/asapo_broker/server/post_reset_counter.go @@ -0,0 +1,9 @@ +package server + +import ( + "net/http" +) + +func routeResetCounter(w http.ResponseWriter, r *http.Request) { + processRequest(w, r, "resetcounter", 0, true) +} diff --git a/broker/src/asapo_broker/server/post_reset_counter_test.go b/broker/src/asapo_broker/server/post_reset_counter_test.go new file mode 100644 index 000000000..8db63c421 --- /dev/null +++ b/broker/src/asapo_broker/server/post_reset_counter_test.go @@ -0,0 +1,42 @@ +package server + +import ( + "asapo_broker/database" + "asapo_common/logger" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "net/http" + "testing" +) + +type ResetCounterTestSuite struct { + suite.Suite + mock_db *database.MockedDatabase +} + +func (suite *ResetCounterTestSuite) SetupTest() { + statistics.Reset() + suite.mock_db = new(database.MockedDatabase) + db = suite.mock_db + prepareTestAuth() + logger.SetMockLog() +} + +func (suite *ResetCounterTestSuite) TearDownTest() { + assertExpectations(suite.T(), suite.mock_db) + logger.UnsetMockLog() + db = nil +} + +func TestResetCounterTestSuite(t *testing.T) { + suite.Run(t, new(ResetCounterTestSuite)) +} + +func (suite *ResetCounterTestSuite) TestResetCounterOK() { + suite.mock_db.On("ProcessRequest", expectedBeamtimeId, expectedGroupID, "resetcounter", 0).Return([]byte(""), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request resetcounter"))) + ExpectCopyClose(suite.mock_db) + + w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedGroupID+"/resetcounter"+correctTokenSuffix, "POST") + suite.Equal(http.StatusOK, w.Code, "ResetCounter OK") +} diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 31a422c86..0e4f37903 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -55,6 +55,10 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, id int, n return } + if op == "id" && resetRequested(r) { + op = "idreset" + } + answer, code := processRequestInDb(db_name, group_id, op, id) w.WriteHeader(code) w.Write(answer) diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index 775ddd9bd..53ae802ff 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -13,6 +13,20 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, w.Write([]byte(err)) } +func resetRequested(r *http.Request) bool { + val := r.URL.Query().Get("reset") + + if len(val) == 0 { + return false + } + + if val == "true" { + return true + } + + return false +} + func testAuth(r *http.Request, beamtime_id string) error { token_got := r.URL.Query().Get("token") diff --git a/common/cpp/src/system_io/system_io_mac.cpp b/common/cpp/src/system_io/system_io_mac.cpp index 0ebfa3f3e..6d70c051a 100644 --- a/common/cpp/src/system_io/system_io_mac.cpp +++ b/common/cpp/src/system_io/system_io_mac.cpp @@ -21,9 +21,9 @@ using std::chrono::system_clock; namespace asapo { ListSocketDescriptors SystemIO::WaitSocketsActivity(SocketDescriptor master_socket, - ListSocketDescriptors* sockets_to_listen, - std::vector<std::string>* new_connections, - Error* err) const { + ListSocketDescriptors* sockets_to_listen, + std::vector<std::string>* new_connections, + Error* err) const { fd_set readfds; ListSocketDescriptors active_sockets; bool client_activity = false; diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 17731f0fd..042f85478 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -31,14 +31,14 @@ class DataBroker { //! Connect to the data source - will scan file folders or connect to the database. // TODO: do we need this? virtual Error Connect() = 0; - //! Reset counter for the specific group. - /*! - \param group_id - group id to use. - \return nullptr of command was successful, otherwise error. - */ - virtual Error ResetCounter(std::string group_id) = 0; - - //! Set timeout for broker operations. Default - no timeout + //! Reset counter for the specific group. + /*! + \param group_id - group id to use. + \return nullptr of command was successful, otherwise error. + */ + virtual Error ResetCounter(std::string group_id) = 0; + + //! Set timeout for broker operations. Default - no timeout virtual void SetTimeout(uint64_t timeout_ms) = 0; @@ -49,11 +49,11 @@ class DataBroker { */ virtual uint64_t GetNDataSets(Error* err) = 0; - //! Generate new GroupID. - /*! - \param err - return nullptr of operation succeed, error otherwise. - \return group ID. - */ + //! Generate new GroupID. + /*! + \param err - return nullptr of operation succeed, error otherwise. + \return group ID. + */ virtual std::string GenerateNewGroupId(Error* err) = 0; @@ -67,17 +67,17 @@ class DataBroker { virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; - //! Receive dataset by id. - /*! - \param id - dataset id - \param info - where to store image metadata. Can be set to nullptr only image data is needed. - \param data - where to store image data. Can be set to nullptr only image metadata is needed. - \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. - */ - virtual Error GetById(uint64_t id,FileInfo* info, FileData* data) = 0; + //! Receive dataset by id. + /*! + \param id - dataset id + \param info - where to store image metadata. Can be set to nullptr only image data is needed. + \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. + */ + virtual Error GetById(uint64_t id, FileInfo* info, FileData* data) = 0; - //! Receive last available image. + //! Receive last available image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param group_id - group id to use. diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 67a549da9..629d03689 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -90,7 +90,7 @@ uint64_t FolderDataBroker::GetNDataSets(Error* err) { } Error FolderDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { - return GetFileByIndex(id -1 , info, data); + return GetFileByIndex(id - 1 , info, data); } } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index b360c285e..8c06cde2b 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -18,9 +18,10 @@ class FolderDataBroker final : public asapo::DataBroker { Error GetNext(FileInfo* info, std::string group_id, FileData* data) override; Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case - std::string GenerateNewGroupId(Error* err) override; // return "0" always and no error - no group ids for folder datra broker + std::string GenerateNewGroupId(Error* err) + override; // return "0" always and no error - no group ids for folder datra broker uint64_t GetNDataSets(Error* err) override; - Error GetById(uint64_t id,FileInfo* info, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, FileData* data) override; std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch private: std::string base_path_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 70167e66c..6435c6931 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -88,13 +88,13 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + token_; } -Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri, bool post) { +Error ServerDataBroker::ProcessRequest(std::string* response, std::string request_uri, std::string extra_params, bool post) { Error err; HttpCode code; if (post) { - *response = httpclient__->Post(RequestWithToken(request_uri), "", &code, &err); + *response = httpclient__->Post(RequestWithToken(request_uri)+extra_params, "", &code, &err); } else { - *response = httpclient__->Get(RequestWithToken(request_uri), &code, &err); + *response = httpclient__->Get(RequestWithToken(request_uri)+extra_params, &code, &err); } if (err != nullptr) { current_broker_uri_ = ""; @@ -110,7 +110,7 @@ Error ServerDataBroker::GetBrokerUri() { std::string request_uri = server_uri_ + "/discovery/broker"; Error err; - err = ProcessRequest(¤t_broker_uri_, request_uri, false); + err = ProcessRequest(¤t_broker_uri_, request_uri, "", false); if (err != nullptr || current_broker_uri_.empty()) { current_broker_uri_ = ""; return TextError("cannot get broker uri from " + server_uri_); @@ -127,7 +127,7 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_ auto err = GetBrokerUri(); if (err == nullptr) { std::string request_api = current_broker_uri_ + "/database/" + source_name_ + "/"; - err = ProcessRequest(&response, request_api + request_suffix, false); + err = ProcessRequest(&response, request_api + request_suffix, "", false); if (err == nullptr) { break; } @@ -209,16 +209,17 @@ Error ServerDataBroker::TryGetDataFromBuffer(const FileInfo* info, FileData* dat std::string ServerDataBroker::GenerateNewGroupId(Error* err) { - return BrokerRequestWithTimeout("creategroup",true,err); + return BrokerRequestWithTimeout("creategroup","", true, err); } -std::string ServerDataBroker::BrokerRequestWithTimeout(std::string request_string, bool post_request, Error* err) { +std::string ServerDataBroker::BrokerRequestWithTimeout(std::string request_string, std::string extra_params, + bool post_request, Error* err) { uint64_t elapsed_ms = 0; std::string response; while (elapsed_ms <= timeout_ms_) { *err = GetBrokerUri(); if (*err == nullptr) { - *err = ProcessRequest(&response, current_broker_uri_ + "/" + request_string, post_request); + *err = ProcessRequest(&response, current_broker_uri_ + "/" + request_string,extra_params, post_request); if (*err == nullptr || (*err)->GetErrorType() == ErrorType::kEndOfFile) { return response; } @@ -231,15 +232,15 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(std::string request_strin } Error ServerDataBroker::ResetCounter(std::string group_id) { - std::string request_string = "database/" + source_name_+"/"+std::move(group_id) + "/resetcounter"; + std::string request_string = "database/" + source_name_ + "/" + std::move(group_id) + "/resetcounter"; Error err; - BrokerRequestWithTimeout(request_string,true,&err); + BrokerRequestWithTimeout(request_string,"", true, &err); return err; } uint64_t ServerDataBroker::GetNDataSets(Error* err) { - std::string request_string = "database/" + source_name_+"/size"; - auto responce = BrokerRequestWithTimeout(request_string,false,err); + std::string request_string = "database/" + source_name_ + "/size"; + auto responce = BrokerRequestWithTimeout(request_string,"", false, err); if (*err) { return 0; } @@ -252,9 +253,10 @@ uint64_t ServerDataBroker::GetNDataSets(Error* err) { } Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, FileData* data) { - std::string request_string = "database/" + source_name_+"/"+std::to_string(id); + std::string request_string = "database/" + source_name_ + "/" + std::to_string(id); + std::string extra_params = "&reset=true"; Error err; - auto responce = BrokerRequestWithTimeout(request_string,false,&err); + auto responce = BrokerRequestWithTimeout(request_string, extra_params, false, &err); if (err) { return err; } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index b823b1984..9a3eeb9bd 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -24,7 +24,7 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetLast(FileInfo* info, std::string group_id, FileData* data) override; std::string GenerateNewGroupId(Error* err) override; uint64_t GetNDataSets(Error* err) override; - Error GetById(uint64_t id,FileInfo* info, FileData* data) override; + Error GetById(uint64_t id, FileInfo* info, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; @@ -35,11 +35,11 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); - Error ProcessRequest(std::string* response, std::string request_uri, bool post); + Error ProcessRequest(std::string* response, std::string request_uri,std::string extra_params, bool post); Error GetImageFromServer(GetImageServerOperation op, std::string group_id, FileInfo* info, FileData* data); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); - std::string BrokerRequestWithTimeout(std::string request_string, bool post_request, Error* err); + std::string BrokerRequestWithTimeout(std::string request_string,std::string extra_params, bool post_request, Error* err); std::string OpToUriCmd(GetImageServerOperation op); std::string server_uri_; std::string current_broker_uri_; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 49477921a..dc415e3cc 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -307,7 +307,7 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsFileInfo) { data_broker->Connect(); FileInfo fi; - auto err = data_broker->GetById(1, &fi,nullptr); + auto err = data_broker->GetById(1, &fi, nullptr); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(fi.name, Eq("1")); @@ -319,8 +319,8 @@ TEST_F(FolderDataBrokerTests, GetByIdReturnsError) { data_broker->Connect(); FileInfo fi; - auto err1 = data_broker->GetById(0, &fi,nullptr); - auto err2 = data_broker->GetById(10, &fi,nullptr); + auto err1 = data_broker->GetById(0, &fi, nullptr); + auto err2 = data_broker->GetById(10, &fi, nullptr); ASSERT_THAT(err1, Ne(nullptr)); ASSERT_THAT(err2, Ne(nullptr)); diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 047a5ea71..68ad645f4 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -373,10 +373,11 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) { TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri+"/creategroup?token="+expected_token, "", _, _)).WillOnce(DoAll( - SetArgPointee<2>(HttpCode::OK), - SetArgPointee<3>(nullptr), - Return(expected_group_id))); + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/creategroup?token=" + expected_token, "", _, + _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(expected_group_id))); data_broker->SetTimeout(100); asapo::Error err; @@ -389,11 +390,12 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) { MockGetBrokerUri(); data_broker->SetTimeout(100); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/resetcounter?token=" - + expected_token, _,_,_)).WillOnce(DoAll( - SetArgPointee<2>(HttpCode::OK), - SetArgPointee<3>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + + "/resetcounter?token=" + + expected_token, _, _, _)).WillOnce(DoAll( + SetArgPointee<2>(HttpCode::OK), + SetArgPointee<3>(nullptr), + Return(""))); auto err = data_broker->ResetCounter(expected_group_id); ASSERT_THAT(err, Eq(nullptr)); } @@ -404,10 +406,10 @@ TEST_F(ServerDataBrokerTests, GetNDataSetsUsesCorrectUri) { data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" - + expected_token, _,_)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return("{\"size\":10}"))); + + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"size\":10}"))); asapo::Error err; auto size = data_broker->GetNDataSets(&err); ASSERT_THAT(err, Eq(nullptr)); @@ -420,10 +422,10 @@ TEST_F(ServerDataBrokerTests, GetNDataSetsErrorOnWrongResponce) { data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" - + expected_token, _,_)).WillRepeatedly(DoAll( - SetArgPointee<1>(HttpCode::Unauthorized), - SetArgPointee<2>(nullptr), - Return(""))); + + expected_token, _, _)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::Unauthorized), + SetArgPointee<2>(nullptr), + Return(""))); asapo::Error err; auto size = data_broker->GetNDataSets(&err); ASSERT_THAT(err, Ne(nullptr)); @@ -436,10 +438,10 @@ TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) { data_broker->SetTimeout(100); EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/size?token=" - + expected_token, _,_)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return("{\"siz\":10}"))); + + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"siz\":10}"))); asapo::Error err; auto size = data_broker->GetNDataSets(&err); ASSERT_THAT(err, Ne(nullptr)); @@ -452,12 +454,13 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string( + expected_dataset_id) + "?token=" + + expected_token+"&reset=true", _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(json))); + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(json))); auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); @@ -472,16 +475,17 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsNoData) { auto to_send = CreateFI(); auto json = to_send.Json(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token, _, + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + std::to_string( + expected_dataset_id) + "?token=" + + expected_token+"&reset=true", _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::Conflict), - SetArgPointee<2>(nullptr), - Return("{\"id\":1}"))); + SetArgPointee<1>(HttpCode::Conflict), + SetArgPointee<2>(nullptr), + Return("{\"id\":1}"))); auto err = data_broker->GetById(expected_dataset_id, &info, nullptr); - ASSERT_THAT(err->GetErrorType(),Eq(asapo::ErrorType::kEndOfFile)); + ASSERT_THAT(err->GetErrorType(), Eq(asapo::ErrorType::kEndOfFile)); } -- GitLab