From 283c0af7ca9645b7e27dae212396fa0374dc0f14 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 22 Dec 2020 15:24:36 +0100 Subject: [PATCH] more changes, increase timeout in test --- broker/src/asapo_broker/database/mongodb.go | 18 +-- .../src/asapo_broker/database/mongodb_test.go | 40 +++---- broker/src/asapo_broker/server/listroutes.go | 10 +- .../src/asapo_broker/server/post_op_image.go | 6 +- .../asapo_broker/server/post_op_image_test.go | 32 ++--- .../asapo_broker/server/post_query_images.go | 4 +- .../server/post_query_images_test.go | 6 +- common/cpp/src/data_structs/data_structs.cpp | 2 +- common/cpp/src/database/mongodb_client.cpp | 10 +- .../api/cpp/include/asapo/consumer/consumer.h | 36 +++--- consumer/api/cpp/src/consumer_impl.cpp | 42 +++---- consumer/api/cpp/src/consumer_impl.h | 14 +-- .../api/cpp/unittests/test_consumer_impl.cpp | 112 +++++++++--------- consumer/api/python/asapo_consumer.pxd | 2 +- consumer/api/python/asapo_consumer.pyx.in | 4 +- examples/pipeline/in_to_out/in_to_out.cpp | 2 +- .../dummy_data_producer.cpp | 28 ++--- .../api/cpp/include/asapo/producer/producer.h | 2 +- .../test_system_folder_watch_linux.cpp | 4 +- .../consumer/consumer_api/consumer_api.cpp | 20 ++-- .../consumer_api_python/consumer_api.py | 24 ++-- .../get_user_meta.py | 6 +- .../python_tests/consumer/consumer_api.py | 2 +- .../manual/python_tests/plot_images_online.py | 4 +- 24 files changed, 215 insertions(+), 215 deletions(-) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 478139680..6683be24c 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -168,7 +168,7 @@ func (db *Mongodb) getMaxIndex(request Request, returnIncompete bool) (max_id in if request.MinDatasetSize>0 { q = bson.M{"size": bson.M{"$gte": request.MinDatasetSize}} } else { - q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$images"}}}} + q = bson.M{"$expr": bson.M{"$eq": []interface{}{"$size", bson.M{"$size": "$messages"}}}} } } else { q = nil @@ -257,13 +257,13 @@ func (db *Mongodb) getRecordByIDRow(request Request, id, id_max int) ([]byte, er partialData := false if request.DatasetOp { - imgs,ok1 :=res["images"].(primitive.A) + imgs,ok1 :=res["messages"].(primitive.A) expectedSize,ok2 := utils.InterfaceToInt64(res["size"]) if !ok1 || !ok2 { return nil, &DBError{utils.StatusTransactionInterrupted, "getRecordByIDRow: cannot parse database response" } } - nImages := len(imgs) - if (request.MinDatasetSize==0 && int64(nImages)!=expectedSize) || (request.MinDatasetSize==0 && nImages<request.MinDatasetSize) { + nMessages := len(imgs) + if (request.MinDatasetSize==0 && int64(nMessages)!=expectedSize) || (request.MinDatasetSize==0 && nMessages<request.MinDatasetSize) { partialData = true } } @@ -622,7 +622,7 @@ func (db *Mongodb) processQueryError(query, dbname string, err error) ([]byte, e return nil, &DBError{utils.StatusNoData, err.Error()} } -func (db *Mongodb) queryImages(request Request) ([]byte, error) { +func (db *Mongodb) queryMessages(request Request) ([]byte, error) { var res []map[string]interface{} q, sort, err := db.BSONFromSQL(request.DbName, request.ExtraParam) if err != nil { @@ -808,13 +808,13 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.getSize(request) case "meta": return db.getMeta(request) - case "queryimages": - return db.queryImages(request) + case "querymessages": + return db.queryMessages(request) case "streams": return db.getStreams(request) - case "ackimage": + case "ackmessage": return db.ackRecord(request) - case "negackimage": + case "negackmessage": return db.negAckRecord(request) case "nacks": return db.nacks(request) diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index 3acab2a69..dbf379375 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -22,7 +22,7 @@ type TestRecord struct { type TestDataset struct { ID int64 `bson:"_id" json:"_id"` Size int64 `bson:"size" json:"size"` - Images []TestRecord `bson:"images" json:"images"` + Messages []TestRecord `bson:"messages" json:"messages"` } var db Mongodb @@ -84,8 +84,8 @@ func TestMongoDBGetMetaErrorWhenNotConnected(t *testing.T) { assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } -func TestMongoDBQueryImagesErrorWhenNotConnected(t *testing.T) { - _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: "0"}) +func TestMongoDBQueryMessagesErrorWhenNotConnected(t *testing.T) { + _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: "0"}) assert.Equal(t, utils.StatusServiceUnavailable, err.(*DBError).Code) } @@ -514,7 +514,7 @@ var tests = []struct { {"(meta.counter = 10 OR meta.counter = 11 AND (meta.text = 'bbb' OR meta.text = 'ccc')", []TestRecordMeta{}, false}, } -func TestMongoDBQueryImagesOK(t *testing.T) { +func TestMongoDBQueryMessagesOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -531,7 +531,7 @@ func TestMongoDBQueryImagesOK(t *testing.T) { // continue // } - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) // fmt.Println(string(res_string)) @@ -546,11 +546,11 @@ func TestMongoDBQueryImagesOK(t *testing.T) { } -func TestMongoDBQueryImagesOnEmptyDatabase(t *testing.T) { +func TestMongoDBQueryMessagesOnEmptyDatabase(t *testing.T) { db.Connect(dbaddress) defer cleanup() for _, test := range tests { - res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "queryimages", ExtraParam: test.query}) + res_string, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, Op: "querymessages", ExtraParam: test.query}) var res []TestRecordMeta json.Unmarshal(res_string, &res) assert.Equal(t, 0, len(res)) @@ -772,14 +772,14 @@ func TestMongoDBListStreams(t *testing.T) { } } -func TestMongoDBAckImage(t *testing.T) { +func TestMongoDBAckMessage(t *testing.T) { db.Connect(dbaddress) defer cleanup() db.insertRecord(dbname, collection, &rec1) - query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str} + request := Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str} res, err := db.ProcessRequest(request) nacks, _ := db.getNacks(request, 1, 1) assert.Nil(t, err) @@ -815,9 +815,9 @@ func TestMongoDBNacks(t *testing.T) { insertRecords(10) } if test.ackRecords { - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackimage\"}"}) - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackimage\"}"}) - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "nacks", ExtraParam: test.rangeString}) @@ -849,9 +849,9 @@ func TestMongoDBLastAcks(t *testing.T) { insertRecords(10) } if test.ackRecords { - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackimage\"}"}) - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackimage\"}"}) - db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackimage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":2,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":3,\"Op\":\"ackmessage\"}"}) + db.ackRecord(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, ExtraParam: "{\"Id\":4,\"Op\":\"ackmessage\"}"}) } res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "lastack"}) @@ -969,9 +969,9 @@ func TestMongoDBAckDeletesInprocessed(t *testing.T) { defer cleanup() db.insertRecord(dbname, collection, &rec1) db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) - query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" - db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackimage", ExtraParam: query_str}) + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "ackmessage", ExtraParam: query_str}) _, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next", ExtraParam: "0_3"}) assert.NotNil(t, err) if err != nil { @@ -997,8 +997,8 @@ func TestMongoDBNegAck(t *testing.T) { db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) bparam, _ := json.Marshal(&inputParams) - db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackimage", ExtraParam: string(bparam)}) - res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time image from negack + db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "negackmessage", ExtraParam: string(bparam)}) + res, err := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // first time message from negack _, err1 := db.ProcessRequest(Request{DbName: dbname, DbCollectionName: collection, GroupId: groupId, Op: "next"}) // second time nothing assert.Nil(t, err) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index d95f6161b..8d782f58e 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -60,10 +60,10 @@ var listRoutes = utils.Routes{ routeCreateGroupID, }, utils.Route{ - "QueryImages", + "QueryMessages", "Post", - "/database/{dbname}/{datasource}/{stream}/0/queryimages", - routeQueryImages, + "/database/{dbname}/{datasource}/{stream}/0/querymessages", + routeQueryMessages, }, utils.Route{ "ResetConter", @@ -72,10 +72,10 @@ var listRoutes = utils.Routes{ routeResetCounter, }, utils.Route{ - "ImageOp", + "MessageOp", "Post", "/database/{dbname}/{datasource}/{stream}/{groupid}/{id}", - routeImageOp, + routeMessageOp, }, utils.Route{ "Health", diff --git a/broker/src/asapo_broker/server/post_op_image.go b/broker/src/asapo_broker/server/post_op_image.go index 0f3b22198..1440812f5 100644 --- a/broker/src/asapo_broker/server/post_op_image.go +++ b/broker/src/asapo_broker/server/post_op_image.go @@ -7,12 +7,12 @@ import ( "strconv" ) -type ImageOp struct { +type MessageOp struct { Id int Op string Params map[string]interface{} `json:",omitempty"` } -func routeImageOp(w http.ResponseWriter, r *http.Request) { +func routeMessageOp(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), 500) @@ -31,7 +31,7 @@ func routeImageOp(w http.ResponseWriter, r *http.Request) { return } - var op ImageOp + var op MessageOp err = json.Unmarshal(body, &op) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/broker/src/asapo_broker/server/post_op_image_test.go b/broker/src/asapo_broker/server/post_op_image_test.go index 789e645e5..259787e41 100644 --- a/broker/src/asapo_broker/server/post_op_image_test.go +++ b/broker/src/asapo_broker/server/post_op_image_test.go @@ -9,12 +9,12 @@ import ( "testing" ) -type ImageOpTestSuite struct { +type MessageOpTestSuite struct { suite.Suite mock_db *database.MockedDatabase } -func (suite *ImageOpTestSuite) SetupTest() { +func (suite *MessageOpTestSuite) SetupTest() { statistics.Reset() suite.mock_db = new(database.MockedDatabase) db = suite.mock_db @@ -22,33 +22,33 @@ func (suite *ImageOpTestSuite) SetupTest() { logger.SetMockLog() } -func (suite *ImageOpTestSuite) TearDownTest() { +func (suite *MessageOpTestSuite) TearDownTest() { assertExpectations(suite.T(), suite.mock_db) logger.UnsetMockLog() db = nil } -func TestImageOpTestSuite(t *testing.T) { - suite.Run(t, new(ImageOpTestSuite)) +func TestMessageOpTestSuite(t *testing.T) { + suite.Run(t, new(MessageOpTestSuite)) } -func (suite *ImageOpTestSuite) TestAckImageOpOK() { - query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "ackimage", ExtraParam: query_str}).Return([]byte(""), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackimage"))) +func (suite *MessageOpTestSuite) TestAckMessageOpOK() { + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" + suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream, GroupId: expectedGroupID, Op: "ackmessage", ExtraParam: query_str}).Return([]byte(""), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request ackmessage"))) w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) - suite.Equal(http.StatusOK, w.Code, "ackimage OK") + suite.Equal(http.StatusOK, w.Code, "ackmessage OK") } -func (suite *ImageOpTestSuite) TestAckImageOpErrorWrongOp() { - query_str := "\"Id\":1,\"Op\":\"ackimage\"}" +func (suite *MessageOpTestSuite) TestAckMessageOpErrorWrongOp() { + query_str := "\"Id\":1,\"Op\":\"ackmessage\"}" w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/1" + correctTokenSuffix,"POST",query_str) - suite.Equal(http.StatusBadRequest, w.Code, "ackimage wrong") + suite.Equal(http.StatusBadRequest, w.Code, "ackmessage wrong") } -func (suite *ImageOpTestSuite) TestAckImageOpErrorWrongID() { - query_str := "{\"Id\":1,\"Op\":\"ackimage\"}" +func (suite *MessageOpTestSuite) TestAckMessageOpErrorWrongID() { + query_str := "{\"Id\":1,\"Op\":\"ackmessage\"}" w := doRequest("/database/" + expectedBeamtimeId + "/" + expectedSource + "/" + expectedStream + "/" + expectedGroupID + "/bla" + correctTokenSuffix,"POST",query_str) - suite.Equal(http.StatusBadRequest, w.Code, "ackimage wrong") + suite.Equal(http.StatusBadRequest, w.Code, "ackmessage wrong") } diff --git a/broker/src/asapo_broker/server/post_query_images.go b/broker/src/asapo_broker/server/post_query_images.go index 4d33c2358..87e833b68 100644 --- a/broker/src/asapo_broker/server/post_query_images.go +++ b/broker/src/asapo_broker/server/post_query_images.go @@ -5,12 +5,12 @@ import ( "net/http" ) -func routeQueryImages(w http.ResponseWriter, r *http.Request) { +func routeQueryMessages(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), 500) return } - processRequest(w, r, "queryimages", string(body), false) + processRequest(w, r, "querymessages", string(body), false) } diff --git a/broker/src/asapo_broker/server/post_query_images_test.go b/broker/src/asapo_broker/server/post_query_images_test.go index 6549c8b37..0f2b55c14 100644 --- a/broker/src/asapo_broker/server/post_query_images_test.go +++ b/broker/src/asapo_broker/server/post_query_images_test.go @@ -35,10 +35,10 @@ func TestQueryTestSuite(t *testing.T) { func (suite *QueryTestSuite) TestQueryOK() { query_str := "aaaa" - suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream,Op: "queryimages", ExtraParam: query_str}).Return([]byte("{}"), nil) - logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request queryimages"))) + suite.mock_db.On("ProcessRequest", database.Request{DbName: expectedDBName, DbCollectionName: expectedStream,Op: "querymessages", ExtraParam: query_str}).Return([]byte("{}"), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("processing request querymessages"))) - w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/0/queryimages"+correctTokenSuffix, "POST", query_str) + w := doRequest("/database/"+expectedBeamtimeId+"/"+expectedSource+"/"+expectedStream+"/0/querymessages"+correctTokenSuffix, "POST", query_str) suite.Equal(http.StatusOK, w.Code, "Query OK") } diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp index 36023b5d5..2c9b7deed 100644 --- a/common/cpp/src/data_structs/data_structs.cpp +++ b/common/cpp/src/data_structs/data_structs.cpp @@ -91,7 +91,7 @@ bool DataSet::SetFromJson(const std::string &json_string) { std::vector<std::string> vec_fi_endcoded; Error parse_err; - (parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded)) || + (parse_err = parser.GetArrayRawStrings("messages", &vec_fi_endcoded)) || (parse_err = parser.GetUInt64("size", &expected_size)) || (parse_err = parser.GetUInt64("_id", &id)); if (parse_err) { diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index 783b38acb..5abe197d2 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -252,14 +252,14 @@ Error MongoDBClient::InsertAsSubset(const std::string &collection, const Message if (err) { return err; } - auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(subset_id), "}", "{", "images._id", "{", "$ne", + auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(subset_id), "}", "{", "messages._id", "{", "$ne", BCON_INT64(file.id), "}", "}", "]"); auto update = BCON_NEW ("$setOnInsert", "{", "size", BCON_INT64(subset_size), "timestamp", BCON_INT64((int64_t) NanosecsEpochFromTimePoint(file.timestamp)), "}", "$addToSet", "{", - "images", BCON_DOCUMENT(document.get()), "}"); + "messages", BCON_DOCUMENT(document.get()), "}"); err = AddBsonDocumentToArray(query, update, ignore_duplicates); @@ -347,9 +347,9 @@ Error MongoDBClient::GetDataSetById(const std::string &collection, uint64_t id_i DBErrorTemplates::kJsonParseError.Generate(record_str); } - for (const auto &fileinfo : dataset.content) { - if (fileinfo.id == id_in_set) { - *file = fileinfo; + for (const auto &message_meta : dataset.content) { + if (message_meta.id == id_in_set) { + *file = message_meta; return nullptr; } } diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index bd4c495bf..913246ab2 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -99,20 +99,20 @@ class Consumer { */ virtual std::string GetBeamtimeMeta(Error* err) = 0; - //! Receive next available image. + //! Receive next available message. /*! - \param info - where to store image metadata. Can be set to nullptr only image data is needed. + \param info - where to store message metadata. Can be set to nullptr only message data is needed. \param group_id - group id to use. - \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetNext(MessageMeta* info, std::string group_id, MessageData* data) = 0; virtual Error GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) = 0; - //! Retrieves image using fileinfo. + //! Retrieves message using message meta. /*! - \param info - image metadata to use, can be updated after operation - \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \param info - message metadata to use, can be updated after operation + \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if data is nullptr or data cannot be read, nullptr otherwise. */ virtual Error RetrieveData(MessageMeta* info, MessageData* data) = 0; @@ -150,11 +150,11 @@ class Consumer { virtual DataSet GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) = 0; virtual DataSet GetDatasetById(uint64_t id, uint64_t min_size, Error* err) = 0; - //! Receive single image by id. + //! Receive single message by id. /*! - \param id - image id - \param info - where to store image metadata. Can be set to nullptr only image data is needed. - \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \param id - message id + \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetById(uint64_t id, MessageMeta* info, MessageData* data) = 0; @@ -165,28 +165,28 @@ class Consumer { \param group_id - group id to use. \param stream (optional) - stream \param err - will be set in case of error, nullptr otherwise. - \return id of the last acknowledged image, 0 if error + \return id of the last acknowledged message, 0 if error */ virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, std::string stream, Error* error) = 0; virtual uint64_t GetLastAcknowledgedTulpeId(std::string group_id, Error* error) = 0; - //! Receive last available image. + //! Receive last available message. /*! - \param info - where to store image metadata. Can be set to nullptr only image data is needed. - \param data - where to store image data. Can be set to nullptr only image metadata is needed. + \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param data - where to store message data. Can be set to nullptr only message metadata is needed. \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetLast(MessageMeta* info, MessageData* data) = 0; virtual Error GetLast(MessageMeta* info, std::string stream, MessageData* data) = 0; - //! Get all images matching the query. + //! Get all messages matching the query. /*! \param sql_query - query string in SQL format. Limit subset is supported \param err - will be set in case of error, nullptr otherwise - \return vector of image metadata matchiing to specified query. Empty if nothing found or error + \return vector of message metadata matchiing to specified query. Empty if nothing found or error */ - virtual MessageMetas QueryImages(std::string query, Error* err) = 0; - virtual MessageMetas QueryImages(std::string query, std::string stream, Error* err) = 0; + virtual MessageMetas QueryMessages(std::string query, Error* err) = 0; + virtual MessageMetas QueryMessages(std::string query, std::string stream, Error* err) = 0; //! Configure resending nonacknowledged data /*! diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 7afbaeb8e..b3fc99f5d 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -241,7 +241,7 @@ RequestInfo ConsumerImpl::PrepareRequestInfo(std::string api_url, bool dataset, } Error ConsumerImpl::GetRecordFromServer(std::string* response, std::string group_id, std::string stream, - GetImageServerOperation op, + GetMessageServerOperation op, bool dataset, uint64_t min_size) { interrupt_flag_= false; std::string request_suffix = OpToUriCmd(op); @@ -299,7 +299,7 @@ Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, MessageData } Error ConsumerImpl::GetNext(MessageMeta* info, std::string group_id, std::string stream, MessageData* data) { - return GetImageFromServer(GetImageServerOperation::GetNext, + return GetMessageFromServer(GetMessageServerOperation::GetNext, 0, std::move(group_id), std::move(stream), @@ -312,7 +312,7 @@ Error ConsumerImpl::GetLast(MessageMeta* info, MessageData* data) { } Error ConsumerImpl::GetLast(MessageMeta* info, std::string stream, MessageData* data) { - return GetImageFromServer(GetImageServerOperation::GetLast, + return GetMessageFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream), @@ -320,15 +320,15 @@ Error ConsumerImpl::GetLast(MessageMeta* info, std::string stream, MessageData* data); } -std::string ConsumerImpl::OpToUriCmd(GetImageServerOperation op) { +std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) { switch (op) { - case GetImageServerOperation::GetNext:return "next"; - case GetImageServerOperation::GetLast:return "last"; + case GetMessageServerOperation::GetNext:return "next"; + case GetMessageServerOperation::GetLast:return "last"; default:return "last"; } } -Error ConsumerImpl::GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, +Error ConsumerImpl::GetMessageFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, MessageMeta* info, MessageData* data) { @@ -338,7 +338,7 @@ Error ConsumerImpl::GetImageFromServer(GetImageServerOperation op, uint64_t id, Error err; std::string response; - if (op == GetImageServerOperation::GetID) { + if (op == GetMessageServerOperation::GetID) { err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(stream)); } else { err = GetRecordFromServer(&response, std::move(group_id), std::move(stream), op); @@ -568,7 +568,7 @@ Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, MessageData* data) { } Error ConsumerImpl::GetById(uint64_t id, MessageMeta* info, std::string stream, MessageData* data) { - return GetImageFromServer(GetImageServerOperation::GetID, id, "0", stream, info, data); + return GetMessageFromServer(GetMessageServerOperation::GetID, id, "0", stream, info, data); } Error ConsumerImpl::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id, @@ -606,10 +606,10 @@ DataSet DecodeDatasetFromResponse(std::string response, Error* err) { } } -MessageMetas ConsumerImpl::QueryImages(std::string query, std::string stream, Error* err) { +MessageMetas ConsumerImpl::QueryMessages(std::string query, std::string stream, Error* err) { RequestInfo ri; ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.data_source + - "/" + std::move(stream) + "/0/queryimages"; + "/" + std::move(stream) + "/0/querymessages"; ri.post = true; ri.body = std::move(query); @@ -618,12 +618,12 @@ MessageMetas ConsumerImpl::QueryImages(std::string query, std::string stream, Er return MessageMetas{}; } - auto dataset = DecodeDatasetFromResponse("{\"_id\":0,\"size\":0, \"images\":" + response + "}", err); + auto dataset = DecodeDatasetFromResponse("{\"_id\":0,\"size\":0, \"messages\":" + response + "}", err); return dataset.content; } -MessageMetas ConsumerImpl::QueryImages(std::string query, Error* err) { - return QueryImages(std::move(query), kDefaultStream, err); +MessageMetas ConsumerImpl::QueryMessages(std::string query, Error* err) { + return QueryMessages(std::move(query), kDefaultStream, err); } DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, Error* err) { @@ -631,25 +631,25 @@ DataSet ConsumerImpl::GetNextDataset(std::string group_id, uint64_t min_size, Er } DataSet ConsumerImpl::GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) { - return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(stream),min_size, err); + return GetDatasetFromServer(GetMessageServerOperation::GetNext, 0, std::move(group_id), std::move(stream),min_size, err); } DataSet ConsumerImpl::GetLastDataset(std::string stream, uint64_t min_size, Error* err) { - return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, "0", std::move(stream),min_size, err); + return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream),min_size, err); } DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, Error* err) { return GetLastDataset(kDefaultStream, min_size, err); } -DataSet ConsumerImpl::GetDatasetFromServer(GetImageServerOperation op, +DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, uint64_t min_size, Error* err) { MessageMetas infos; std::string response; - if (op == GetImageServerOperation::GetID) { + if (op == GetMessageServerOperation::GetID) { *err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(stream), true, min_size); } else { *err = GetRecordFromServer(&response, std::move(group_id), std::move(stream), op, true, min_size); @@ -665,7 +665,7 @@ DataSet ConsumerImpl::GetDatasetById(uint64_t id, uint64_t min_size, Error* err) } DataSet ConsumerImpl::GetDatasetById(uint64_t id, std::string stream, uint64_t min_size, Error* err) { - return GetDatasetFromServer(GetImageServerOperation::GetID, id, "0", std::move(stream), min_size, err); + return GetDatasetFromServer(GetMessageServerOperation::GetID, id, "0", std::move(stream), min_size, err); } StreamInfos ParseStreamsFromResponse(std::string response, Error* err) { @@ -766,7 +766,7 @@ Error ConsumerImpl::Acknowledge(std::string group_id, uint64_t id, std::string s +"/" + std::move(stream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; - ri.body = "{\"Op\":\"ackimage\"}"; + ri.body = "{\"Op\":\"ackmessage\"}"; Error err; BrokerRequestWithTimeout(ri, &err); @@ -847,7 +847,7 @@ Error ConsumerImpl::NegativeAcknowledge(std::string group_id, +"/" + std::move(stream) + "/" + std::move(group_id) + "/" + std::to_string(id); ri.post = true; - ri.body = R"({"Op":"negackimage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; + ri.body = R"({"Op":"negackmessage","Params":{"DelayMs":)" + std::to_string(delay_ms) + "}}"; Error err; BrokerRequestWithTimeout(ri, &err); diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index d5c1defba..1ced0a6ba 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -11,7 +11,7 @@ namespace asapo { -enum class GetImageServerOperation { +enum class GetMessageServerOperation { GetNext, GetLast, GetID @@ -97,8 +97,8 @@ class ConsumerImpl final : public asapo::Consumer { NetworkConnectionType CurrentConnectionType() const override; - MessageMetas QueryImages(std::string query, Error* err) override; - MessageMetas QueryImages(std::string query, std::string stream, Error* err) override; + MessageMetas QueryMessages(std::string query, Error* err) override; + MessageMetas QueryMessages(std::string query, std::string stream, Error* err) override; DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) override; DataSet GetNextDataset(std::string group_id, std::string stream, uint64_t min_size, Error* err) override; @@ -127,7 +127,7 @@ class ConsumerImpl final : public asapo::Consumer { static const std::string kBrokerServiceName; static const std::string kFileTransferServiceName; std::string RequestWithToken(std::string uri); - Error GetRecordFromServer(std::string* info, std::string group_id, std::string stream, GetImageServerOperation op, + Error GetRecordFromServer(std::string* info, std::string group_id, std::string stream, GetMessageServerOperation op, bool dataset = false, uint64_t min_size = 0); Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string stream, bool dataset = false, uint64_t min_size = 0); @@ -136,9 +136,9 @@ class ConsumerImpl final : public asapo::Consumer { bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* group_id,std::string* redirect_uri); bool SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* group_id,std::string* redirect_uri); Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri); - Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string stream, + Error GetMessageFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, MessageMeta* info, MessageData* data); - DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string stream, + DataSet GetDatasetFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, uint64_t min_size, Error* err); bool DataCanBeInBuffer(const MessageMeta* info); Error TryGetDataFromBuffer(const MessageMeta* info, MessageData* data); @@ -152,7 +152,7 @@ class ConsumerImpl final : public asapo::Consumer { Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); RequestInfo PrepareRequestInfo(std::string api_url, bool dataset, uint64_t min_size); - std::string OpToUriCmd(GetImageServerOperation op); + std::string OpToUriCmd(GetMessageServerOperation op); Error UpdateFolderTokenIfNeeded(bool ignore_existing); std::string endpoint_; std::string current_broker_uri_; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index f2defc92d..18aaf81a0 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -78,7 +78,7 @@ class ConsumerImplTests : public Test { std::string expected_query_string = "bla"; std::string expected_folder_token = "folder_token"; std::string expected_beamtime_id = "beamtime_id"; - uint64_t expected_image_size = 100; + uint64_t expected_message_size = 100; uint64_t expected_dataset_id = 1; static const uint64_t expected_buf_id = 123; std::string expected_next_stream = "nextstream"; @@ -168,7 +168,7 @@ class ConsumerImplTests : public Test { } MessageMeta CreateFI(uint64_t buf_id = expected_buf_id) { MessageMeta fi; - fi.size = expected_image_size; + fi.size = expected_message_size; fi.id = 1; fi.buf_id = buf_id; fi.name = expected_filename; @@ -177,7 +177,7 @@ class ConsumerImplTests : public Test { } }; -TEST_F(ConsumerImplTests, GetImageReturnsErrorOnWrongInput) { +TEST_F(ConsumerImplTests, GetMessageReturnsErrorOnWrongInput) { auto err = consumer->GetNext(nullptr, "", nullptr); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } @@ -251,7 +251,7 @@ TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { consumer->GetLast(&info, nullptr); } -TEST_F(ConsumerImplTests, GetImageReturnsEndOfStreamFromHttpClient) { +TEST_F(ConsumerImplTests, GetMessageReturnsEndOfStreamFromHttpClient) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -269,7 +269,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsEndOfStreamFromHttpClient) { ASSERT_THAT(err_data->next_stream, Eq("")); } -TEST_F(ConsumerImplTests, GetImageReturnsStreamFinishedFromHttpClient) { +TEST_F(ConsumerImplTests, GetMessageReturnsStreamFinishedFromHttpClient) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -288,7 +288,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsStreamFinishedFromHttpClient) { ASSERT_THAT(err_data->next_stream, Eq(expected_next_stream)); } -TEST_F(ConsumerImplTests, GetImageReturnsNoDataFromHttpClient) { +TEST_F(ConsumerImplTests, GetMessageReturnsNoDataFromHttpClient) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -306,7 +306,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsNoDataFromHttpClient) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } -TEST_F(ConsumerImplTests, GetImageReturnsNotAuthorized) { +TEST_F(ConsumerImplTests, GetMessageReturnsNotAuthorized) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -319,7 +319,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsNotAuthorized) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } -TEST_F(ConsumerImplTests, GetImageReturnsWrongResponseFromHttpClient) { +TEST_F(ConsumerImplTests, GetMessageReturnsWrongResponseFromHttpClient) { MockGetBrokerUri(); @@ -334,7 +334,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsWrongResponseFromHttpClient) { ASSERT_THAT(err->Explain(), HasSubstr("malformed")); } -TEST_F(ConsumerImplTests, GetImageReturnsIfBrokerAddressNotFound) { +TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerAddressNotFound) { EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asapo-broker"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::NotFound), @@ -347,7 +347,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsIfBrokerAddressNotFound) { ASSERT_THAT(err->Explain(), AllOf(HasSubstr(expected_server_uri), HasSubstr("unavailable"))); } -TEST_F(ConsumerImplTests, GetImageReturnsIfBrokerUriEmpty) { +TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) { EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asapo-broker"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( SetArgPointee<1>(HttpCode::OK), @@ -387,7 +387,7 @@ TEST_F(ConsumerImplTests, GetBrokerUriAgainAfterConnectionError) { consumer->GetNext(&info, expected_group_id, nullptr); } -TEST_F(ConsumerImplTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout) { +TEST_F(ConsumerImplTests, GetMessageReturnsEofStreamFromHttpClientUntilTimeout) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( @@ -401,7 +401,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream)); } -TEST_F(ConsumerImplTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) { +TEST_F(ConsumerImplTests, GetMessageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccured) { MockGetBrokerUri(); consumer->SetTimeout(300); @@ -424,7 +424,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorOccur ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData)); } -TEST_F(ConsumerImplTests, GetNextImageReturnsImmediatelyOnTransferError) { +TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnTransferError) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -461,7 +461,7 @@ TEST_F(ConsumerImplTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService)); } -TEST_F(ConsumerImplTests, GetNextImageReturnsImmediatelyOnFinshedStream) { +TEST_F(ConsumerImplTests, GetNextMessageReturnsImmediatelyOnFinshedStream) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( @@ -475,7 +475,7 @@ TEST_F(ConsumerImplTests, GetNextImageReturnsImmediatelyOnFinshedStream) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kStreamFinished)); } -TEST_F(ConsumerImplTests, GetImageReturnsMessageMeta) { +TEST_F(ConsumerImplTests, GetMessageReturnsMessageMeta) { MockGetBrokerUri(); auto to_send = CreateFI(); @@ -493,7 +493,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsMessageMeta) { ASSERT_THAT(info.timestamp, Eq(to_send.timestamp)); } -TEST_F(ConsumerImplTests, GetImageReturnsParseError) { +TEST_F(ConsumerImplTests, GetMessageReturnsParseError) { MockGetBrokerUri(); MockGet("error_response"); @@ -502,7 +502,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsParseError) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction)); } -TEST_F(ConsumerImplTests, GetImageReturnsIfNoDataNeeded) { +TEST_F(ConsumerImplTests, GetMessageReturnsIfNoDataNeeded) { MockGetBrokerUri(); MockGet("error_response"); @@ -512,7 +512,7 @@ TEST_F(ConsumerImplTests, GetImageReturnsIfNoDataNeeded) { consumer->GetNext(&info, expected_group_id, nullptr); } -TEST_F(ConsumerImplTests, GetImageTriesToGetDataFromMemoryCache) { +TEST_F(ConsumerImplTests, GetMessageTriesToGetDataFromMemoryCache) { MockGetBrokerUri(); auto to_send = CreateFI(); auto json = to_send.Json(); @@ -528,7 +528,7 @@ TEST_F(ConsumerImplTests, GetImageTriesToGetDataFromMemoryCache) { } -TEST_F(ConsumerImplTests, GetImageCallsReadFromFileIfCannotReadFromCache) { +TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfCannotReadFromCache) { MockGetBrokerUri(); auto to_send = CreateFI(); auto json = to_send.Json(); @@ -544,7 +544,7 @@ TEST_F(ConsumerImplTests, GetImageCallsReadFromFileIfCannotReadFromCache) { ASSERT_THAT(info.buf_id, Eq(0)); } -TEST_F(ConsumerImplTests, GetImageCallsReadFromFileIfZeroBufId) { +TEST_F(ConsumerImplTests, GetMessageCallsReadFromFileIfZeroBufId) { MockGetBrokerUri(); auto to_send = CreateFI(0); auto json = to_send.Json(); @@ -788,40 +788,40 @@ TEST_F(ConsumerImplTests, GetMetaDataOK) { } -TEST_F(ConsumerImplTests, QueryImagesReturnError) { +TEST_F(ConsumerImplTests, QueryMessagesReturnError) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::BadRequest), SetArgPointee<4>(nullptr), Return("error in query"))); consumer->SetTimeout(1000); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, &err); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); ASSERT_THAT(err->Explain(), HasSubstr("query")); - ASSERT_THAT(images.size(), Eq(0)); + ASSERT_THAT(messages.size(), Eq(0)); } -TEST_F(ConsumerImplTests, QueryImagesReturnEmptyResults) { +TEST_F(ConsumerImplTests, QueryMessagesReturnEmptyResults) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return("[]"))); consumer->SetTimeout(100); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(images.size(), Eq(0)); + ASSERT_THAT(messages.size(), Eq(0)); } -TEST_F(ConsumerImplTests, QueryImagesWrongResponseArray) { +TEST_F(ConsumerImplTests, QueryMessagesWrongResponseArray) { MockGetBrokerUri(); @@ -832,41 +832,41 @@ TEST_F(ConsumerImplTests, QueryImagesWrongResponseArray) { auto responce_string = json1 + "," + json2 + "]"; // no [ at the beginning - EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return(responce_string))); consumer->SetTimeout(100); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, &err); ASSERT_THAT(err, Ne(nullptr)); - ASSERT_THAT(images.size(), Eq(0)); + ASSERT_THAT(messages.size(), Eq(0)); ASSERT_THAT(err->Explain(), HasSubstr("response")); } -TEST_F(ConsumerImplTests, QueryImagesWrongResponseRecorsd) { +TEST_F(ConsumerImplTests, QueryMessagesWrongResponseRecorsd) { MockGetBrokerUri(); auto responce_string = R"([{"bla":1},{"err":}])"; - EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll( + EXPECT_CALL(mock_http_client, Post_t(HasSubstr("querymessages"), _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return(responce_string))); consumer->SetTimeout(100); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, &err); ASSERT_THAT(err, Ne(nullptr)); - ASSERT_THAT(images.size(), Eq(0)); + ASSERT_THAT(messages.size(), Eq(0)); ASSERT_THAT(err->Explain(), HasSubstr("response")); } -TEST_F(ConsumerImplTests, QueryImagesReturnRecords) { +TEST_F(ConsumerImplTests, QueryMessagesReturnRecords) { MockGetBrokerUri(); @@ -879,36 +879,36 @@ TEST_F(ConsumerImplTests, QueryImagesReturnRecords) { EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/0" + - "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( + "/querymessages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return(responce_string))); consumer->SetTimeout(100); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, &err); + auto messages = consumer->QueryMessages(expected_query_string, &err); ASSERT_THAT(err, Eq(nullptr)); - ASSERT_THAT(images.size(), Eq(2)); + ASSERT_THAT(messages.size(), Eq(2)); - ASSERT_THAT(images[0].name, Eq(rec1.name)); - ASSERT_THAT(images[1].name, Eq(rec2.name)); + ASSERT_THAT(messages[0].name, Eq(rec1.name)); + ASSERT_THAT(messages[1].name, Eq(rec2.name)); } -TEST_F(ConsumerImplTests, QueryImagesUsesCorrectUriWithStream) { +TEST_F(ConsumerImplTests, QueryMessagesUsesCorrectUriWithStream) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/0" + - "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( + "/querymessages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll( SetArgPointee<3>(HttpCode::OK), SetArgPointee<4>(nullptr), Return("[]"))); consumer->SetTimeout(100); asapo::Error err; - auto images = consumer->QueryImages(expected_query_string, expected_stream, &err); + auto messages = consumer->QueryMessages(expected_query_string, expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); @@ -944,7 +944,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) { auto json = std::string("{") + "\"_id\":1," + "\"size\":3," + - "\"images\":[" + json1 + "," + json2 + "," + json3 + "]" + + "\"messages\":[" + json1 + "," + json2 + "," + json3 + "]" + "}"; MockGet(json); @@ -976,7 +976,7 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsPartialMessageMetas) { auto json = std::string("{") + "\"_id\":1," + "\"size\":3," + - "\"images\":[" + json1 + "," + json2 + "]" + + "\"messages\":[" + json1 + "," + json2 + "]" + "}"; MockGet(json, asapo::HttpCode::PartialContent); @@ -1011,7 +1011,7 @@ TEST_F(ConsumerImplTests, GetDataSetByIdReturnsPartialMessageMetas) { auto json = std::string("{") + "\"_id\":1," + "\"size\":3," + - "\"images\":[" + json1 + "," + json2 + "]" + + "\"messages\":[" + json1 + "," + json2 + "]" + "}"; MockGet(json, asapo::HttpCode::PartialContent); @@ -1155,7 +1155,7 @@ void ConsumerImplTests::ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p expected_cookie, expected_fts_query_string, _, - expected_image_size, + expected_message_size, _)).WillOnce(DoAll( SetArgPointee<5>(HttpCode::OK), AssignArg3(p_err_template == nullptr), @@ -1168,7 +1168,7 @@ void ConsumerImplTests::ExpectRepeatedFileTransfer() { expected_cookie, expected_fts_query_string, _, - expected_image_size, + expected_message_size, _)). WillOnce(DoAll( SetArgPointee<5>(HttpCode::Unauthorized), @@ -1195,7 +1195,7 @@ void ConsumerImplTests::AssertSingleFileTransfer() { Mock::VerifyAndClearExpectations(&mock_io); } -TEST_F(ConsumerImplTests, GetImageUsesFileTransferServiceIfCannotReadFromCache) { +TEST_F(ConsumerImplTests, GetMessageUsesFileTransferServiceIfCannotReadFromCache) { AssertSingleFileTransfer(); } @@ -1226,7 +1226,7 @@ TEST_F(ConsumerImplTests, FileTransferReadsFileSize) { auto err = fts_consumer->RetrieveData(&info, &data); } -TEST_F(ConsumerImplTests, GetImageReusesTokenAndUri) { +TEST_F(ConsumerImplTests, GetMessageReusesTokenAndUri) { AssertSingleFileTransfer(); asapo::MessageData data = asapo::MessageData{new uint8_t[1]}; @@ -1236,7 +1236,7 @@ TEST_F(ConsumerImplTests, GetImageReusesTokenAndUri) { auto err = fts_consumer->GetNext(&info, expected_group_id, &data); } -TEST_F(ConsumerImplTests, GetImageTriesToGetTokenAgainIfTransferFailed) { +TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { AssertSingleFileTransfer(); asapo::MessageData data; @@ -1249,7 +1249,7 @@ TEST_F(ConsumerImplTests, GetImageTriesToGetTokenAgainIfTransferFailed) { TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { MockGetBrokerUri(); - auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; + auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/" + expected_group_id @@ -1266,7 +1266,7 @@ TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUriWithDefaultStream) { MockGetBrokerUri(); - auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}"; + auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/default/" + expected_group_id @@ -1353,7 +1353,7 @@ TEST_F(ConsumerImplTests, ResendNacks) { TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { MockGetBrokerUri(); - auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelayMs":10000}})"; + auto expected_neg_acknowledge_command = R"({"Op":"negackmessage","Params":{"DelayMs":10000}})"; EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_data_source + "/" + expected_stream + "/" + expected_group_id diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index b7a7b4500..70d2d4de4 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -74,7 +74,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: IdList GetUnacknowledgedTupleIds(string group_id, string stream, uint64_t from_id, uint64_t to_id, Error* error) string GenerateNewGroupId(Error* err) string GetBeamtimeMeta(Error* err) - MessageMetas QueryImages(string query, string stream, Error* err) + MessageMetas QueryMessages(string query, string stream, Error* err) DataSet GetNextDataset(string group_id, string stream, uint64_t min_size, Error* err) DataSet GetLastDataset(string stream, uint64_t min_size, Error* err) DataSet GetDatasetById(uint64_t id, string stream, uint64_t min_size, Error* err) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index a664a22ae..e7e85c62d 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -266,13 +266,13 @@ cdef class PyConsumer: list.append(id) return list - def query_images(self,query, stream = "default"): + def query_messages(self,query, stream = "default"): cdef string b_query = _bytes(query) cdef string b_stream = _bytes(stream) cdef Error err cdef MessageMetas message_metas with nogil: - message_metas = self.c_consumer.get().QueryImages(b_query,b_stream,&err) + message_metas = self.c_consumer.get().QueryMessages(b_query,b_stream,&err) if err: throw_exception(err) json_list = [] diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index 241773a5c..87becf1a9 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -192,7 +192,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { asapo::Error err; auto producer = asapo::Producer::Create(args.server, args.nthreads, asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,args.beamtime_id, "", args.stream_out, args.token }, 60, &err); + asapo::SourceCredentials{asapo::SourceType::kProcessed,args.beamtime_id, "", args.stream_out, args.token }, 60000, &err); if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; exit(EXIT_FAILURE); diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 6516f2b34..b44eae6a8 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -25,7 +25,7 @@ struct Args { uint64_t nthreads; uint64_t mode; uint64_t timeout_ms; - uint64_t images_in_set; + uint64_t messages_in_set; }; void PrintCommandArguments(const Args& args) { @@ -39,7 +39,7 @@ void PrintCommandArguments(const Args& args) { << "Tcp mode: " << ((args.mode % 10) ==0 ) << std::endl << "Raw: " << (args.mode / 100 == 1)<< std::endl << "timeout: " << args.timeout_ms << std::endl - << "images in set: " << args.images_in_set << std::endl + << "messages in set: " << args.messages_in_set << std::endl << std::endl; } @@ -74,7 +74,7 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { std::cout << "Usage: " << argv[0] << " <destination> <beamtime_id[%<data_source>%<token>]> <number_of_kbyte> <iterations> <nthreads>" - " <mode 0xx - processed source type, 1xx - raw source type, xx0 -t tcp, xx1 - filesystem, x0x - write files, x1x - do not write files> <timeout (sec)> [n images in set (default 1)]" + " <mode 0xx - processed source type, 1xx - raw source type, xx0 -t tcp, xx1 - filesystem, x0x - write files, x1x - do not write files> <timeout (sec)> [n messages in set (default 1)]" << std::endl; exit(EXIT_FAILURE); } @@ -88,9 +88,9 @@ void ProcessCommandArguments(int argc, char* argv[], Args* args) { args->mode = std::stoull(argv[6]); args->timeout_ms = std::stoull(argv[7])*1000; if (argc == 9) { - args->images_in_set = std::stoull(argv[8]); + args->messages_in_set = std::stoull(argv[8]); } else { - args->images_in_set = 1; + args->messages_in_set = 1; } PrintCommandArguments(*args); return; @@ -129,7 +129,7 @@ asapo::MessageData CreateMemoryBuffer(size_t size) { } -bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t images_in_set, +bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations, uint64_t messages_in_set, const std::string& data_source, bool write_files, asapo::SourceType type) { asapo::Error err; @@ -141,7 +141,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it } } - std::string image_folder = GetStringFromSourceType(type)+asapo::kPathSeparator; + std::string message_folder = GetStringFromSourceType(type)+asapo::kPathSeparator; for (uint64_t i = 0; i < iterations; i++) { @@ -151,9 +151,9 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it if (!data_source.empty()) { message_header.file_name = data_source + "/" + message_header.file_name; } - message_header.file_name = image_folder+message_header.file_name; + message_header.file_name = message_folder+message_header.file_name; message_header.user_metadata = std::move(meta); - if (images_in_set == 1) { + if (messages_in_set == 1) { auto err = producer->Send(message_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode : asapo::kTransferData, &ProcessAfterSend); if (err) { @@ -161,16 +161,16 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it return false; } } else { - for (uint64_t id = 0; id < images_in_set; id++) { + for (uint64_t id = 0; id < messages_in_set; id++) { auto buffer = CreateMemoryBuffer(number_of_byte); message_header.id_in_subset = id + 1; - message_header.subset_size = images_in_set; + message_header.subset_size = messages_in_set; message_header.message_id = i + 1; message_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1); if (!data_source.empty()) { message_header.file_name = data_source + "/" + message_header.file_name; } - message_header.file_name = image_folder + message_header.file_name; + message_header.file_name = message_folder + message_header.file_name; message_header.user_metadata = meta; auto err = producer->Send(message_header, std::move(buffer), write_files ? asapo::kDefaultIngestMode : @@ -219,12 +219,12 @@ int main (int argc, char* argv[]) { if (args.iterations == 0) { iterations_remained = 1; // metadata } else { - iterations_remained = args.iterations * args.images_in_set; + iterations_remained = args.iterations * args.messages_in_set; } system_clock::time_point start_time = system_clock::now(); - if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.images_in_set, args.data_source, + if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations, args.messages_in_set, args.data_source, (args.mode %100) / 10 == 0,args.mode / 100 == 0 ?asapo::SourceType::kProcessed:asapo::SourceType::kRaw)) { return EXIT_FAILURE; } diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 848c94d2c..db73edf62 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -96,7 +96,7 @@ class Producer { //! Marks stream finished /*! \param stream - Name of the stream to makr finished - \param last_id - ID of the last image in stream + \param last_id - ID of the last message in stream \param next_stream - Name of the next stream (empty if not set) \return Error - Will be nullptr on success */ diff --git a/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp b/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp index 68e852f63..d2b1f50e4 100644 --- a/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp +++ b/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp @@ -64,7 +64,7 @@ class SystemFolderWatchTests : public testing::Test { std::vector<std::string> expected_watches{"/tmp/test1", "/tmp/test2", "/tmp/test1/sub11", "/tmp/test2/sub21", "/tmp/test2/sub22", "/tmp/test2/sub21/sub211"}; std::string expected_filename1{"file1"}; std::string expected_filename2{"file2"}; - MessageMetas expected_fileinfos = CreateTestMessageMetas(); + MessageMetas expected_message_metas = CreateTestMessageMetas(); int expected_wd = 10; std::vector<int>expected_fds = {1, 2, 3, 4, 5, 6}; void MockStartMonitoring(); @@ -287,7 +287,7 @@ void SystemFolderWatchTests::ExpectCreateFolder(std::string folder, bool with_fi if (with_files) { ON_CALL(mock_io, FilesInFolder_t(newfolder, _)). WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr), - testing::Return(expected_fileinfos))); + testing::Return(expected_message_metas))); } else { ON_CALL(mock_io, FilesInFolder_t(newfolder, _)). WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr), diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 101279ed0..904e2f9db 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -96,27 +96,27 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(err == nullptr, "GetNext5 no error"); M_AssertTrue(fi.name == "1", "GetNext5 filename"); - auto images = consumer->QueryImages("meta.test = 10", &err); + auto messages = consumer->QueryMessages("meta.test = 10", &err); M_AssertTrue(err == nullptr, "query1"); - M_AssertTrue(images.size() == 10, "size of query answer 1"); + M_AssertTrue(messages.size() == 10, "size of query answer 1"); - images = consumer->QueryImages("meta.test = 10 AND name='1'", &err); + messages = consumer->QueryMessages("meta.test = 10 AND name='1'", &err); M_AssertTrue(err == nullptr, "query2"); - M_AssertTrue(images.size() == 1, "size of query answer 2"); + M_AssertTrue(messages.size() == 1, "size of query answer 2"); M_AssertTrue(fi.name == "1", "GetNext5 filename"); - images = consumer->QueryImages("meta.test = 11", &err); + messages = consumer->QueryMessages("meta.test = 11", &err); M_AssertTrue(err == nullptr, "query3"); - M_AssertTrue(images.size() == 0, "size of query answer 3"); + M_AssertTrue(messages.size() == 0, "size of query answer 3"); - images = consumer->QueryImages("meta.test = 18", &err); + messages = consumer->QueryMessages("meta.test = 18", &err); M_AssertTrue(err == nullptr, "query4"); - M_AssertTrue(images.size() == 0, "size of query answer 4"); + M_AssertTrue(messages.size() == 0, "size of query answer 4"); - images = consumer->QueryImages("bla", &err); + messages = consumer->QueryMessages("bla", &err); M_AssertTrue(err != nullptr, "query5"); - M_AssertTrue(images.size() == 0, "size of query answer 5"); + M_AssertTrue(messages.size() == 0, "size of query answer 5"); //streams diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 7f4942822..0e6a637f6 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -174,24 +174,24 @@ def check_single(consumer, group_id): _, meta = consumer.get_next(group_id, meta_only=True) assert_metaname(meta, "2", "get next after resend") - # images + # messages - images = consumer.query_images("meta.test = 10") - assert_eq(len(images), 5, "size of query answer 1") - for image in images: - assert_usermetadata(image, "query_images") + messages = consumer.query_messages("meta.test = 10") + assert_eq(len(messages), 5, "size of query answer 1") + for message in messages: + assert_usermetadata(message, "query_messages") - images = consumer.query_images("meta.test = 10 AND name='1'") - assert_eq(len(images), 1, "size of query answer 2 ") + messages = consumer.query_messages("meta.test = 10 AND name='1'") + assert_eq(len(messages), 1, "size of query answer 2 ") - for image in images: - assert_usermetadata(image, "query_images") + for message in messages: + assert_usermetadata(message, "query_messages") - images = consumer.query_images("meta.test = 11") - assert_eq(len(images), 0, "size of query answer 3 ") + messages = consumer.query_messages("meta.test = 11") + assert_eq(len(messages), 0, "size of query answer 3 ") try: - images = consumer.query_images("bla") + messages = consumer.query_messages("bla") except: pass else: diff --git a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py index 176c5e0df..7a3c0521c 100644 --- a/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py +++ b/tests/automatic/full_chain/simple_chain_usermeta_python/get_user_meta.py @@ -7,7 +7,7 @@ source, path, beamtime, token, group_id = sys.argv[1:] consumer = asapo_consumer.create_consumer(source,path,True, beamtime,"",token,60000) -images = consumer.query_images("meta.user_meta regexp 'test*' order by _id") +messages = consumer.query_messages("meta.user_meta regexp 'test*' order by _id") -print ('found images:',len(images)) -print (images[99]['meta']['user_meta']) +print ('found messages:',len(messages)) +print (messages[99]['meta']['user_meta']) diff --git a/tests/manual/python_tests/consumer/consumer_api.py b/tests/manual/python_tests/consumer/consumer_api.py index 28d8e16db..2c72bf03f 100644 --- a/tests/manual/python_tests/consumer/consumer_api.py +++ b/tests/manual/python_tests/consumer/consumer_api.py @@ -7,7 +7,7 @@ source, path, beamtime, token = sys.argv[1:] consumer = asapo_consumer.create_consumer(source,path,False, beamtime,"",token,1000) group_id = consumer.generate_group_id() -res = consumer.query_images("_id > 0", stream="1") +res = consumer.query_messages("_id > 0", stream="1") print(res) diff --git a/tests/manual/python_tests/plot_images_online.py b/tests/manual/python_tests/plot_images_online.py index e9e31a85d..53e63a99c 100644 --- a/tests/manual/python_tests/plot_images_online.py +++ b/tests/manual/python_tests/plot_images_online.py @@ -17,7 +17,7 @@ while True: id = meta['_id'] if id == last_id: continue - fid = h5py.h5f.open_file_image(data) + fid = h5py.h5f.open_file_message(data) f = h5py.File(fid) data1 = np.array(f['mydataset']) print(data1) @@ -31,7 +31,7 @@ while True: #alternative - but tobytes creates an additional copy - not nice. #import tables #h5file1 = tables.open_file("in-memory-sample.h5", driver="H5FD_CORE", -# driver_core_image=data.tobytes(), +# driver_core_message=data.tobytes(), # driver_core_backing_store=0) #data2 = h5file1.root.mydataset.read() -- GitLab