diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index d20b44223afe5c76bcb971173ca187db4d6d761a..64dc01e2e9827cf9a15dd46d3887d4897e357530 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -10,6 +10,7 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "strconv" + "strings" "sync" "time" ) @@ -132,10 +133,16 @@ func (db *Mongodb) InsertMeta(dbname string, s interface{}) error { return c.Insert(s) } -func (db *Mongodb) getMaxIndex(dbname string) (max_id int, err error) { +func (db *Mongodb) getMaxIndex(dbname string, dataset bool) (max_id int, err error) { c := db.session.DB(dbname).C(data_collection_name) var id Pointer - err = c.Find(nil).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) + var q bson.M + if dataset { + q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} + } else { + q = nil + } + err = c.Find(q).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) if err != nil { return 0, nil } @@ -176,9 +183,15 @@ func (db *Mongodb) incrementField(dbname string, group_id string, max_ind int, r return err } -func (db *Mongodb) GetRecordByIDRow(dbname string, id int, returnID bool) ([]byte, error) { +func (db *Mongodb) GetRecordByIDRow(dbname string, id int, returnID bool, dataset bool) ([]byte, error) { var res map[string]interface{} - q := bson.M{"_id": id} + var q bson.M + if dataset { + q = bson.M{"$and": []bson.M{bson.M{"_id": id}, bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}}}} + } else { + q = bson.M{"_id": id} + } + c := db.session.DB(dbname).C(data_collection_name) err := c.Find(q).One(&res) if err != nil { @@ -200,7 +213,7 @@ func (db *Mongodb) GetRecordByIDRow(dbname string, id int, returnID bool) ([]byt return utils.MapToJson(&res) } -func (db *Mongodb) GetRecordByID(dbname string, group_id string, id_str string, returnID bool, reset bool) ([]byte, error) { +func (db *Mongodb) GetRecordByID(dbname string, group_id string, id_str string, returnID bool, reset bool, dataset bool) ([]byte, error) { id, err := strconv.Atoi(id_str) if err != nil { return nil, err @@ -209,7 +222,7 @@ func (db *Mongodb) GetRecordByID(dbname string, group_id string, id_str string, if err := db.checkDatabaseOperationPrerequisites(dbname, group_id); err != nil { return nil, err } - res, err := db.GetRecordByIDRow(dbname, id, returnID) + res, err := db.GetRecordByIDRow(dbname, id, returnID, dataset) if reset { db.setCounter(dbname, group_id, id) @@ -265,7 +278,7 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string, group_id } func (db *Mongodb) getCurrentPointer(db_name string, group_id string) (Pointer, error) { - max_ind, err := db.getMaxIndex(db_name) + max_ind, err := db.getMaxIndex(db_name, false) if err != nil { return Pointer{}, err } @@ -278,7 +291,7 @@ func (db *Mongodb) getCurrentPointer(db_name string, group_id string) (Pointer, return curPointer, nil } -func (db *Mongodb) GetNextRecord(db_name string, group_id string) ([]byte, error) { +func (db *Mongodb) GetNextRecord(db_name string, group_id string, dataset bool) ([]byte, error) { if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err @@ -292,23 +305,23 @@ func (db *Mongodb) GetNextRecord(db_name string, group_id string) ([]byte, error } log_str := "got next pointer " + strconv.Itoa(curPointer.Value) + " for " + db_name + ", groupid: " + group_id logger.Debug(log_str) - return db.GetRecordByIDRow(db_name, curPointer.Value, true) + return db.GetRecordByIDRow(db_name, curPointer.Value, true, dataset) } -func (db *Mongodb) GetLastRecord(db_name string, group_id string) ([]byte, error) { +func (db *Mongodb) GetLastRecord(db_name string, group_id string, dataset bool) ([]byte, error) { if err := db.checkDatabaseOperationPrerequisites(db_name, group_id); err != nil { return nil, err } - max_ind, err := db.getMaxIndex(db_name) + max_ind, err := db.getMaxIndex(db_name, dataset) if err != nil { log_str := "error getting last pointer for " + db_name + ", groupid: " + group_id + ":" + err.Error() logger.Debug(log_str) return nil, err } - res, err := db.GetRecordByIDRow(db_name, max_ind, false) + res, err := db.GetRecordByIDRow(db_name, max_ind, false, dataset) db.setCounter(db_name, group_id, max_ind) @@ -394,15 +407,20 @@ func (db *Mongodb) queryImages(dbname string, query string) ([]byte, error) { } func (db *Mongodb) ProcessRequest(db_name string, group_id string, op string, extra_param string) (answer []byte, err error) { + dataset := false + if strings.HasSuffix(op, "_dataset") { + dataset = true + op = op[:len(op)-8] + } switch op { case "next": - return db.GetNextRecord(db_name, group_id) + return db.GetNextRecord(db_name, group_id, dataset) case "id": - return db.GetRecordByID(db_name, group_id, extra_param, true, false) + return db.GetRecordByID(db_name, group_id, extra_param, true, false, dataset) case "idreset": - return db.GetRecordByID(db_name, group_id, extra_param, true, true) + return db.GetRecordByID(db_name, group_id, extra_param, true, true, dataset) case "last": - return db.GetLastRecord(db_name, group_id) + return db.GetLastRecord(db_name, group_id, dataset) case "resetcounter": return db.ResetCounter(db_name, group_id) case "size": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 980a53fe8eb3ff66bd2eccbb38337e7dbbf15884..1002472707de1994a802a402b187741943dbe7ab 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -17,6 +17,12 @@ type TestRecord struct { FName string `bson:"fname" json:"fname"` } +type TestDataset struct { + ID int `bson:"_id" json:"_id"` + Size int `bson:"size" json:"size"` + Images []TestRecord `bson:"images" json:"images"` +} + var db Mongodb const dbname = "run1" @@ -60,14 +66,14 @@ func TestMongoDBConnectOK(t *testing.T) { } func TestMongoDBGetNextErrorWhenNotConnected(t *testing.T) { - _, err := db.GetNextRecord("", groupId) + _, err := db.GetNextRecord("", groupId, false) assert.Equal(t, utils.StatusError, err.(*DBError).Code) } func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { db.Connect(dbaddress) defer cleanup() - _, err := db.GetNextRecord("", groupId) + _, err := db.GetNextRecord("", groupId, false) assert.Equal(t, utils.StatusWrongInput, err.(*DBError).Code) } @@ -75,7 +81,7 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) { db.Connect(dbaddress) db.databases = append(db.databases, dbname) defer cleanup() - _, err := db.GetNextRecord(dbname, groupId) + _, err := db.GetNextRecord(dbname, groupId, false) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -83,7 +89,7 @@ func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec2) - _, err := db.GetNextRecord(dbname, groupId) + _, err := db.GetNextRecord(dbname, groupId, false) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"id\":1}", err.Error()) } @@ -92,7 +98,7 @@ func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetNextRecord(dbname, groupId) + res, err := db.GetNextRecord(dbname, groupId, false) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -101,8 +107,8 @@ func TestMongoDBGetNextErrorOnNoMoreData(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - db.GetNextRecord(dbname, groupId) - _, err := db.GetNextRecord(dbname, groupId) + db.GetNextRecord(dbname, groupId, false) + _, err := db.GetNextRecord(dbname, groupId, false) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } @@ -111,8 +117,8 @@ func TestMongoDBGetNextCorrectOrder(t *testing.T) { defer cleanup() db.InsertRecord(dbname, &rec2) db.InsertRecord(dbname, &rec1) - res1, _ := db.GetNextRecord(dbname, groupId) - res2, _ := db.GetNextRecord(dbname, groupId) + res1, _ := db.GetNextRecord(dbname, groupId, false) + res2, _ := db.GetNextRecord(dbname, groupId, false) assert.Equal(t, string(rec1_expect), string(res1)) assert.Equal(t, string(rec2_expect), string(res2)) } @@ -144,7 +150,7 @@ func getRecords(n int) []int { for i := 0; i < n; i++ { go func() { defer wg.Done() - res_bin, _ := db.GetNextRecord(dbname, groupId) + res_bin, _ := db.GetNextRecord(dbname, groupId, false) var res TestRecord json.Unmarshal(res_bin, &res) results[res.ID] = 1 @@ -170,7 +176,7 @@ func TestMongoDBGetRecordByID(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - res, err := db.GetRecordByID(dbname, "", "1", true, false) + res, err := db.GetRecordByID(dbname, "", "1", true, false, false) assert.Nil(t, err) assert.Equal(t, string(rec1_expect), string(res)) } @@ -179,7 +185,7 @@ func TestMongoDBGetRecordByIDFails(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.InsertRecord(dbname, &rec1) - _, err := db.GetRecordByID(dbname, "", "2", true, false) + _, err := db.GetRecordByID(dbname, "", "2", true, false, false) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) assert.Equal(t, "{\"id\":2}", err.Error()) } @@ -287,7 +293,7 @@ func TestMongoDBGetRecordIDWithReset(t *testing.T) { } func TestMongoDBGetRecordByIDNotConnected(t *testing.T) { - _, err := db.GetRecordByID(dbname, "", "2", true, false) + _, err := db.GetRecordByID(dbname, "", "2", true, false, false) assert.Equal(t, utils.StatusError, err.(*DBError).Code) } @@ -416,3 +422,104 @@ func TestMongoDBQueryImagesOK(t *testing.T) { } } + +var rec_dataset1 = TestDataset{1, 3, []TestRecord{rec1, rec2, rec3}} +var rec_dataset2 = TestDataset{2, 2, []TestRecord{rec1, rec2, rec3}} +var rec_dataset3 = TestDataset{3, 3, []TestRecord{rec3, rec2, rec2}} + +func TestMongoDBGetDataset(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.InsertRecord(dbname, &rec_dataset1) + + res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0") + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset1, res) +} + +func TestMongoDBNoDataOnNotCompletedDataset(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.InsertRecord(dbname, &rec_dataset2) + + res_string, err := db.ProcessRequest(dbname, groupId, "next_dataset", "0") + + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "", string(res_string)) +} + +func TestMongoDBGetRecordLastDataSetSkipsIncompleteSets(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.InsertRecord(dbname, &rec_dataset1) + db.InsertRecord(dbname, &rec_dataset2) + + res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0") + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset1, res) +} + +func TestMongoDBGetRecordLastDataSetOK(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + + db.InsertRecord(dbname, &rec_dataset1) + db.InsertRecord(dbname, &rec_dataset1) + + res_string, err := db.ProcessRequest(dbname, groupId, "last_dataset", "0") + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset3, res) +} + +func TestMongoDBGetDatasetIDWithReset(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec_dataset1) + db.InsertRecord(dbname, &rec_dataset3) + + _, err1 := db.ProcessRequest(dbname, groupId, "idreset_dataset", "2") //error while record is not complete, but reset counter to 2 + res2s, err2 := db.ProcessRequest(dbname, groupId, "next_dataset", "0") // so getnext would get record number 3 + + assert.NotNil(t, err1) + assert.Nil(t, err2) + + var res2 TestDataset + json.Unmarshal(res2s, &res2) + + assert.Equal(t, rec_dataset3, res2) + +} + +func TestMongoDBGetDatasetID(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec_dataset1) + + res_string, err := db.ProcessRequest(dbname, groupId, "id_dataset", "1") + + assert.Nil(t, err) + + var res TestDataset + json.Unmarshal(res_string, &res) + + assert.Equal(t, rec_dataset1, res) + +} diff --git a/broker/src/asapo_broker/server/process_request.go b/broker/src/asapo_broker/server/process_request.go index 3cede6c0014d855064980565dce4ca584fefb5f1..552fb64f8594f775a4a955fb1b22a5056b06d0b3 100644 --- a/broker/src/asapo_broker/server/process_request.go +++ b/broker/src/asapo_broker/server/process_request.go @@ -59,6 +59,10 @@ func processRequest(w http.ResponseWriter, r *http.Request, op string, extra_par op = "idreset" } + if datasetRequested(r) { + op = op + "_dataset" + } + answer, code := processRequestInDb(db_name, group_id, op, extra_param) w.WriteHeader(code) w.Write(answer) diff --git a/broker/src/asapo_broker/server/process_request_test.go b/broker/src/asapo_broker/server/process_request_test.go index 727f2aa3e68d190874ecd206f90863887832d6ef..3b4b4604d84f3fed40874219dd35bb16765b789e 100644 --- a/broker/src/asapo_broker/server/process_request_test.go +++ b/broker/src/asapo_broker/server/process_request_test.go @@ -151,3 +151,11 @@ func (suite *ProcessRequestTestSuite) TestProcessRequestWrongGroupID() { w := doRequest("/database/" + expectedBeamtimeId + "/" + wrongGroupID + "/next" + correctTokenSuffix) suite.Equal(http.StatusBadRequest, w.Code, "wrong group id") } + +func (suite *ProcessRequestTestSuite) TestProcessRequestAddsDataset() { + suite.mock_db.On("ProcessRequest", expectedBeamtimeId, expectedGroupID, "next_dataset", "0").Return([]byte("Hello"), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request next_dataset in "+expectedBeamtimeId))) + ExpectCopyClose(suite.mock_db) + + doRequest("/database/" + expectedBeamtimeId + "/" + expectedGroupID + "/next" + correctTokenSuffix + "&dataset=true") +} diff --git a/broker/src/asapo_broker/server/request_common.go b/broker/src/asapo_broker/server/request_common.go index 53ae802ff4f4745da012cb23d425471ca7553f4b..8cda3b9967a66f06a5639f5b99f4d01a498a9be8 100644 --- a/broker/src/asapo_broker/server/request_common.go +++ b/broker/src/asapo_broker/server/request_common.go @@ -13,8 +13,8 @@ func writeAuthAnswer(w http.ResponseWriter, requestName string, db_name string, w.Write([]byte(err)) } -func resetRequested(r *http.Request) bool { - val := r.URL.Query().Get("reset") +func ValueTrue(r *http.Request, key string) bool { + val := r.URL.Query().Get(key) if len(val) == 0 { return false @@ -25,6 +25,15 @@ func resetRequested(r *http.Request) bool { } return false + +} + +func resetRequested(r *http.Request) bool { + return ValueTrue(r, "reset") +} + +func datasetRequested(r *http.Request) bool { + return ValueTrue(r, "dataset") } func testAuth(r *http.Request, beamtime_id string) error { diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index f10c6a5d8a02a64da4f30b94f364b41ec5071c7e..2ce84c713732b7de7f7f8d6f89e83996892f223f 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -37,6 +37,12 @@ inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) { using FileData = std::unique_ptr<uint8_t[]>; using FileInfos = std::vector<FileInfo>; + +struct DataSet { + uint64_t id; + FileInfos content; +}; + using SubDirList = std::vector<std::string>; } diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index bc65a16874fc24d54d26634bc1dc5fa49b1b7590..6136e2f5ba8efea410bedfe52e30313f7bb3c4e8 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -13,6 +13,7 @@ typedef uint64_t NetworkRequestId; enum Opcode : uint8_t { kOpcodeUnknownOp = 1, kOpcodeTransferData, + kOpcodeTransferSubsetData, kOpcodeGetBufferData, kOpcodeAuthorize, kOpcodeTransferMetaData, @@ -33,6 +34,9 @@ enum NetworkErrorCode : uint16_t { //TODO need to use an serialization framework to ensure struct consistency on different computers const std::size_t kMaxMessageSize = 1024; +const std::size_t kNCustomParams = 2; +using CustomRequestData = uint64_t[kNCustomParams]; + struct GenericRequestHeader { GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, @@ -42,6 +46,7 @@ struct GenericRequestHeader { } GenericRequestHeader(const GenericRequestHeader& header) { op_code = header.op_code, data_id = header.data_id, data_size = header.data_size, meta_size = header.meta_size, + memcpy(custom_data, header.custom_data, kNCustomParams * sizeof(uint64_t)), strncpy(message, header.message, kMaxMessageSize); } @@ -49,6 +54,7 @@ struct GenericRequestHeader { uint64_t data_id; uint64_t data_size; uint64_t meta_size; + CustomRequestData custom_data; char message[kMaxMessageSize]; }; diff --git a/common/cpp/include/database/database.h b/common/cpp/include/database/database.h index 129a5e3a6ec927ceb1e3d9a64e088e85d378db0c..04979a76123644b4df0266c3ca0f35eee0eaaff3 100644 --- a/common/cpp/include/database/database.h +++ b/common/cpp/include/database/database.h @@ -18,6 +18,8 @@ class Database { const std::string& collection ) = 0; virtual Error Insert(const FileInfo& file, bool ignore_duplicates) const = 0; virtual Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const = 0; + virtual Error InsertAsSubset(const FileInfo& file, uint64_t subset_id, uint64_t subset_size, + bool ignore_duplicates) const = 0; virtual ~Database() = default; }; diff --git a/common/cpp/include/unittests/MockDatabase.h b/common/cpp/include/unittests/MockDatabase.h index 9216e8c4615019e0a31a7f0f73bcf8705242717f..94c611d5f70240dae80cb61be336dba9ed8bdab2 100644 --- a/common/cpp/include/unittests/MockDatabase.h +++ b/common/cpp/include/unittests/MockDatabase.h @@ -20,9 +20,19 @@ class MockDatabase : public Database { return Error{Insert_t(file, ignore_duplicates)}; } + Error InsertAsSubset(const FileInfo& file, uint64_t subset_id, + uint64_t subset_size, bool ignore_duplicates) const override { + return Error{InsertAsSubset_t(file, subset_id, subset_size, ignore_duplicates)}; + } + + MOCK_METHOD3(Connect_t, ErrorInterface * (const std::string&, const std::string&, const std::string&)); MOCK_CONST_METHOD2(Insert_t, ErrorInterface * (const FileInfo&, bool)); + + MOCK_CONST_METHOD4(InsertAsSubset_t, ErrorInterface * (const FileInfo&, uint64_t, uint64_t, bool)); + + Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const override { return Error{Upsert_t(id, data, size)}; diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 4b65b5c63431168cddae537187e1ff8492780d3c..f3225f60e190a56e3d6ab05d5af8aa616901df60 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -192,5 +192,42 @@ Error MongoDBClient::Upsert(uint64_t id, const uint8_t* data, uint64_t size) con return UpdateBsonDocument(id, document, true); } +Error MongoDBClient::InsertAsSubset(const FileInfo& file, + uint64_t subset_id, + uint64_t subset_size, + bool ignore_duplicates) const { + if (!connected_) { + return DBErrorTemplates::kNotConnected.Generate(); + } + + Error err; + auto document = PrepareBsonDocument(file, &err); + if (err) { + return err; + } + auto query = BCON_NEW ("_id", BCON_INT64(subset_id)); + auto update = BCON_NEW ("$setOnInsert", "{", + "size", BCON_INT64 (subset_size), + "}", + "$addToSet", "{", + "images", BCON_DOCUMENT(document.get()), "}"); + + + bson_error_t mongo_err; + if (!mongoc_collection_update (collection_, MONGOC_UPDATE_UPSERT, query, update, NULL, &mongo_err)) { + if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { + if (!mongoc_collection_update (collection_, MONGOC_UPDATE_NONE, query, update, NULL, &mongo_err)) { + err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); + } + } else { + err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); + } + } + + bson_destroy (query); + bson_destroy (update); + + return err; +} } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index fb6fe0ab5015930b128dac75575ffd407e3062bb..82266de6ee99bd5a09edd76dcf01fd5e294872fb 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -39,6 +39,8 @@ class MongoDBClient final : public Database { Error Connect(const std::string& address, const std::string& database, const std::string& collection) override; Error Insert(const FileInfo& file, bool ignore_duplicates) const override; + Error InsertAsSubset(const FileInfo& file, uint64_t subset_id, uint64_t subset_size, + bool ignore_duplicates) const override; Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const override; ~MongoDBClient() override; private: diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 49bf28341180e6afbe3635d8cf0b9dad8be060b0..9a270e058f2ccffbdae1f96ed606910615e34041 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -20,6 +20,7 @@ struct Args { uint64_t nthreads; uint64_t mode; uint64_t timeout_sec; + uint64_t images_in_set; }; void PrintCommandArguments(const Args& args) { @@ -30,17 +31,18 @@ void PrintCommandArguments(const Args& args) { << "nthreads: " << args.nthreads << std::endl << "mode: " << args.mode << std::endl << "timeout: " << args.timeout_sec << std::endl + << "images in set: " << args.images_in_set << std::endl << std::endl; } void ProcessCommandArguments(int argc, char* argv[], Args* args) { asapo::ExitAfterPrintVersionIfNeeded("Dummy Data Producer", argc, argv); - if (argc != 8) { + if (argc != 8 && argc != 9) { std::cout << "Usage: " << argv[0] << " <destination> <beamtime_id> <number_of_byte> <iterations> <nthreads>" - " <mode 0 -t tcp, 1 - filesystem> <timeout (sec)>" + " <mode 0 -t tcp, 1 - filesystem> <timeout (sec)> [n images in set (default 1)]" << std::endl; exit(EXIT_FAILURE); } @@ -52,6 +54,11 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { args->nthreads = std::stoull(argv[5]); args->mode = std::stoull(argv[6]); args->timeout_sec = std::stoull(argv[7]); + if (argc == 9) { + args->images_in_set = std::stoull(argv[8]); + } else { + args->images_in_set = 1; + } PrintCommandArguments(*args); return; } catch(std::exception& e) { @@ -89,7 +96,7 @@ asapo::FileData CreateMemoryBuffer(size_t size) { } -bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations) { +bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t images_in_set) { asapo::Error err; if (iterations > 0) { // send wrong meta, for negative integration tests @@ -106,10 +113,25 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it auto buffer = CreateMemoryBuffer(number_of_byte); asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i + 1)}; std::string meta = "{\"user_meta\":\"test" + std::to_string(i + 1) + "\"}"; - auto err = producer->SendData(event_header, std::move(buffer), std::move(meta), &ProcessAfterSend); - if (err) { - std::cerr << "Cannot send file: " << err << std::endl; - return false; + if (images_in_set == 1) { + auto err = producer->SendData(event_header, std::move(buffer), std::move(meta), &ProcessAfterSend); + if (err) { + std::cerr << "Cannot send file: " << err << std::endl; + return false; + } + } else { + for (uint64_t id = 0; id < images_in_set; id++) { + auto buffer = CreateMemoryBuffer(number_of_byte); + event_header.subset_id = i + 1; + event_header.subset_size = images_in_set; + event_header.file_id = id + 1; + event_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1); + auto err = producer->SendData(event_header, std::move(buffer), meta, &ProcessAfterSend); + if (err) { + std::cerr << "Cannot send file: " << err << std::endl; + return false; + } + } } } return true; @@ -166,11 +188,11 @@ int main (int argc, char* argv[]) { auto producer = CreateProducer(args); - iterations_remained = args.iterations + 1; + iterations_remained = args.iterations * args.images_in_set + 1; system_clock::time_point start_time = system_clock::now(); - if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations)) { + if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.images_in_set)) { return EXIT_FAILURE; } diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index 26d7a7ed3c20f55dc376b3e332951f5999426e56..09b8b8a99546c82fc686a1313039fc2afd388a41 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -10,7 +10,6 @@ #include "asapo_worker.h" - using std::chrono::system_clock; using asapo::Error; @@ -25,6 +24,7 @@ struct Params { int timeout_ms; int nthreads; bool read_data; + bool datasets; }; void WaitThreads(std::vector<std::thread>* threads) { @@ -42,8 +42,9 @@ int ProcessError(const Error& err) { std::vector<std::thread> StartThreads(const Params& params, std::vector<int>* nfiles, std::vector<int>* errors, - std::vector<int>* nbuf) { - auto exec_next = [¶ms, nfiles, errors, nbuf](int i) { + std::vector<int>* nbuf, + std::vector<int>* nfiles_total) { + auto exec_next = [¶ms, nfiles, errors, nbuf, nfiles_total](int i) { asapo::FileInfo fi; Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(params.server, params.file_path, params.beamtime_id, @@ -72,16 +73,27 @@ std::vector<std::thread> StartThreads(const Params& params, } } while (true) { - err = broker->GetNext(&fi, group_id, params.read_data ? &data : nullptr); - if (err == nullptr) { - (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; - if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { - data[9] = 0; - std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl; + if (params.datasets) { + auto dataset = broker->GetNextDataset(group_id, &err); + if (err == nullptr) { + for (auto& fi : dataset.content) { + (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; + (*nfiles_total)[i]++; + } } } else { + err = broker->GetNext(&fi, group_id, params.read_data ? &data : nullptr); + if (err == nullptr) { + (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1; + if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) { + data[9] = 0; + std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl; + } + } + } + if (err) { (*errors)[i] += ProcessError(err); - std::cout << "Received: " << (int)err->GetErrorType() << err << std::endl; + std::cout << "Received: " << (int) err->GetErrorType() << err << std::endl; if (err == asapo::IOErrorTemplates::kTimeout) { break; } @@ -97,20 +109,22 @@ std::vector<std::thread> StartThreads(const Params& params, return threads; } -int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf) { +int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* nbuf, int* nfiles_total) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); std::vector<int> nfiles(params.nthreads, 0); std::vector<int> errors(params.nthreads, 0); std::vector<int> nfiles_frombuf(params.nthreads, 0); + std::vector<int> nfiles_total_in_datasets(params.nthreads, 0); - auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf); + auto threads = StartThreads(params, &nfiles, &errors, &nfiles_frombuf, &nfiles_total_in_datasets); WaitThreads(&threads); int n_total = std::accumulate(nfiles.begin(), nfiles.end(), 0); *nerrors = std::accumulate(errors.begin(), errors.end(), 0); *nbuf = std::accumulate(nfiles_frombuf.begin(), nfiles_frombuf.end(), 0); + *nfiles_total = std::accumulate(nfiles_total_in_datasets.begin(), nfiles_total_in_datasets.end(), 0); system_clock::time_point t2 = system_clock::now(); auto duration_read = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); @@ -120,14 +134,15 @@ int ReadAllData(const Params& params, uint64_t* duration_ms, int* nerrors, int* int main(int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("GetNext Broker Example", argc, argv); - if (argc != 8) { + Params params; + params.datasets = false; + if (argc != 8 && argc != 9) { std::cout << "Usage: " + std::string{argv[0]} - + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly>" + + " <server> <files_path> <run_name> <nthreads> <token> <timeout ms> <metaonly> [use datasets]" << std::endl; exit(EXIT_FAILURE); } - Params params; params.server = std::string{argv[1]}; params.file_path = std::string{argv[2]}; params.beamtime_id = std::string{argv[3]}; @@ -135,12 +150,16 @@ int main(int argc, char* argv[]) { params.token = std::string{argv[5]}; params.timeout_ms = atoi(argv[6]); params.read_data = atoi(argv[7]) != 1; - + if (argc == 9) { + params.datasets = atoi(argv[8]) == 1; + } uint64_t duration_ms; - int nerrors, nbuf; - auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf); - - std::cout << "Processed " << nfiles << " file(s)" << std::endl; + int nerrors, nbuf, nfiles_total; + auto nfiles = ReadAllData(params, &duration_ms, &nerrors, &nbuf, &nfiles_total); + std::cout << "Processed " << nfiles << (params.datasets ? " dataset(s)" : " file(s)") << std::endl; + if (params.datasets) { + std::cout << " with " << nfiles_total << " file(s)" << std::endl; + } std::cout << "Successfully: " << nfiles - nerrors << std::endl; if (params.read_data) { std::cout << " from memory buffer: " << nbuf << std::endl; diff --git a/producer/api/include/producer/common.h b/producer/api/include/producer/common.h index e8c0d8522f2e64f82a790ae77811a6581f75e613..1e3ba00280b82855217584fffd4efd715e1d7461 100644 --- a/producer/api/include/producer/common.h +++ b/producer/api/include/producer/common.h @@ -20,9 +20,17 @@ enum class RequestHandlerType { struct EventHeader { - uint64_t file_id; - uint64_t file_size; + EventHeader() {}; + EventHeader(uint64_t file_id_i, uint64_t file_size_i, std::string file_name_i, uint64_t expected_subset_id_i = 0, + uint64_t expected_subset_size_i = 0 ): + file_id{file_id_i}, file_size{file_size_i}, file_name{std::move(file_name_i)}, + subset_id{expected_subset_id_i}, + subset_size{expected_subset_size_i} {}; + uint64_t file_id = 0; + uint64_t file_size = 0; std::string file_name; + uint64_t subset_id = 0; + uint64_t subset_size = 0; }; } diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 04f06aa819bac8b3fb11b94b2713aa17916701f6..ee6caa8fd3208b03acd8641190b537f34a3b049c 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -14,6 +14,7 @@ enum class ProducerErrorType { kBeamtimeAlreadySet, kFileIdAlreadyInUse, kErrorInMetadata, + kErrorSubsetSize, kAuthorizationFailed, kInternalServerError, kCannotSendDataToReceivers, @@ -30,6 +31,11 @@ auto const kConnectionNotReady = ProducerErrorTemplate { "Connection not ready", ProducerErrorType::kConnectionNotReady }; +auto const kErrorSubsetSize = ProducerErrorTemplate { + "Error in subset size", ProducerErrorType::kErrorSubsetSize +}; + + auto const kFileTooLarge = ProducerErrorTemplate { "File too large", ProducerErrorType::kFileTooLarge }; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index b5c679a48f1c1ceb10023a108976aecab76f96f9..9b4b557297d126b6c6d0bd0c4136a3d5b75cf361 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -29,21 +29,30 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, uint64_t meta_size, - std::string file_name) { - GenericRequestHeader request{kOpcodeTransferData, file_id, file_size, meta_size, std::move(file_name)}; +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t meta_size) { + GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size, + meta_size, std::move(event_header.file_name)}; + if (event_header.subset_id != 0) { + request.op_code = kOpcodeTransferSubsetData; + request.custom_data[0] = event_header.subset_id; + request.custom_data[1] = event_header.subset_size; + } return request; } -Error CheckProducerRequest(size_t file_size, size_t filename_size) { - if (file_size > ProducerImpl::kMaxChunkSize) { +Error CheckProducerRequest(const EventHeader& event_header) { + if ((size_t)event_header.file_size > ProducerImpl::kMaxChunkSize) { return ProducerErrorTemplates::kFileTooLarge.Generate(); } - if (filename_size > kMaxMessageSize) { + if (event_header.file_name.size() > kMaxMessageSize) { return ProducerErrorTemplates::kFileNameTooLong.Generate(); } + if (event_header.subset_id > 0 && event_header.subset_size == 0) { + return ProducerErrorTemplates::kErrorSubsetSize.Generate(); + } + return nullptr; } @@ -52,14 +61,13 @@ Error ProducerImpl::Send(const EventHeader& event_header, std::string metadata, std::string full_path, RequestCallback callback) { - auto err = CheckProducerRequest((size_t)event_header.file_size, event_header.file_name.size()); + auto err = CheckProducerRequest(event_header); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, - metadata.size(), event_header.file_name); + auto request_header = GenerateNextSendRequest(event_header, metadata.size()); return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header), std::move(data), std::move(metadata), std::move(full_path), callback} diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index a6845b81b3f1303b0a0624858a8640560ce2ae21..cb887750c32434314a2fc4917a04347f08cde99b 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -38,12 +38,10 @@ class ProducerImpl : public Producer { private: Error Send(const EventHeader& event_header, FileData data, std::string metadata, std::string full_path, RequestCallback callback); - GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, uint64_t meta_size, - std::string file_name); + GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t meta_size); std::string beamtime_id_; }; -Error CheckProducerRequest(const GenericRequestHeader header); } #endif //ASAPO_PRODUCER__PRODUCER_IMPL_H diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index fc168692df8dcce9d32b5ef08dbc7e30c9ab5370..4692d02122402ce96f5313c1f9e95c29939618bc 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -105,8 +105,9 @@ Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request) { return err; } - log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " + - connected_receiver_uri_); + log__->Debug("successfully sent data, opcode: " + std::to_string(request->header.op_code) + + ", id: " + std::to_string(request->header.data_id) + " to " + connected_receiver_uri_); + return nullptr; } diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index ceddea6b18089d55b4cedab519f9912355a9252a..3e3532baf4b09fdba38cfaa550e9d054a4711974 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -29,7 +29,7 @@ using asapo::RequestPool; using asapo::ProducerRequest; -MATCHER_P6(M_CheckSendDataRequest, op_code, beamtime_id, metadata, file_id, file_size, message, +MATCHER_P8(M_CheckSendDataRequest, op_code, beamtime_id, metadata, file_id, file_size, message, subset_id, subset_size, "Checks if a valid GenericRequestHeader was Send") { auto request = static_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code @@ -37,6 +37,10 @@ MATCHER_P6(M_CheckSendDataRequest, op_code, beamtime_id, metadata, file_id, file && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint64_t(file_size) && request->beamtime_id == beamtime_id && request->metadata == metadata + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader)(arg->header)).custom_data[0] == + uint64_t(subset_id) : true) + && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader)(arg->header)).custom_data[1] == + uint64_t(subset_size) : true) && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -54,6 +58,15 @@ class ProducerImplTests : public testing::Test { testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPull> mock_pull{&factory, &mock_logger}; asapo::ProducerImpl producer{"", 1, asapo::RequestHandlerType::kTcp}; + uint64_t expected_size = 100; + uint64_t expected_id = 10; + uint64_t expected_subset_id = 100; + uint64_t expected_subset_size = 4; + + char expected_name[asapo::kMaxMessageSize] = "test_name"; + std::string expected_beamtimeid = "beamtime_id"; + std::string expected_metadata = "meta"; + std::string expected_fullpath = "filename"; void SetUp() override { producer.log__ = &mock_logger; producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; @@ -86,23 +99,21 @@ TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } +TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) { + EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size"))); + asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "", 1}; + auto err = producer.SendData(event_header, nullptr, "", nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize)); +} -TEST_F(ProducerImplTests, OKSendingSendDataRequest) { - uint64_t expected_size = 100; - uint64_t expected_meta_size = 4; - uint64_t expected_id = 10; - char expected_name[asapo::kMaxMessageSize] = "test_name"; - std::string expected_beamtimeid = "beamtime_id"; - std::string expected_metadata = "meta"; + +TEST_F(ProducerImplTests, OKSendingSendDataRequest) { producer.SetBeamtimeId(expected_beamtimeid); - ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, - expected_size, expected_meta_size, expected_name}, - nullptr, expected_metadata, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, expected_beamtimeid, expected_metadata, - expected_id, expected_size, expected_name))).WillOnce(Return( + expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name}; @@ -111,17 +122,31 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { ASSERT_THAT(err, Eq(nullptr)); } +TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) { + producer.SetBeamtimeId(expected_beamtimeid); + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferSubsetData, + expected_beamtimeid, expected_metadata, + expected_id, expected_size, expected_name, + expected_subset_id, expected_subset_size))).WillOnce(Return( + nullptr)); -TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { - uint64_t expected_id = 0; - std::string expected_metadata = "{\"meta\":10}"; - uint64_t expected_size = expected_metadata.size(); + asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_subset_id, expected_subset_size}; + auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); +} - std::string expected_beamtimeid = "beamtime_id"; + + +TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { + expected_id = 0; + expected_metadata = "{\"meta\":10}"; + expected_size = expected_metadata.size(); producer.SetBeamtimeId(expected_beamtimeid); EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferMetaData, - expected_beamtimeid, "", expected_id, expected_size, "beamtime_global.meta"))).WillOnce(Return( + expected_beamtimeid, "", expected_id, + expected_size, "beamtime_global.meta", 10, 10))).WillOnce(Return( nullptr)); auto err = producer.SendMetaData(expected_metadata, nullptr); @@ -130,17 +155,10 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) { } TEST_F(ProducerImplTests, OKSendingSendFileRequest) { - uint64_t expected_id = 10; - char expected_name[asapo::kMaxMessageSize] = "test_name"; - std::string expected_beamtimeid = "beamtime_id"; - std::string expected_fullpath = "filename"; - producer.SetBeamtimeId(expected_beamtimeid); - ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, 0, expected_name}, - nullptr, "", "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, - expected_beamtimeid, "", expected_id, 0, expected_name))).WillOnce(Return( + expected_beamtimeid, "", expected_id, 0, expected_name, 0, 0))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, 0, expected_name}; diff --git a/producer/event_monitor_producer/src/eventmon_config.cpp b/producer/event_monitor_producer/src/eventmon_config.cpp index ca1a07c9ebcf62416accdbab0e148a516bd05234..24882a220eab1a2e8b60b25b42e37d7e2b29912c 100644 --- a/producer/event_monitor_producer/src/eventmon_config.cpp +++ b/producer/event_monitor_producer/src/eventmon_config.cpp @@ -11,9 +11,31 @@ EventMonConfigFactory::EventMonConfigFactory() : io__{GenerateDefaultIO()} { } +Error SubsetModeToEnum(const std::string& mode_str, SubSetMode* mode) { + if (mode_str == "batch") { + *mode = SubSetMode::kBatch; + return nullptr; + } + + if (mode_str == "none") { + *mode = SubSetMode::kNone; + return nullptr; + } + + if (mode_str == "multisource") { + *mode = SubSetMode::kMultiSource; + return nullptr; + } + + + return TextError("Wrone subset mode:" + mode_str); +} + Error EventMonConfigFactory::ParseConfigFile(std::string file_name) { JsonFileParser parser(file_name, &io__); Error err = nullptr; + std::string subset_mode; + (err = parser.GetString("AsapoEndpoint", &config.asapo_endpoint)) || (err = parser.GetString("Tag", &config.tag)) || (err = parser.GetString("BeamtimeID", &config.beamtime_id)) || @@ -23,7 +45,22 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) { (err = parser.GetString("LogLevel", &config.log_level_str)) || (err = parser.GetBool("RemoveAfterSend", &config.remove_after_send)) || (err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) || - (err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions)); + (err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions)) || + (err = parser.Embedded("Subset").GetString("Mode", &subset_mode)) || + (err = SubsetModeToEnum(subset_mode, &config.subset_mode)); + if (err) { + return err; + } + + if (config.subset_mode == SubSetMode::kBatch) { + err = parser.Embedded("Subset").GetUInt64("BatchSize", &config.subset_batch_size); + } + + if (config.subset_mode == SubSetMode::kMultiSource) { + err = parser.Embedded("Subset").GetUInt64("NSources", &config.subset_multisource_nsources); + err = parser.Embedded("Subset").GetUInt64("SourceId", &config.subset_multisource_sourceid); + } + return err; } @@ -33,7 +70,9 @@ Error EventMonConfigFactory::CheckConfig() { Error err; (err = CheckMode()) || (err = CheckLogLevel()) || - (err = CheckNThreads()); + (err = CheckNThreads()) || + (err = CheckSubsets()); + //todo: check monitored folders exist? return err; } @@ -73,8 +112,20 @@ Error EventMonConfigFactory::CheckNThreads() { return nullptr; } +Error EventMonConfigFactory::CheckSubsets() { + if (config.subset_mode == SubSetMode::kBatch && config.subset_batch_size < 1) { + return TextError("Batch size should > 0"); + } + if (config.subset_mode == SubSetMode::kMultiSource && config.subset_multisource_nsources < 1) { + return TextError("Number of sources size should be > 0"); + } + + + return nullptr; +} + const EventMonConfig* GetEventMonConfig() { return &config; } diff --git a/producer/event_monitor_producer/src/eventmon_config.h b/producer/event_monitor_producer/src/eventmon_config.h index 494eae4385876e07d85bade0a07831720b626055..bbdd82bc2d319fc9f468ec5a74d99bf97548db6d 100644 --- a/producer/event_monitor_producer/src/eventmon_config.h +++ b/producer/event_monitor_producer/src/eventmon_config.h @@ -9,6 +9,12 @@ namespace asapo { +enum class SubSetMode { + kNone, + kBatch, + kMultiSource +}; + struct EventMonConfig { std::string asapo_endpoint; LogLevel log_level = LogLevel::Info; @@ -20,6 +26,10 @@ struct EventMonConfig { std::vector<std::string> monitored_subfolders; std::vector<std::string> ignored_extentions; bool remove_after_send = false; + SubSetMode subset_mode = SubSetMode::kNone; + uint64_t subset_batch_size = 1; + uint64_t subset_multisource_nsources = 1; + uint64_t subset_multisource_sourceid = 1; private: std::string log_level_str; std::string mode_str; diff --git a/producer/event_monitor_producer/src/eventmon_config_factory.h b/producer/event_monitor_producer/src/eventmon_config_factory.h index eb3735158028eef8161fd18b5c1534ff3c089987..933feb2b994ceea7e75dfe3b909c39964a086af0 100644 --- a/producer/event_monitor_producer/src/eventmon_config_factory.h +++ b/producer/event_monitor_producer/src/eventmon_config_factory.h @@ -16,6 +16,7 @@ class EventMonConfigFactory { Error ParseConfigFile(std::string file_name); Error CheckMode(); Error CheckLogLevel(); + Error CheckSubsets(); Error CheckNThreads(); Error CheckConfig(); diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index 31268ff4381e300d0087beb7dd825d8b8d088d90..d4344370cb5c80c5bf881c311522420cef000258 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -73,6 +73,22 @@ void SignalHandler(int signal) { } +void HandleSubsets(asapo::EventHeader* header) { + switch (GetEventMonConfig()->subset_mode) { + case asapo::SubSetMode::kNone: + return; + case asapo::SubSetMode::kBatch: + header->subset_size = GetEventMonConfig()->subset_batch_size; + header->subset_id = (header->file_id - 1) / header->subset_size + 1; + break; + case asapo::SubSetMode::kMultiSource: + header->subset_size = GetEventMonConfig()->subset_multisource_nsources; + header->subset_id = header->file_id; + header->file_id = GetEventMonConfig()->subset_multisource_sourceid; + break; + } +} + int main (int argc, char* argv[]) { asapo::ExitAfterPrintVersionIfNeeded("ASAPO Event Monitor", argc, argv); @@ -117,6 +133,7 @@ int main (int argc, char* argv[]) { continue; } event_header.file_id = ++i; + HandleSubsets(&event_header); producer->SendFile(event_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + event_header.file_name, ProcessAfterSend); } diff --git a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp index 6cabb6da075bb65a31c44b160be3cc0f7e53c7a4..bf3be313969dac4ad8a52d46e7a31c7a72b98c4e 100644 --- a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp +++ b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp @@ -50,6 +50,32 @@ Error SetFolderMonConfig (const EventMonConfig& config) { config_string += "," + std::string("\"LogLevel\":") + "\"" + log_level + "\""; config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false"); + std::string subset_mode; + switch (config.subset_mode) { + case SubSetMode::kBatch: + subset_mode = "batch"; + break; + case SubSetMode::kMultiSource: + subset_mode = "multisource"; + break; + + case SubSetMode::kNone: + subset_mode = "none"; + break; + + } + config_string += "," + std::string("\"Subset\":{"); + config_string += std::string("\"Mode\":") + "\"" + subset_mode + "\""; + if (config.subset_mode == SubSetMode::kBatch) { + config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.subset_batch_size); + } + if (config.subset_mode == SubSetMode::kMultiSource) { + config_string += "," + std::string("\"SourceId\":") + std::to_string(config.subset_multisource_sourceid); + config_string += "," + std::string("\"NSources\":") + std::to_string(config.subset_multisource_nsources); + } + + config_string += "}"; + std::string mon_folders; for (auto folder : config.monitored_subfolders) { mon_folders += "\"" + folder + "\"" + ","; @@ -75,7 +101,6 @@ Error SetFolderMonConfig (const EventMonConfig& config) { config_string += "}"; - EXPECT_CALL(mock_io, ReadFileToString_t("fname", _)).WillOnce( testing::Return(config_string) ); diff --git a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp index 71a3928834349484ef8196899e11ece98755a140..544f12cbb42cabe7ede883d535b7ed17875e9a31 100644 --- a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp +++ b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp @@ -31,6 +31,8 @@ using ::asapo::MockIO; using ::asapo::EventMonConfigFactory; using asapo::EventMonConfig; +using asapo::SubSetMode; + namespace { @@ -60,6 +62,8 @@ TEST_F(ConfigTests, ReadSettingsOK) { test_config.monitored_subfolders = {"test1", "test2"}; test_config.ignored_extentions = {"tmp", "test"}; test_config.remove_after_send = true; + test_config.subset_mode = SubSetMode::kBatch; + test_config.subset_batch_size = 9; auto err = asapo::SetFolderMonConfig(test_config); auto config = asapo::GetEventMonConfig(); @@ -75,9 +79,32 @@ TEST_F(ConfigTests, ReadSettingsOK) { ASSERT_THAT(config->root_monitored_folder, Eq("tmp")); ASSERT_THAT(config->ignored_extentions, ElementsAre("tmp", "test")); ASSERT_THAT(config->remove_after_send, Eq(true)); + ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kBatch)); + ASSERT_THAT(config->subset_batch_size, Eq(9)); + } +TEST_F(ConfigTests, ReadSettingsMultiSourceOK) { + asapo::EventMonConfig test_config; + test_config.subset_mode = SubSetMode::kMultiSource; + test_config.subset_multisource_nsources = 2; + test_config.subset_multisource_sourceid = 12; + auto err = asapo::SetFolderMonConfig(test_config); + + auto config = asapo::GetEventMonConfig(); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kMultiSource)); + ASSERT_THAT(config->subset_multisource_nsources, Eq(2)); + ASSERT_THAT(config->subset_multisource_sourceid, Eq(12)); + +} + + + + + TEST_F(ConfigTests, ReadSettingsChecksNthreads) { asapo::EventMonConfig test_config; test_config.nthreads = 0; @@ -92,6 +119,31 @@ TEST_F(ConfigTests, ReadSettingsChecksNthreads) { } +TEST_F(ConfigTests, ReadSettingsChecksSubsets) { + asapo::EventMonConfig test_config; + test_config.subset_mode = SubSetMode::kBatch; + test_config.subset_batch_size = 0; + + auto err = asapo::SetFolderMonConfig(test_config); + ASSERT_THAT(err, Ne(nullptr)); + + test_config.subset_mode = SubSetMode::kMultiSource; + test_config.subset_multisource_nsources = 0; + + err = asapo::SetFolderMonConfig(test_config); + ASSERT_THAT(err, Ne(nullptr)); + + +} + +TEST_F(ConfigTests, ReadSettingsDoesnotChecksSubsetsIfNoSubsets) { + asapo::EventMonConfig test_config; + test_config.subset_batch_size = 0; + + auto err = asapo::SetFolderMonConfig(test_config); + ASSERT_THAT(err, Eq(nullptr)); +} + TEST_F(ConfigTests, ReadSettingsChecksMode) { asapo::EventMonConfig test_config; diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 5f8a9bcf8c32c07879d3907759a3e409cff25376..286cff585967abd6e513a241a8afbd09773b5235 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -176,4 +176,9 @@ const std::string& Request::GetMetaData() const { return metadata_; } +const CustomRequestData& Request::GetCustomData() const { + return request_header_.custom_data; +} + + } \ No newline at end of file diff --git a/receiver/src/request.h b/receiver/src/request.h index 82d470f93afc08a3f7106464a55e65ed3765a673..c97cdd357e82c85373cf8ecd156fee71d1ffbca8 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -42,6 +42,8 @@ class Request { VIRTUAL void SetBeamtimeId(std::string beamtime_id); VIRTUAL void SetBeamline(std::string beamline); VIRTUAL const std::string& GetBeamline() const; + VIRTUAL const CustomRequestData& GetCustomData() const; + std::unique_ptr<IO> io__; DataCache* cache__ = nullptr; VIRTUAL uint64_t GetSlotId() const; diff --git a/receiver/src/request_factory.cpp b/receiver/src/request_factory.cpp index bbec5fa99e464744f55d530b641ed2ff046b379e..d0e1c152244b74ecedc06b31b56662439b779f2c 100644 --- a/receiver/src/request_factory.cpp +++ b/receiver/src/request_factory.cpp @@ -10,7 +10,8 @@ std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHea *err = nullptr; auto request = std::unique_ptr<Request> {new Request{request_header, socket_fd, std::move(origin_uri), cache_.get()}}; switch (request_header.op_code) { - case Opcode::kOpcodeTransferData: { + case Opcode::kOpcodeTransferData: + case Opcode::kOpcodeTransferSubsetData: { request->AddHandler(&request_handler_authorize_); if (GetReceiverConfig()->write_to_disk) { request->AddHandler(&request_handler_filewrite_); diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp index ecb9ad2069a37081aacf67e8038826e6baa7a2fb..7540492fbace9d6b3226b793bfb37e9e33e93c3b 100644 --- a/receiver/src/request_handler_db_write.cpp +++ b/receiver/src/request_handler_db_write.cpp @@ -26,11 +26,26 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const { Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const { auto file_info = PrepareFileInfo(request); - auto err = db_client__->Insert(file_info, true); - if (!err) { - log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + collection_name_ + " in " + - db_name_ + - " at " + GetReceiverConfig()->broker_db_uri); + + auto op_code = request->GetOpCode(); + Error err; + if (op_code == Opcode::kOpcodeTransferData) { + err = db_client__->Insert(file_info, true); + if (!err) { + log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + collection_name_ + " in " + + db_name_ + + " at " + GetReceiverConfig()->broker_db_uri); + } + } else { + auto subset_id = request->GetCustomData()[0]; + auto subset_size = request->GetCustomData()[1]; + err = db_client__->InsertAsSubset(file_info, subset_id, subset_size, true); + if (!err) { + log__->Debug(std::string{"insert record as subset id "} + std::to_string(subset_id) + ", id: " + + std::to_string(file_info.id) + " to " + collection_name_ + " in " + + db_name_ + + " at " + GetReceiverConfig()->broker_db_uri); + } } return err; } diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp index bb44cb5ebdbbe47239b02f126333be29cafe5a39..afb0ca38bc67f9fc2d7d647477183472c35d7a32 100644 --- a/receiver/src/requests_dispatcher.cpp +++ b/receiver/src/requests_dispatcher.cpp @@ -30,7 +30,8 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) { } Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { - log__->Debug("processing request id " + std::to_string(request->GetDataID()) + " from " + producer_uri_ ); + log__->Debug("processing request id " + std::to_string(request->GetDataID()) + ", opcode " + + std::to_string(request->GetOpCode()) + " from " + producer_uri_ ); Error handle_err; handle_err = request->Handle(statistics__); GenericNetworkResponse generic_response; diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h index 05227a6888f12684a9bcac0aa936ff7e8791874a..4b3d3771a18caca8610d892611b39125e413b39f 100644 --- a/receiver/unittests/receiver_mocking.h +++ b/receiver/unittests/receiver_mocking.h @@ -53,7 +53,13 @@ class MockRequest: public Request { MOCK_CONST_METHOD0(GetBeamtimeId, const std::string & ()); MOCK_CONST_METHOD0(GetMetaData, const std::string & ()); MOCK_CONST_METHOD0(GetBeamline, const std::string & ()); - MOCK_CONST_METHOD0(GetOpCode, asapo::Opcode ()); + MOCK_CONST_METHOD0(GetOpCode, + asapo::Opcode ()); + const asapo::CustomRequestData& GetCustomData() const override { + return (asapo::CustomRequestData&) * GetCustomData_t(); + }; + + MOCK_CONST_METHOD0(GetCustomData_t, const uint64_t* ()); MOCK_CONST_METHOD0(GetMessage, const char* ()); MOCK_METHOD1(SetBeamtimeId, void (std::string)); MOCK_METHOD1(SetBeamline, void (std::string)); diff --git a/receiver/unittests/test_request_factory.cpp b/receiver/unittests/test_request_factory.cpp index 80cb5b098eddcf8040ae11daf9db74371b8c3690..c737746152962c9294fb440d2d5e290027ac5933 100644 --- a/receiver/unittests/test_request_factory.cpp +++ b/receiver/unittests/test_request_factory.cpp @@ -76,14 +76,16 @@ TEST_F(FactoryTests, ErrorOnWrongCode) { } TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendDataCode) { - generic_request_header.op_code = asapo::Opcode::kOpcodeTransferData; - auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); - - ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[1]), Ne(nullptr)); - ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); + for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) { + generic_request_header.op_code = code; + auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::Request*>(request.get()), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerFileWrite*>(request->GetListHandlers()[1]), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbWrite*>(request->GetListHandlers().back()), Ne(nullptr)); + } } diff --git a/receiver/unittests/test_request_handler_db_writer.cpp b/receiver/unittests/test_request_handler_db_writer.cpp index 6c04f1c0f4857a502cd3455d45e351696d2ff333..d4fa6213a19f4368722a9ddb359c2ee253c0581f 100644 --- a/receiver/unittests/test_request_handler_db_writer.cpp +++ b/receiver/unittests/test_request_handler_db_writer.cpp @@ -65,14 +65,29 @@ class DbWriterHandlerTests : public Test { std::string expected_hostname = "host"; uint64_t expected_port = 1234; uint64_t expected_buf_id = 18446744073709551615ull; + std::string expected_file_name = "2"; + std::string expected_metadata = "meta"; + uint64_t expected_file_size = 10; + uint64_t expected_id = 15; + uint64_t expected_subset_id = 15; + uint64_t expected_subset_size = 2; + uint64_t expected_custom_data[2] {expected_subset_id, expected_subset_size}; void SetUp() override { GenericRequestHeader request_header; request_header.data_id = 2; + request_header.op_code = asapo::Opcode::kOpcodeTransferData; handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db}; handler.log__ = &mock_logger; mock_request.reset(new NiceMock<MockRequest> {request_header, 1, ""}); + config.broker_db_uri = "127.0.0.1:27017"; + config.source_host = expected_hostname; + config.dataserver.listen_port = expected_port; + SetReceiverConfig(config, "none"); + ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id)); } + void ExpectRequestParams(asapo::Opcode op_code); + FileInfo PrepareFileInfo(); void TearDown() override { handler.db_client__.release(); } @@ -92,13 +107,7 @@ MATCHER_P(CompareFileInfo, file, "") { } -TEST_F(DbWriterHandlerTests, CallsInsert) { - config.broker_db_uri = "127.0.0.1:27017"; - config.source_host = expected_hostname; - config.dataserver.listen_port = expected_port; - - SetReceiverConfig(config, "none"); - +void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code) { EXPECT_CALL(*mock_request, GetBeamtimeId()) .WillOnce(ReturnRef(expected_beamtime_id)) ; @@ -107,14 +116,9 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { .WillOnce(Return(expected_buf_id)) ; - EXPECT_CALL(mock_db, Connect_t(config.broker_db_uri, expected_beamtime_id, expected_collection_name)). WillOnce(testing::Return(nullptr)); - std::string expected_file_name = "2"; - std::string expected_metadata = "meta"; - uint64_t expected_file_size = 10; - uint64_t expected_id = 15; EXPECT_CALL(*mock_request, GetDataSize()) .WillOnce(Return(expected_file_size)) ; @@ -127,11 +131,23 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { .WillOnce(ReturnRef(expected_metadata)) ; - EXPECT_CALL(*mock_request, GetDataID()) .WillOnce(Return(expected_id)) ; + EXPECT_CALL(*mock_request, GetOpCode()) + .WillOnce(Return(op_code)) + ; + + if (op_code == asapo::Opcode::kOpcodeTransferSubsetData) { + EXPECT_CALL(*mock_request, GetCustomData_t()).Times(2). + WillRepeatedly(Return(expected_custom_data)) + ; + } +} + + +FileInfo DbWriterHandlerTests::PrepareFileInfo() { FileInfo file_info; file_info.size = expected_file_size; file_info.name = expected_file_name; @@ -139,6 +155,13 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { file_info.buf_id = expected_buf_id; file_info.source = expected_hostname + ":" + std::to_string(expected_port); file_info.metadata = expected_metadata; + return file_info; +} + +TEST_F(DbWriterHandlerTests, CallsInsert) { + + ExpectRequestParams(asapo::Opcode::kOpcodeTransferData); + auto file_info = PrepareFileInfo(); EXPECT_CALL(mock_db, Insert_t(CompareFileInfo(file_info), _)). WillOnce(testing::Return(nullptr)); @@ -154,4 +177,25 @@ TEST_F(DbWriterHandlerTests, CallsInsert) { handler.ProcessRequest(mock_request.get()); } +TEST_F(DbWriterHandlerTests, CallsInsertSubset) { + + ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData); + auto file_info = PrepareFileInfo(); + + + EXPECT_CALL(mock_db, InsertAsSubset_t(CompareFileInfo(file_info), expected_subset_id, expected_subset_size, _)). + WillOnce(testing::Return(nullptr)); + + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"), + HasSubstr(config.broker_db_uri), + HasSubstr(expected_beamtime_id), + HasSubstr(expected_collection_name) + ) + ) + ); + + handler.ProcessRequest(mock_request.get()); +} + + } \ No newline at end of file diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in index 22b699c17deff35749c9c7ac59d82f8d62b473ae..995984533959e3caedd3d29d64be1cfb83a10c44 100644 --- a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in +++ b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in @@ -8,5 +8,9 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in index 22b699c17deff35749c9c7ac59d82f8d62b473ae..4b7d3fc60d8c8d77e669bba405b93e955a55f03f 100644 --- a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in +++ b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in @@ -8,5 +8,8 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } } diff --git a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt index d88fa1fe801bb73514b28b13f66061de38624c75..c52597f1ba80ad0367cdef0a9df3bf7b31f8abcd 100644 --- a/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt +++ b/tests/automatic/curl_http_client/curl_http_client_command/CMakeLists.txt @@ -21,7 +21,9 @@ target_link_libraries(${TARGET_NAME} test_common asapo-worker) #add_test_setup_cleanup(${TARGET_NAME}) #add_integration_test(${TARGET_NAME} get_httpbin "GET http://httpbin.org body 200") add_integration_test(${TARGET_NAME} get_badaddress "GET google.com/badaddress found 404") -add_integration_test(${TARGET_NAME} get_badaddress2 "GET 111 clienterror 404") #add_integration_test(${TARGET_NAME} post "POST http://httpbin.org/post data 200") + add_integration_test(${TARGET_NAME} post_badaddress "POST google.com/badaddress found 404") -add_integration_test(${TARGET_NAME} post_badaddress2 "POST 111 clienterror 404") \ No newline at end of file + +#add_integration_test(${TARGET_NAME} get_badaddress2 "GET 111 clienterror 404") +#add_integration_test(${TARGET_NAME} post_badaddress2 "POST 111 clienterror 404") \ No newline at end of file diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt index 035c7a4da4e7bc2d3914ed30b0d22e0aacc4cf89..327b3c3edee8a64d4dd4791aae6ee5e718f1e908 100644 --- a/tests/automatic/full_chain/CMakeLists.txt +++ b/tests/automatic/full_chain/CMakeLists.txt @@ -5,5 +5,8 @@ endif() add_subdirectory(simple_chain_metadata) add_subdirectory(two_beamlines) add_subdirectory(simple_chain_filegen) +add_subdirectory(simple_chain_filegen_batches) +add_subdirectory(simple_chain_filegen_multisource) add_subdirectory(simple_chain_filegen_readdata_cache) add_subdirectory(simple_chain_filegen_readdata_file) +add_subdirectory(simple_chain_dataset) \ No newline at end of file diff --git a/tests/automatic/full_chain/simple_chain_dataset/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_dataset/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c66bfad61d973da55b0573a5e1599d753853accb --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_dataset/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME full_chain_simple_chain_dataset) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem) diff --git a/tests/automatic/full_chain/simple_chain_dataset/check_linux.sh b/tests/automatic/full_chain/simple_chain_dataset/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..ce7e80113c3888b7fa4b087da2034acb3708610f --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_dataset/check_linux.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +beamtime_id=asapo_test +token=`$3 token -secret broker_secret.key $beamtime_id` + +monitor_database_name=db_test +proxy_address=127.0.0.1:8400 + +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} + nomad stop nginx + nomad stop receiver + nomad stop discovery + nomad stop broker + nomad stop authorizer + rm -rf out +# kill $producerid +# echo "db.dropDatabase()" | mongo ${beamtime_id} + influx -execute "drop database ${monitor_database_name}" +} + +echo "db.dropDatabase()" | mongo ${beamtime_id} + +influx -execute "create database ${monitor_database_name}" +echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id} + +nomad run nginx.nmd +nomad run authorizer.nmd +nomad run receiver.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 + +#producer +mkdir -p ${receiver_folder} +$1 localhost:8400 ${beamtime_id} 100 100 4 0 100 5 & + + + +$2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 5000 1 1 > out +cat out +cat out | grep "Processed 100 dataset(s)" +cat out | grep "with 500 file(s)" + diff --git a/tests/automatic/full_chain/simple_chain_dataset/check_windows.bat b/tests/automatic/full_chain/simple_chain_dataset/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..da570177cc87fe0f0fca6c8091871c9408602026 --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_dataset/check_windows.bat @@ -0,0 +1,51 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" + + +"%3" token -secret broker_secret.key %beamtime_id% > token +set /P token=< token + +set proxy_address="127.0.0.1:8400" + +echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id% + +c:\opt\consul\nomad run receiver.nmd +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +REM producer +mkdir %receiver_folder% +start /B "" "%1" %proxy_address% %beamtime_id% 100 100 4 0 100 5 +ping 1.0.0.0 -n 1 -w 100 > nul + +REM worker +"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000 1 1 > out.txt +type out.txt +findstr /i /l /c:"Processed 100 dataset(s)" out.txt || goto :error +findstr /i /l /c:"with 500 file(s)" out.txt || goto :error + + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop receiver +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop authorizer +c:\opt\consul\nomad stop nginx +rmdir /S /Q %receiver_root_folder% +del /f token +echo db.dropDatabase() | %mongo_exe% %beamtime_id% + + diff --git a/tests/automatic/full_chain/simple_chain_filegen/test.json.in b/tests/automatic/full_chain/simple_chain_filegen/test.json.in index f072140a4ea55aa15aae487ce7d61cd80219fe31..758d184a81e40e342b9ca3c75118b6447393e10d 100644 --- a/tests/automatic/full_chain/simple_chain_filegen/test.json.in +++ b/tests/automatic/full_chain/simple_chain_filegen/test.json.in @@ -8,5 +8,9 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1","test2"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c319a11ec8fbd3ac56918c0879d365c47c277f81 --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt @@ -0,0 +1,15 @@ +set(TARGET_NAME full_chain_simple_chain_filegen_producer_batches) + +################################ +# Testing +################################ +prepare_asapo() +if (UNIX) + set (ROOT_PATH "/tmp/asapo/") +else() + set (ROOT_PATH "c:\\\\tmp\\\\asapo\\\\") +endif() + +configure_file(test.json.in test.json @ONLY) + +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem) diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..8e2be578c81d8c3d66c9d73bea28110a6758c9ab --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash + +#set -e + +trap Cleanup EXIT + +beamtime_id=asapo_test +token=`$3 token -secret broker_secret.key $beamtime_id` + +monitor_database_name=db_test +proxy_address=127.0.0.1:8400 + +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +mkdir -p /tmp/asapo/test_in/test1/ +mkdir -p /tmp/asapo/test_in/test2/ + +Cleanup() { + echo cleanup + kill $producerid + rm -rf /tmp/asapo/test_in/test1 + rm -rf /tmp/asapo/test_in/test2 + nomad stop nginx + nomad stop receiver + nomad stop discovery + nomad stop broker + nomad stop authorizer + echo "db.dropDatabase()" | mongo ${beamtime_id} + rm -rf out +} + +echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id} + +nomad run nginx.nmd +nomad run authorizer.nmd +nomad run receiver.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 + +#producer +mkdir -p ${receiver_folder} +$1 test.json & +producerid=`echo $!` + +sleep 1 + +echo hello > /tmp/asapo/test_in/test1/file1 +echo hello > /tmp/asapo/test_in/test1/file2 +echo hello > /tmp/asapo/test_in/test2/file2 + +$2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 2000 1 1 > out +cat out +cat out | grep "Processed 1 dataset(s)" +cat out | grep "with 3 file(s)" diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat b/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..41a436e0fdc55ef5ca4407154f53397de72db90e --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat @@ -0,0 +1,68 @@ + + +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" + +set producer_short_name="%~nx1" + + +"%3" token -secret broker_secret.key %beamtime_id% > token +set /P token=< token + +set proxy_address="127.0.0.1:8400" + +echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id% + +c:\opt\consul\nomad run receiver.nmd +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +REM producer +mkdir %receiver_folder% +mkdir c:\tmp\asapo\test_in\test1 +mkdir c:\tmp\asapo\test_in\test2 +start /B "" "%1" test.json + +ping 1.0.0.0 -n 3 -w 100 > nul + +echo hello > c:\tmp\asapo\test_in\test1\file1 +echo hello > c:\tmp\asapo\test_in\test1\file2 +echo hello > c:\tmp\asapo\test_in\test2\file2 + +ping 1.0.0.0 -n 10 -w 100 > nul + + +REM worker +"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 1000 1 1 > out.txt +type out.txt +findstr /i /l /c:"Processed 1 dataset(s)" out.txt || goto :error +findstr /i /l /c:"with 3 file(s)" out.txt || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop receiver +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop authorizer +c:\opt\consul\nomad stop nginx +rmdir /S /Q %receiver_root_folder% +rmdir /S /Q c:\tmp\asapo\test_in\test1 +rmdir /S /Q c:\tmp\asapo\test_in\test2 +Taskkill /IM "%producer_short_name%" /F + +del /f token +echo db.dropDatabase() | %mongo_exe% %beamtime_id% + + diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in new file mode 100644 index 0000000000000000000000000000000000000000..3f25b85fe0a9ffb652ad740b0ed78cb4c0f13aae --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in @@ -0,0 +1,16 @@ +{ + "AsapoEndpoint":"localhost:8400", + "Tag":"test_tag", + "BeamtimeID":"asapo_test", + "Mode":"tcp", + "NThreads":1, + "LogLevel":"debug", + "RootMonitoredFolder":"@ROOT_PATH@test_in", + "MonitoredSubFolders":["test1","test2"], + "IgnoreExtentions":["tmp"], + "RemoveAfterSend":true, + "Subset": { + "Mode":"batch", + "BatchSize":3 + } +} diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_filegen_multisource/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..16d6a270f0b362ddea8b6180f08e23427ff7599d --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/CMakeLists.txt @@ -0,0 +1,18 @@ +set(TARGET_NAME full_chain_simple_chain_filegen_producer_multisource) + +################################ +# Testing +################################ +prepare_asapo() +if (UNIX) + set (ROOT_PATH "/tmp/asapo/") +else() + set (ROOT_PATH "c:\\\\tmp\\\\asapo\\\\") +endif() + +SET (ID 1) +configure_file(test.json.in test1.json @ONLY) +SET (ID 2) +configure_file(test.json.in test2.json @ONLY) + +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem) diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_multisource/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..dd69a9ad5f90551e4429ed67d286be32d264669e --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/check_linux.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash + +#set -e + +trap Cleanup EXIT + +beamtime_id=asapo_test +token=`$3 token -secret broker_secret.key $beamtime_id` + +monitor_database_name=db_test +proxy_address=127.0.0.1:8400 + +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +mkdir -p /tmp/asapo/test_in/test1/ +mkdir -p /tmp/asapo/test_in/test2/ + +Cleanup() { + echo cleanup + kill $producerid1 + kill $producerid2 + rm -rf /tmp/asapo/test_in/test1 + rm -rf /tmp/asapo/test_in/test2 + nomad stop nginx + nomad stop receiver + nomad stop discovery + nomad stop broker + nomad stop authorizer + echo "db.dropDatabase()" | mongo ${beamtime_id} + rm -rf out +} + +echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id} + +nomad run nginx.nmd +nomad run authorizer.nmd +nomad run receiver.nmd +nomad run discovery.nmd +nomad run broker.nmd + +sleep 1 + +mkdir -p ${receiver_folder} +#producer1 +$1 test1.json & +producerid1=`echo $!` +#producer2 +$1 test2.json & +producerid2=`echo $!` + + +sleep 1 + +echo hello > /tmp/asapo/test_in/test1/file1 +echo hello > /tmp/asapo/test_in/test1/file2 +echo hello > /tmp/asapo/test_in/test2/file1 +echo hello > /tmp/asapo/test_in/test2/file2 + +$2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 2000 1 1 > out +cat out +cat out | grep "Processed 2 dataset(s)" +cat out | grep "with 4 file(s)" diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/check_windows.bat b/tests/automatic/full_chain/simple_chain_filegen_multisource/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..a0f958d55ad479319d355039d8f120d734b6589a --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/check_windows.bat @@ -0,0 +1,74 @@ + + +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" + +set producer_short_name="%~nx1" + + +"%3" token -secret broker_secret.key %beamtime_id% > token +set /P token=< token + +set proxy_address="127.0.0.1:8400" + +echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id% + +c:\opt\consul\nomad run receiver.nmd +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run broker.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 10 -w 100 > nul + +mkdir %receiver_folder% +mkdir c:\tmp\asapo\test_in\test1 +mkdir c:\tmp\asapo\test_in\test2 + +REM producer1 +start /B "" "%1" test1.json + +REM producer2 +start /B "" "%1" test2.json + + +ping 1.0.0.0 -n 3 -w 100 > nul + +echo hello > c:\tmp\asapo\test_in\test1\file1 +echo hello > c:\tmp\asapo\test_in\test1\file2 +echo hello > c:\tmp\asapo\test_in\test2\file1 +echo hello > c:\tmp\asapo\test_in\test2\file2 + +ping 1.0.0.0 -n 10 -w 100 > nul + + +REM worker +"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 1000 1 1 > out.txt +type out.txt +findstr /i /l /c:"Processed 2 dataset(s)" out.txt || goto :error +findstr /i /l /c:"with 4 file(s)" out.txt || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop receiver +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop broker +c:\opt\consul\nomad stop authorizer +c:\opt\consul\nomad stop nginx +rmdir /S /Q %receiver_root_folder% +rmdir /S /Q c:\tmp\asapo\test_in\test1 +rmdir /S /Q c:\tmp\asapo\test_in\test2 +Taskkill /IM "%producer_short_name%" /F + +del /f token +echo db.dropDatabase() | %mongo_exe% %beamtime_id% + + diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in new file mode 100644 index 0000000000000000000000000000000000000000..6318341dd71cc62b6ae0d808993f1c2721611027 --- /dev/null +++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in @@ -0,0 +1,17 @@ +{ + "AsapoEndpoint":"localhost:8400", + "Tag":"test_tag", + "BeamtimeID":"asapo_test", + "Mode":"tcp", + "NThreads":1, + "LogLevel":"debug", + "RootMonitoredFolder":"@ROOT_PATH@test_in", + "MonitoredSubFolders":["test@ID@"], + "IgnoreExtentions":["tmp"], + "RemoveAfterSend":true, + "Subset": { + "Mode":"multisource", + "SourceId":@ID@, + "NSources":2 + } +} diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in index f072140a4ea55aa15aae487ce7d61cd80219fe31..b7b1f67605fd45d7047f0706cc73bd964ae9869a 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in @@ -8,5 +8,9 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1","test2"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in index f072140a4ea55aa15aae487ce7d61cd80219fe31..b7b1f67605fd45d7047f0706cc73bd964ae9869a 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in @@ -8,5 +8,9 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1","test2"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } diff --git a/tests/automatic/producer/file_monitor_producer/test.json.in b/tests/automatic/producer/file_monitor_producer/test.json.in index ca77f2d906a2136ff200ae42ad8a6b0feec1c7c5..088681a0ddc231f34ebd9c66b3a5809150f0bbc8 100644 --- a/tests/automatic/producer/file_monitor_producer/test.json.in +++ b/tests/automatic/producer/file_monitor_producer/test.json.in @@ -8,5 +8,9 @@ "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1","test2"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index ad36394912da0e110b2707f73e0270452030d41f..8c02b473599ca3b68bf5f85a8432ce7dd244c559 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,4 +1,6 @@ add_subdirectory(transfer_single_file) +add_subdirectory(transfer_datasets) + if (UNIX) add_subdirectory(check_monitoring) endif() \ No newline at end of file diff --git a/tests/automatic/producer_receiver/transfer_datasets/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_datasets/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b29b867f1725f0f4b58f613105a3ceab059e5201 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_datasets/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME transfer-datasets) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..4953e902b7bb7f4cafc09bdc8ba1247f83b08b90 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +database_name=db_test +beamtime_id=asapo_test +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id} + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} + nomad stop receiver + nomad stop discovery + nomad stop authorizer + nomad stop nginx + echo "db.dropDatabase()" | mongo ${beamtime_id} + influx -execute "drop database ${database_name}" +} + +echo "db.dropDatabase()" | mongo ${beamtime_id} + + +influx -execute "create database ${database_name}" +# create db before worker starts reading it. todo: git rid of it +echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id} + +nomad run authorizer.nmd +nomad run nginx.nmd +nomad run receiver.nmd +nomad run discovery.nmd + +mkdir -p ${receiver_folder} + +$1 localhost:8400 ${beamtime_id} 100 1 1 0 30 3 + +ls -ln ${receiver_folder}/1_1 | awk '{ print $5 }'| grep 100000 +ls -ln ${receiver_folder}/1_2 | awk '{ print $5 }'| grep 100000 +ls -ln ${receiver_folder}/1_3 | awk '{ print $5 }'| grep 100000 + +echo 'db.data.find({"images._id":{$gt:0}},{"images.name":1})' | mongo asapo_test | grep 1_1 | grep 1_2 | grep 1_3 \ No newline at end of file diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat b/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..673f1188ac19f55997b3cca7e0c14b7d59ae7810 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat @@ -0,0 +1,52 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%" + + +echo db.%beamtime_id%.insert({dummy:1})" | %mongo_exe% %beamtime_id% + + +c:\opt\consul\nomad run receiver.nmd +c:\opt\consul\nomad run authorizer.nmd +c:\opt\consul\nomad run discovery.nmd +c:\opt\consul\nomad run nginx.nmd + +ping 1.0.0.0 -n 1 -w 100 > nul + +mkdir %receiver_folder% + +"%1" localhost:8400 %beamtime_id% 100 1 1 0 30 3 + +ping 1.0.0.0 -n 1 -w 100 > nul + +FOR /F "usebackq" %%A IN ('%receiver_folder%\1_1') DO set size=%%~zA +if %size% NEQ 100000 goto :error + +FOR /F "usebackq" %%A IN ('%receiver_folder%\1_2') DO set size=%%~zA +if %size% NEQ 100000 goto :error + +FOR /F "usebackq" %%A IN ('%receiver_folder%\1_3') DO set size=%%~zA +if %size% NEQ 100000 goto :error + + +echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id% | findstr 1_1 || goto :error +echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id% | findstr 1_2 || goto :error +echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id% | findstr 1_3 || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +c:\opt\consul\nomad stop receiver +c:\opt\consul\nomad stop discovery +c:\opt\consul\nomad stop nginx +c:\opt\consul\nomad stop authorizer +rmdir /S /Q %receiver_root_folder% +echo db.dropDatabase() | %mongo_exe% %beamtime_id% + + diff --git a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp index ecacc919da9e5e29112383e56f04bca66f292442..b4d087649283716645346d70a11d0739cdf34ee0 100644 --- a/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp +++ b/tests/automatic/worker/next_multithread_broker/next_multithread_broker.cpp @@ -47,7 +47,7 @@ Args GetArgs(int argc, char* argv[]) { return Args{server, source_name, token, nthreads, nfiles}; } -void GetAllFromBroker(const Args& args) { +void TestAll(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); auto group_id = broker->GenerateNewGroupId(&err); @@ -76,6 +76,6 @@ int main(int argc, char* argv[]) { auto args = GetArgs(argc, argv); - GetAllFromBroker(args); + TestAll(args); return 0; } diff --git a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp index 434fc7ea0d437e8674565ba774805ce7265df888..f9e4930eaa5d123e2017359f76a8506ccf4d5a89 100644 --- a/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp +++ b/tests/automatic/worker/next_multithread_folder/next_multithread_folder.cpp @@ -38,7 +38,7 @@ Args GetArgs(int argc, char* argv[]) { return Args{folder, nthreads, nattempts}; } -void GetAllFromBroker(const Args& args) { +void TestAll(const Args& args) { asapo::Error err; auto broker = asapo::DataBrokerFactory::CreateFolderBroker(args.folder, &err); broker->Connect(); @@ -64,7 +64,7 @@ int main(int argc, char* argv[]) { auto args = GetArgs(argc, argv); for (int nattempt = 0; nattempt < args.nattempts; nattempt++) { - GetAllFromBroker(args); + TestAll(args); } return 0; } diff --git a/tests/automatic/worker/worker_api/check_linux.sh b/tests/automatic/worker/worker_api/check_linux.sh index ce1cf9b705f32404777c2755810198af0eb5ba2b..189a40720e074ea211cee6625ad1bd689696385f 100644 --- a/tests/automatic/worker/worker_api/check_linux.sh +++ b/tests/automatic/worker/worker_api/check_linux.sh @@ -13,6 +13,7 @@ Cleanup() { nomad stop discovery nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} + rm -f 1_1 1 } @@ -24,9 +25,30 @@ sleep 1 for i in `seq 1 10`; do - echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done -$@ 127.0.0.1:8400 $database_name $token_test_run +echo hello1 > 1 +$@ 127.0.0.1:8400 $database_name $token_test_run single + +#check datasets +echo "db.dropDatabase()" | mongo ${database_name} + +sleep 1 + +for i in `seq 1 10`; +do + images='' + for j in `seq 1 3`; + do + images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" + done + images=${images#?} + echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} +done + +echo hello1 > 1_1 + +$@ 127.0.0.1:8400 $database_name $token_test_run datasets diff --git a/tests/automatic/worker/worker_api/check_windows.bat b/tests/automatic/worker/worker_api/check_windows.bat index 39c517c0c0f9a9cd8f66eda8a9796065f002c3f3..b189f66b4cade9dc9cc31339d646e6b544dab993 100644 --- a/tests/automatic/worker/worker_api/check_windows.bat +++ b/tests/automatic/worker/worker_api/check_windows.bat @@ -10,9 +10,20 @@ c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 10 -w 100 > nul -for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error -%1 127.0.0.1:8400 %database_name% %token_test_run% || goto :error +echo hello1 > 1 + + +%1 127.0.0.1:8400 %database_name% %token_test_run% single || goto :error + +echo db.dropDatabase() | %mongo_exe% %database_name% + +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error + +echo hello1 > 1_1 + +%1 127.0.0.1:8400 %database_name% %token_test_run% datasets || goto :error goto :clean @@ -25,3 +36,5 @@ c:\opt\consul\nomad stop discovery c:\opt\consul\nomad stop broker c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% +del "1 1_1" + diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp index 820c2afb6b42b03b219ba6cf3aff14313869ff27..e2f56e101e0c0f85d9edfd12d13d82d9d5358ab1 100644 --- a/tests/automatic/worker/worker_api/worker_api.cpp +++ b/tests/automatic/worker/worker_api/worker_api.cpp @@ -13,26 +13,27 @@ struct Args { std::string server; std::string run_name; std::string token; + std::string datasets; }; Args GetArgs(int argc, char* argv[]) { - if (argc != 4) { + if (argc != 5) { std::cout << "Wrong number of arguments" << std::endl; exit(EXIT_FAILURE); } std::string server{argv[1]}; std::string source_name{argv[2]}; std::string token{argv[3]}; + std::string datasets{argv[4]}; - return Args{server, source_name, token}; + return Args{server, source_name, token, datasets}; } -void GetAllFromBroker(const Args& args) { - asapo::Error err; - auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, "dummy", args.run_name, args.token, &err); - broker->SetTimeout(100); - auto group_id = broker->GenerateNewGroupId(&err); + +void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::string& group_id) { asapo::FileInfo fi; + asapo::Error err; + err = broker->GetNext(&fi, group_id, nullptr); if (err) { std::cout << err->Explain() << std::endl; @@ -41,6 +42,12 @@ void GetAllFromBroker(const Args& args) { M_AssertTrue(fi.name == "1", "GetNext filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetNext metadata"); + asapo::FileData data; + err = broker->RetrieveData(&fi, &data); + M_AssertTrue(err == nullptr, "RetrieveData no error"); + M_AssertEq("hello1", std::string(data.get(), data.get() + fi.size)); + + err = broker->GetLast(&fi, group_id, nullptr); M_AssertTrue(err == nullptr, "GetLast no error"); M_AssertTrue(fi.name == "10", "GetLast filename"); @@ -99,10 +106,65 @@ void GetAllFromBroker(const Args& args) { M_AssertTrue(images.size() == 0, "size of query answer 5"); } + +void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::string& group_id) { + asapo::FileInfo fi; + asapo::Error err; + + auto dataset = broker->GetNextDataset(group_id, &err); + if (err) { + std::cout << err->Explain() << std::endl; + } + M_AssertTrue(err == nullptr, "GetNextDataSet no error"); + M_AssertTrue(dataset.content.size() == 3, "GetNextDataSet size"); + M_AssertTrue(dataset.content[0].name == "1_1", "GetNextDataSet filename"); + M_AssertTrue(dataset.content[2].name == "1_3", "GetNextDataSet filename"); + M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetNext metadata"); + + asapo::FileData data; + err = broker->RetrieveData(&dataset.content[0], &data); + M_AssertTrue(err == nullptr, "RetrieveData no error"); + M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size)); + + + dataset = broker->GetLastDataset(group_id, &err); + M_AssertTrue(err == nullptr, "GetLast no error"); + M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); + M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); + + dataset = broker->GetNextDataset(group_id, &err); + M_AssertTrue(err != nullptr, "GetNextDataset2 error"); + + dataset = broker->GetLastDataset(group_id, &err); + M_AssertTrue(err == nullptr, "GetLastDataset2 no error"); + + dataset = broker->GetDatasetById(8, group_id, &err); + M_AssertTrue(err == nullptr, "GetDatasetById error"); + M_AssertTrue(dataset.content[2].name == "8_3", "GetDatasetById filename"); + +} + +void TestAll(const Args& args) { + asapo::Error err; + auto broker = asapo::DataBrokerFactory::CreateServerBroker(args.server, ".", args.run_name, args.token, &err); + broker->SetTimeout(100); + auto group_id = broker->GenerateNewGroupId(&err); + + if (args.datasets == "single") { + TestSingle(broker, group_id); + } + if (args.datasets == "dataset") { + TestDataset(broker, group_id); + } + +} + + + int main(int argc, char* argv[]) { auto args = GetArgs(argc, argv); - GetAllFromBroker(args); + TestAll(args); return 0; } diff --git a/tests/automatic/worker/worker_api_python/check_linux.sh b/tests/automatic/worker/worker_api_python/check_linux.sh index f7a4eb4785ff63e6762663d69bc36a0182b26df9..c1a3b854a912a29587d0dd42fab525bb7e14818f 100644 --- a/tests/automatic/worker/worker_api_python/check_linux.sh +++ b/tests/automatic/worker/worker_api_python/check_linux.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -source_path=dummy +source_path=. database_name=test_run token_test_run=K38Mqc90iRv8fC7prcFHd994mF_wfUiJnWBfIjIzieo= set -e @@ -15,24 +15,44 @@ Cleanup() { nomad stop discovery nomad stop broker echo "db.dropDatabase()" | mongo ${database_name} + rm 1 1_1 } nomad run nginx.nmd nomad run discovery.nmd nomad run broker.nmd +echo hello1 > 1 +echo hello1 > 1_1 + + for i in `seq 1 5`; do - echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} + echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} done sleep 1 export PYTHONPATH=$1:${PYTHONPATH} -python worker_api.py 127.0.0.1:8400 $source_path $database_name $token_test_run +python worker_api.py 127.0.0.1:8400 $source_path $database_name $token_test_run single + +#check datasets +echo "db.dropDatabase()" | mongo ${database_name} +sleep 1 +for i in `seq 1 10`; +do + images='' + for j in `seq 1 3`; + do + images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}" + done + images=${images#?} + echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} +done +python worker_api.py 127.0.0.1:8400 $source_path $database_name $token_test_run datasets diff --git a/tests/automatic/worker/worker_api_python/check_windows.bat b/tests/automatic/worker/worker_api_python/check_windows.bat index 1e25519831a72d8b6181b2afb29ffa0edb9f5654..ff3442c7698e0c4006e7945d500c5a5bdec0d8e1 100644 --- a/tests/automatic/worker/worker_api_python/check_windows.bat +++ b/tests/automatic/worker/worker_api_python/check_windows.bat @@ -1,4 +1,4 @@ -SET source_path=dummy +SET source_path=. SET database_name=test_run SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe" @@ -10,11 +10,22 @@ c:\opt\consul\nomad run nginx.nmd ping 1.0.0.0 -n 10 -w 100 > nul -for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error +for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name% || goto :error set PYTHONPATH=%1 -python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% || goto :error +echo hello1 > 1 +echo hello1 > 1_1 + + +python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% single || goto :error + +echo db.dropDatabase() | %mongo_exe% %database_name% + +for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name% || goto :error + +python worker_api.py 127.0.0.1:8400 %source_path% %database_name% %token_test_run% datasets || goto :error + goto :clean @@ -27,3 +38,4 @@ c:\opt\consul\nomad stop discovery c:\opt\consul\nomad stop broker c:\opt\consul\nomad stop nginx echo db.dropDatabase() | %mongo_exe% %database_name% +del "1 1_1" diff --git a/tests/automatic/worker/worker_api_python/worker_api.py b/tests/automatic/worker/worker_api_python/worker_api.py index 8f3e1bf0f2b0e22b95e6e80859be1f41f4aee760..72cfdc32d577478cc6507236a0f1dd2d087acb65 100644 --- a/tests/automatic/worker/worker_api_python/worker_api.py +++ b/tests/automatic/worker/worker_api_python/worker_api.py @@ -35,76 +35,123 @@ def assert_eq(val,expected,name): print ('val: ', val,' expected: ',expected) sys.exit(1) +def check_single(broker,group_id_new): + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_noterr(err, "get_next") + assert_metaname(meta,"1","get next1") + assert_usermetadata(meta,"get next1") -source, path, beamtime, token = sys.argv[1:] + data, err = broker.retrieve_data(meta) + assert_eq(data.tostring().decode("utf-8"),"hello1","retrieve_data data") + assert_noterr(err, "retrieve_data err") -broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000) + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_noterr(err, "get_next2") + assert_metaname(meta,"2","get next2") + assert_usermetadata(meta,"get next2") + + _, meta, err = broker.get_last(group_id_new, meta_only=True) + assert_noterr(err, "get_last1") + assert_metaname(meta,"5","get last1") + assert_usermetadata(meta,"get last1") + + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_err(err, "get_next3") + + size,err = broker.get_ndatasets() + assert_noterr(err, "get_ndatasets") + assert_eq(size,5,"get_ndatasets") + + + err = broker.reset_counter(group_id_new) + assert_noterr(err, "reset_counter") + + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_noterr(err, "get_next4") + assert_metaname(meta,"1","get next4") + assert_usermetadata(meta,"get next4") -group_id_new, err = broker.generate_group_id() -assert_noterr(err, "generate_group") -_, meta, err = broker.get_next(group_id_new, meta_only=True) -assert_noterr(err, "get_next") -assert_metaname(meta,"1","get next1") -assert_usermetadata(meta,"get next1") + _, meta, err = broker.get_by_id(3, group_id_new, meta_only=True) + assert_noterr(err, "get_by_id") + assert_metaname(meta,"3","get get_by_id") + assert_usermetadata(meta,"get get_by_id") + _, meta, err = broker.get_next(group_id_new, meta_only=True) + assert_noterr(err, "get_next5") + assert_metaname(meta,"4","get next5") + assert_usermetadata(meta,"get next5") -_, meta, err = broker.get_next(group_id_new, meta_only=True) -assert_noterr(err, "get_next2") -assert_metaname(meta,"2","get next2") -assert_usermetadata(meta,"get next2") -_, meta, err = broker.get_last(group_id_new, meta_only=True) -assert_noterr(err, "get_last1") -assert_metaname(meta,"5","get last1") -assert_usermetadata(meta,"get last1") + images,err = broker.query_images("meta.test = 10") + assert_noterr(err, "query1") + assert_eq(len(images),5,"size of query answer 1") + for image in images: + assert_usermetadata(image,"query_images") -_, meta, err = broker.get_next(group_id_new, meta_only=True) -assert_err(err, "get_next3") -size,err = broker.get_ndatasets() -assert_noterr(err, "get_ndatasets") -assert_eq(size,5,"get_ndatasets") + images,err = broker.query_images("meta.test = 10 AND name='1'") + assert_eq(len(images),1,"size of query answer 2 ") + assert_noterr(err, "query2") + for image in images: + assert_usermetadata(image,"query_images") -err = broker.reset_counter(group_id_new) -assert_noterr(err, "reset_counter") + images,err = broker.query_images("meta.test = 11") + assert_eq(len(images),0,"size of query answer 3 ") + assert_noterr(err, "query3") -_, meta, err = broker.get_next(group_id_new, meta_only=True) -assert_noterr(err, "get_next4") -assert_metaname(meta,"1","get next4") -assert_usermetadata(meta,"get next4") + images,err = broker.query_images("bla") + assert_err(err, "wrong query") +def check_dataset(broker,group_id_new): + id, metas, err = broker.get_next_dataset(group_id_new) + assert_noterr(err, "get_next_dataset1") + assert_eq(id,1,"get_next_dataset1") + assert_metaname(metas[0],"1_1","get nextdataset1 name1") + assert_metaname(metas[1],"1_2","get nextdataset1 name2") + assert_usermetadata(metas[0],"get nextdataset1 meta") -_, meta, err = broker.get_by_id(3, group_id_new, meta_only=True) -assert_noterr(err, "get_by_id") -assert_metaname(meta,"3","get get_by_id") -assert_usermetadata(meta,"get get_by_id") + data, err = broker.retrieve_data(metas[0]) + assert_eq(data.tostring().decode("utf-8"),"hello1","retrieve_data from dataset data") + assert_noterr(err, "retrieve_data from dataset err") -_, meta, err = broker.get_next(group_id_new, meta_only=True) -assert_noterr(err, "get_next5") -assert_metaname(meta,"4","get next5") -assert_usermetadata(meta,"get next5") + id, metas, err = broker.get_next_dataset(group_id_new) + assert_noterr(err, "get_next_dataset2") + assert_eq(id,2,"get_next_dataset2") + assert_metaname(metas[0],"2_1","get nextdataset2 name1") -images,err = broker.query_images("meta.test = 10") -assert_noterr(err, "query1") -assert_eq(len(images),5,"size of query answer 1") -for image in images: - assert_usermetadata(image,"query_images") + id, metas, err = broker.get_last_dataset(group_id_new) + assert_noterr(err, "get_last_dataset1") + assert_eq(id,10,"get_last_dataset1") + assert_metaname(metas[2],"10_3","get get_last_dataset1 name3") + id, metas, err = broker.get_next_dataset(group_id_new) + assert_eq(id,None,"get_next_dataset3 id") + assert_eq(id,metas,"get_next_dataset3 metas") + assert_err(err, "get_next_dataset3 err") -images,err = broker.query_images("meta.test = 10 AND name='1'") -assert_eq(len(images),1,"size of query answer 2 ") -assert_noterr(err, "query2") + id, metas, err = broker.get_dataset_by_id(8,group_id_new) + assert_eq(id,8,"get_dataset_by_id1 id") + assert_noterr(err, "get_dataset_by_id1 err") + assert_metaname(metas[2],"8_3","get get_dataset_by_id1 name3") + + id, metas, err = broker.get_next_dataset(group_id_new) + assert_eq(id,9,"get_next_dataset4 id") + + +source, path, beamtime, token, mode = sys.argv[1:] + +broker, err = asapo_worker.create_server_broker(source,path, beamtime,token,1000) + +group_id_new, err = broker.generate_group_id() +assert_noterr(err, "generate_group") -for image in images: - assert_usermetadata(image,"query_images") -images,err = broker.query_images("meta.test = 11") -assert_eq(len(images),0,"size of query answer 3 ") -assert_noterr(err, "query3") +if mode == "single": + check_single(broker,group_id_new) -images,err = broker.query_images("bla") -assert_err(err, "wrong query") +if mode == "datasets": + check_dataset(broker,group_id_new) diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in index ed3c4f477dd0231d73ed6e628a144059d0ee2fc1..e40fe1738f8fd6bd9f627200769a1cc60afab290 100644 --- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in +++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in @@ -39,7 +39,10 @@ job "asapo-produceronly" { "RootMonitoredFolder":"c:\\tmp\\asapo\\test_in", "MonitoredSubFolders":["test_folder"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } } EOH destination = "local/test.json" @@ -95,7 +98,11 @@ job "asapo-produceronly" { "RootMonitoredFolder":"/tmp/asapo/test_in", "MonitoredSubFolders":["test_folder"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } EOH destination = "local/test.json" diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in index a716b6384f42bb20d7712f2b98dfc5776d1ef51c..b9c374bcde07d325cdcb0418980790225dfca88a 100644 --- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in +++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in @@ -39,7 +39,10 @@ job "asapo-filemon-producer" { "RootMonitoredFolder":"u:\\asapo", "MonitoredSubFolders":["test_folder"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } } EOH destination = "local/test.json" @@ -95,7 +98,11 @@ job "asapo-filemon-producer" { "RootMonitoredFolder":"/run/user", "MonitoredSubFolders":["data"], "IgnoreExtentions":["tmp"], - "RemoveAfterSend":true + "RemoveAfterSend":true, + "Subset": { + "Mode":"none" + } + } EOH destination = "local/test.json" diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index 5f2358a52326a55e6502e31ce68f2fb98fd49438..4d6fbac02acbc29dc1aea655014cc80bff1156b5 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -55,10 +55,46 @@ class DataBroker { */ virtual Error GetNext(FileInfo* info, std::string group_id, FileData* data) = 0; + //! Retrieves image using fileinfo. + /*! + \param info - image metadata to use, can be updated after operation + \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \return Error if data is nullptr or data cannot be read, nullptr otherwise. + */ + virtual Error RetrieveData(FileInfo* info, FileData* data) = 0; + + + //! Receive next available completed dataset. + /*! + \param err - will be set to error data cannot be read, nullptr otherwise. + \param group_id - group id to use. + \return DataSet - information about the dataset + */ + virtual DataSet GetNextDataset(std::string group_id, Error* err) = 0; + + //! Receive last available completed dataset. + /*! + \param err - will be set to error data cannot be read, nullptr otherwise. + \param group_id - group id to use. + \return DataSet - information about the dataset + */ + virtual DataSet GetLastDataset(std::string group_id, Error* err) = 0; + //! Receive dataset by id. /*! \param id - dataset id + \param err - will be set to error data cannot be read or dataset is incomplete, nullptr otherwise. + \param group_id - group id to use. + \return DataSet - information about the dataset + */ + virtual DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) = 0; + + + + //! Receive single image by id. + /*! + \param id - image id \param info - where to store image metadata. Can be set to nullptr only image data is needed. \param data - where to store image data. Can be set to nullptr only image metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index f64f140bbc15986971c48b86c10ad0c2ad33ef71..cdc0fb1fa518392d5463e3f9632e6b5b85960089 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -44,22 +44,31 @@ Error FolderDataBroker::CanGetData(FileInfo* info, FileData* data, uint64_t nfil return nullptr; } + +Error FolderDataBroker::RetrieveData(FileInfo* info, FileData* data) { + if (data == nullptr || info == nullptr ) { + return TextError("pointers are empty"); + } + + Error error; + *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); + return error; +} + + Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, FileData* data) { auto err = CanGetData(info, data, nfile_to_get); if (err != nullptr) { return err; } - *info = filelist_[(size_t)nfile_to_get]; + *info = filelist_[(size_t) nfile_to_get]; if (data == nullptr) { return nullptr; } - Error error; - *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); - - return error; + return RetrieveData(info, data); } @@ -104,4 +113,18 @@ FileInfos FolderDataBroker::QueryImages(std::string query, Error* err) { return FileInfos{}; } +DataSet FolderDataBroker::GetNextDataset(std::string group_id, Error* err) { + *err = TextError("Not supported for folder data broker"); + return {0, FileInfos{}}; +} +DataSet FolderDataBroker::GetLastDataset(std::string group_id, Error* err) { + *err = TextError("Not supported for folder data broker"); + return {0, FileInfos{}}; +} +DataSet FolderDataBroker::GetDatasetById(uint64_t id, std::string group_id, Error* err) { + *err = TextError("Not supported for folder data broker"); + return {0, FileInfos{}}; +} + + } diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index 9ffb04d3b04beeebd4339fa09250e89c40c9dc25..372ef2d514535e7bad5a0c82c348abdd403895dc 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -25,6 +25,10 @@ class FolderDataBroker final : public asapo::DataBroker { Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; std::unique_ptr<asapo::IO> io__; // modified in testings to mock system calls,otherwise do not touch FileInfos QueryImages(std::string query, Error* err) override; + DataSet GetNextDataset(std::string group_id, Error* err) override; + DataSet GetLastDataset(std::string group_id, Error* err) override; + DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; + Error RetrieveData(FileInfo* info, FileData* data) override; private: std::string base_path_; bool is_connected_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index f20d00482245086eeea3616648c785349128cebb..8afacb830ecc1e7cd7786d9b9e1bb4dd611c21de 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -114,24 +114,27 @@ Error ServerDataBroker::GetBrokerUri() { } -Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op) { +Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, GetImageServerOperation op, + bool dataset) { std::string request_suffix = OpToUriCmd(op); std::string request_api = "/database/" + source_name_ + "/" + std::move(group_id) + "/"; uint64_t elapsed_ms = 0; - std::string response; while (true) { auto err = GetBrokerUri(); if (err == nullptr) { RequestInfo ri; ri.host = current_broker_uri_; ri.api = request_api + request_suffix; - err = ProcessRequest(&response, ri); + if (dataset) { + ri.extra_params = "&dataset=true"; + } + err = ProcessRequest(response, ri); if (err == nullptr) { break; } } - ProcessServerError(&err, response, &request_suffix); + ProcessServerError(&err, *response, &request_suffix); if (elapsed_ms >= timeout_ms_) { err = IOErrorTemplates::kTimeout.Generate( ", last error: " + err->Explain()); @@ -141,9 +144,6 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, std::string group_ elapsed_ms += 100; } - if (!info->SetFromJson(response)) { - return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response); - } return nullptr; } @@ -174,22 +174,26 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, uint64_t } Error err; + std::string response; if (op == GetImageServerOperation::GetID) { - err = GetFileInfoFromServerById(id, info, std::move(group_id)); + err = GetRecordFromServerById(id, &response, std::move(group_id)); } else { - err = GetFileInfoFromServer(info, std::move(group_id), op); + err = GetRecordFromServer(&response, std::move(group_id), op); } - if (err != nullptr) { return err; } + if (!info->SetFromJson(response)) { + return WorkerErrorTemplates::kErrorReadingSource.Generate(std::string(":") + response); + } + return GetDataIfNeeded(info, data); } -Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { - if (data == nullptr) { - return nullptr; +Error ServerDataBroker::RetrieveData(FileInfo* info, FileData* data) { + if (data == nullptr || info == nullptr ) { + return TextError("pointers are empty"); } if (DataCanBeInBuffer(info)) { @@ -205,6 +209,16 @@ Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { return error; } + +Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) { + if (data == nullptr) { + return nullptr; + } + + return RetrieveData(info, data); + +} + bool ServerDataBroker::DataCanBeInBuffer(const FileInfo* info) { return info->buf_id > 0; } @@ -278,24 +292,18 @@ Error ServerDataBroker::GetById(uint64_t id, FileInfo* info, std::string group_i } -Error ServerDataBroker::GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id) { - +Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id, + bool dataset) { RequestInfo ri; ri.api = "/database/" + source_name_ + "/" + std::move(group_id) + "/" + std::to_string(id); ri.extra_params = "&reset=true"; - - - Error err; - auto responce = BrokerRequestWithTimeout(ri, &err); - if (err) { - return err; - } - - if (!info->SetFromJson(responce)) { - return WorkerErrorTemplates::kErrorReadingSource.Generate(); + if (dataset) { + ri.extra_params += "&dataset=true"; } - return nullptr; + Error err; + *response = BrokerRequestWithTimeout(ri, &err); + return err; } std::string ServerDataBroker::GetBeamtimeMeta(Error* err) { @@ -306,13 +314,17 @@ std::string ServerDataBroker::GetBeamtimeMeta(Error* err) { } -FileInfos ServerDataBroker::DecodeFromResponse(std::string response, Error* err) { - auto parser = JsonStringParser("{ \"images\":" + response + "}"); +DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error* err) { + auto parser = JsonStringParser(std::move(response)); std::vector<std::string> vec_fi_endcoded; - auto parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded); + Error parse_err; + uint64_t id; + (parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded)) || + (parse_err = parser.GetUInt64("_id", &id)); if (parse_err) { *err = WorkerErrorTemplates::kInternalError.Generate("cannot parse response:" + parse_err->Explain()); + return {0, FileInfos{}}; } auto res = FileInfos{}; @@ -320,11 +332,11 @@ FileInfos ServerDataBroker::DecodeFromResponse(std::string response, Error* err) FileInfo fi; if (!fi.SetFromJson(fi_encoded)) { *err = WorkerErrorTemplates::kInternalError.Generate("cannot parse response:" + fi_encoded); - return FileInfos{}; + return {0, FileInfos{}}; } res.emplace_back(fi); } - return res; + return {id, std::move(res)}; } @@ -340,7 +352,37 @@ FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) { return FileInfos{}; } - return DecodeFromResponse(response, err); + auto dataset = DecodeDatasetFromResponse("{\"_id\":0, \"images\":" + response + "}", err); + return dataset.content; +} + +DataSet ServerDataBroker::GetNextDataset(std::string group_id, Error* err) { + return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), err); +} + +DataSet ServerDataBroker::GetDatasetFromServer(GetImageServerOperation op, + uint64_t id, + std::string group_id, + Error* err) { + FileInfos infos; + std::string response; + if (op == GetImageServerOperation::GetID) { + *err = GetRecordFromServerById(id, &response, std::move(group_id), true); + } else { + *err = GetRecordFromServer(&response, std::move(group_id), op, true); + } + if (*err != nullptr) { + return {0, FileInfos{}}; + } + return DecodeDatasetFromResponse(response, err); +} +DataSet ServerDataBroker::GetLastDataset(std::string group_id, Error* err) { + return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), err); +} + +DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Error* err) { + return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), err); } + } diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 4f90f37ec4675dee3a6785a931a77b8112f280ab..3676878ba14698e05ed0f555997288dadb92a787 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -38,24 +38,29 @@ class ServerDataBroker final : public asapo::DataBroker { Error GetById(uint64_t id, FileInfo* info, std::string group_id, FileData* data) override; void SetTimeout(uint64_t timeout_ms) override; FileInfos QueryImages(std::string query, Error* err) override; + DataSet GetNextDataset(std::string group_id, Error* err) override; + DataSet GetLastDataset(std::string group_id, Error* err) override; + DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override; + Error RetrieveData(FileInfo* info, FileData* data) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; std::unique_ptr<NetClient> net_client__; private: std::string RequestWithToken(std::string uri); - Error GetFileInfoFromServer(FileInfo* info, std::string group_id, GetImageServerOperation op); - Error GetFileInfoFromServerById(uint64_t id, FileInfo* info, std::string group_id); + Error GetRecordFromServer(std::string* info, std::string group_id, GetImageServerOperation op, bool dataset = false); + Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, bool dataset = false); Error GetDataIfNeeded(FileInfo* info, FileData* data); Error GetBrokerUri(); void ProcessServerError(Error* err, const std::string& response, std::string* redirect_uri); Error ProcessRequest(std::string* response, const RequestInfo& request); Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, FileInfo* info, FileData* data); + DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, Error* err); bool DataCanBeInBuffer(const FileInfo* info); Error TryGetDataFromBuffer(const FileInfo* info, FileData* data); std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); std::string AppendUri(std::string request_string); - FileInfos DecodeFromResponse(std::string response, Error* err); + DataSet DecodeDatasetFromResponse(std::string response, Error* err); std::string OpToUriCmd(GetImageServerOperation op); std::string server_uri_; diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 9b9c254938b009c44fab8ae43723c40ae941d6ef..d72d46166bfd0b9e79c7328f9e49054c5898c927 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -156,7 +156,6 @@ TEST_F(FolderDataBrokerTests, GetNextReturnsFileInfo) { } - TEST_F(FolderDataBrokerTests, GetNDataSets) { data_broker->Connect(); Error err; @@ -278,6 +277,32 @@ TEST_F(GetDataFromFileTests, GetNextReturnsDataAndInfo) { } + +TEST_F(GetDataFromFileTests, RetrieveDataCallsReadsFile) { + data_broker->Connect(); + FileInfo fi; + fi.name = "test"; + + + EXPECT_CALL(mock, GetDataFromFile_t(expected_base_path + asapo::kPathSeparator + "test", _, _)). + WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'}))); + + auto err = data_broker->RetrieveData(&fi, &data); + + ASSERT_THAT(data[0], Eq('1')); + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(GetDataFromFileTests, RetrieveDataReturnsErrorWithEmptyPointer) { + data_broker->Connect(); + + auto err = data_broker->RetrieveData(&fi, nullptr); + + ASSERT_THAT(err, Ne(nullptr)); +} + + + TEST_F(GetDataFromFileTests, GetNextReturnsErrorWhenCannotReadData) { EXPECT_CALL(mock, GetDataFromFile_t(_, _, _)). WillOnce(DoAll(testing::SetArgPointee<2>(asapo::IOErrorTemplates::kReadError.Generate().release()), @@ -354,5 +379,39 @@ TEST(FolderDataBroker, QueryImages) { } +TEST(FolderDataBroker, NextDataset) { + auto data_broker = std::unique_ptr<FolderDataBroker> {new FolderDataBroker("test")}; + + Error err; + auto dataset = data_broker->GetNextDataset("bla", &err); + + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(dataset.content.size(), Eq(0)); + ASSERT_THAT(dataset.id, Eq(0)); +} + +TEST(FolderDataBroker, LastDataset) { + auto data_broker = std::unique_ptr<FolderDataBroker> {new FolderDataBroker("test")}; + + Error err; + auto dataset = data_broker->GetLastDataset("bla", &err); + + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(dataset.content.size(), Eq(0)); + ASSERT_THAT(dataset.id, Eq(0)); +} + + +TEST(FolderDataBroker, DatasetById) { + auto data_broker = std::unique_ptr<FolderDataBroker> {new FolderDataBroker("test")}; + + Error err; + auto dataset = data_broker->GetDatasetById(0, "bla", &err); + + ASSERT_THAT(err, Ne(nullptr)); + ASSERT_THAT(dataset.content.size(), Eq(0)); + ASSERT_THAT(dataset.id, Eq(0)); +} + } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 4c0f85fea4bc05065ba321f8fef7f394989d0375..df027969c83973054698f3778b0fcd44ed9a6cff 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -299,7 +299,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsParseError) { ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kErrorReadingSource)); } -TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDtataNeeded) { +TEST_F(ServerDataBrokerTests, GetImageReturnsIfNoDataNeeded) { MockGetBrokerUri(); MockGet("error_response"); @@ -604,6 +604,94 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) { ASSERT_THAT(images[1], Eq(rec2)); } +TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/next?token=" + + expected_token + "&dataset=true", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + data_broker->GetNextDataset(expected_group_id, &err); +} + + +TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) { + asapo::Error err; + MockGetBrokerUri(); + + auto to_send1 = CreateFI(); + auto json1 = to_send1.Json(); + auto to_send2 = CreateFI(); + to_send2.id = 2; + auto json2 = to_send2.Json(); + auto to_send3 = CreateFI(); + to_send3.id = 3; + auto json3 = to_send3.Json(); + + auto json = std::string("{") + + "\"_id\":1," + + "\"size\":3," + + "\"images\":[" + json1 + "," + json2 + "," + json3 + "]" + + "}"; + + MockGet(json); + + auto dataset = data_broker->GetNextDataset(expected_group_id, &err); + + ASSERT_THAT(err, Eq(nullptr)); + + ASSERT_THAT(dataset.id, Eq(1)); + ASSERT_THAT(dataset.content.size(), Eq(3)); + ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id)); + ASSERT_THAT(dataset.content[1].id, Eq(to_send2.id)); + ASSERT_THAT(dataset.content[2].id, Eq(to_send3.id)); +} + +TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) { + MockGetBrokerUri(); + MockGet("error_response"); + + asapo::Error err; + auto dataset = data_broker->GetNextDataset(expected_group_id, &err); + + ASSERT_THAT(err, Eq(asapo::WorkerErrorTemplates::kInternalError)); + ASSERT_THAT(dataset.content.size(), Eq(0)); + ASSERT_THAT(dataset.id, Eq(0)); + +} + +TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + "/last?token=" + + expected_token + "&dataset=true", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + data_broker->GetLastDataset(expected_group_id, &err); +} + + +TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_group_id + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token + "&reset=true&dataset=true", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + data_broker->GetDatasetById(expected_dataset_id, expected_group_id, &err); +} + + } diff --git a/worker/api/python/asapo_worker.pxd b/worker/api/python/asapo_worker.pxd index 4b12bb5e14230ac6bc4d7733dd03968399a634f0..de9231176777eeb34be425877bc8650469875f84 100644 --- a/worker/api/python/asapo_worker.pxd +++ b/worker/api/python/asapo_worker.pxd @@ -1,6 +1,7 @@ from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.vector cimport vector +from libcpp cimport bool ctypedef unsigned char uint8_t @@ -22,10 +23,13 @@ cdef extern from "asapo_worker.h" namespace "asapo": cdef extern from "asapo_worker.h" namespace "asapo": cppclass FileInfo: string Json() + bool SetFromJson(string json_str) cppclass FileInfos: vector[FileInfo].iterator begin() vector[FileInfo].iterator end() - + struct DataSet: + uint64_t id + FileInfos content cdef extern from "asapo_worker.h" namespace "asapo": @@ -40,6 +44,10 @@ cdef extern from "asapo_worker.h" namespace "asapo": string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) FileInfos QueryImages(string query, Error* err) + DataSet GetNextDataset(string group_id, Error* err) + DataSet GetLastDataset(string group_id, Error* err) + DataSet GetDatasetById(uint64_t id,string group_id, Error* err) + Error RetrieveData(FileInfo* info, FileData* data) cdef extern from "asapo_worker.h" namespace "asapo": diff --git a/worker/api/python/asapo_worker.pyx.in b/worker/api/python/asapo_worker.pyx.in index 46b1f7845784068410bef088ccc74cd6558c1e26..9eb871abf8c48f14724d9a320c9db8abf1d5b8eb 100644 --- a/worker/api/python/asapo_worker.pyx.in +++ b/worker/api/python/asapo_worker.pyx.in @@ -42,9 +42,6 @@ cdef class PyDataBroker: return None,None,err_str info_str = _str(info.Json()) meta = json.loads(info_str) - del meta['buf_id'] - del meta['source'] - del meta['lastchange'] if meta_only: return None,meta,None cdef char* ptr = <char*> data.release() @@ -57,6 +54,24 @@ cdef class PyDataBroker: return self._op("last",group_id,meta_only,0) def get_by_id(self,id,group_id,meta_only = True): return self._op("id",group_id,meta_only,id) + def retrieve_data(self,meta): + json_str = json.dumps(meta) + cdef FileInfo info + if not info.SetFromJson(_bytes(json_str)): + return None,"wrong metadata" + cdef Error err + cdef FileData data + err = self.c_broker.RetrieveData(&info, &data) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None,err_str + cdef np.npy_intp dims[1] + dims[0] = meta['size'] + cdef char* ptr = <char*> data.release() + arr = np.PyArray_SimpleNewFromData(1, dims, np.NPY_BYTE, ptr) + return arr,None + + def get_ndatasets(self): cdef Error err size = self.c_broker.GetNDataSets(&err) @@ -94,6 +109,32 @@ cdef class PyDataBroker: for fi in file_infos: json_list.append(json.loads(_str(fi.Json()))) return json_list, None + def _op_dataset(self, op, group_id, id): + cdef FileInfos file_infos + cdef DataSet dataset + cdef Error err + if op == "next": + dataset = self.c_broker.GetNextDataset(_bytes(group_id),&err) + elif op == "last": + dataset = self.c_broker.GetLastDataset(_bytes(group_id),&err) + elif op == "id": + dataset = self.c_broker.GetDatasetById(id,_bytes(group_id),&err) + err_str = _str(GetErrorString(&err)) + if err_str.strip(): + return None, None, err_str + else: + json_list = [] + for fi in dataset.content: + json_list.append(json.loads(_str(fi.Json()))) + return dataset.id, json_list, None + + + def get_next_dataset(self, group_id): + return self._op_dataset("next",group_id,0) + def get_last_dataset(self, group_id): + return self._op_dataset("last",group_id,0) + def get_dataset_by_id(self, id, group_id): + return self._op_dataset("id",group_id,id) def get_beamtime_meta(self):