diff --git a/CHANGELOG.md b/CHANGELOG.md index ba8f2a7dc46bf4d103e247062890ccb1e2bb5a43..c56a4a3309a2b283ad7cb4fa93160ed9029c7920 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 21.12.0 (in progress) + +FEATURES +* Consumer API: Get last within consumer group returns message only once + + ## 21.09.0 FEATURES diff --git a/CMakeIncludes/prepare_version_tag.cmake b/CMakeIncludes/prepare_version_tag.cmake index e2a41293f288770013d83c7664018edbea285a6e..59de5c91e18dec4151309f5bcf23e60e4199c576 100644 --- a/CMakeIncludes/prepare_version_tag.cmake +++ b/CMakeIncludes/prepare_version_tag.cmake @@ -20,7 +20,6 @@ execute_process(COMMAND git rev-parse --short=10 HEAD string(STRIP ${ASAPO_VERSION_COMMIT} ASAPO_VERSION_COMMIT) if (${BRANCH} STREQUAL "master") - SET (ASAPO_VERSION_IN_DOCS ${ASAPO_TAG}) SET (ASAPO_VERSION ${ASAPO_TAG}) SET (ASAPO_VERSION_COMMIT "") SET (ASAPO_VERSION_DOCKER_SUFFIX "") @@ -38,10 +37,6 @@ else() SET (ASAPO_WHEEL_VERSION ${ASAPO_VERSION}) endif() -string(REGEX REPLACE "\\.0([0-9]+)\\." - ".\\1." ASAPO_WHEEL_VERSION_IN_DOCS - ${ASAPO_VERSION_IN_DOCS}) - message("Asapo Version: " ${ASAPO_VERSION}) message("Python Asapo Version: " ${PYTHON_ASAPO_VERSION}) message("Asapo commit: " ${ASAPO_VERSION_COMMIT}) diff --git a/CMakeLists.txt b/CMakeLists.txt index 36302965720bf9265d21426f3afa9bb1aba779f4..b65a84e1e4c9d3f50a75dbae7e5709f682748138 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,16 +3,12 @@ project(ASAPO) set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/CMakeModules/ ${PROJECT_SOURCE_DIR}/CMakeIncludes/) -set (ASAPO_VERSION_IN_DOCS 21.06.0) # is overwritten in master branch -set (ASAPO_EXAMPLES_DIR .) -#set (ASAPO_EXAMPLES_DIR ./frozen_versions/${ASAPO_VERSION_IN_DOCS}) - #protocol version changes if one of the microservice API's change -set (ASAPO_CONSUMER_PROTOCOL "v0.4") +set (ASAPO_CONSUMER_PROTOCOL "v0.5") set (ASAPO_PRODUCER_PROTOCOL "v0.4") set (ASAPO_DISCOVERY_API_VER "v0.1") set (ASAPO_AUTHORIZER_API_VER "v0.2") -set (ASAPO_BROKER_API_VER "v0.4") +set (ASAPO_BROKER_API_VER "v0.5") set (ASAPO_FILE_TRANSFER_SERVICE_API_VER "v0.2") set (ASAPO_RECEIVER_API_VER "v0.4") set (ASAPO_RDS_API_VER "v0.1") diff --git a/PROTOCOL-VERSIONS.md b/PROTOCOL-VERSIONS.md index 3b87af27833e8b8bfeab57e857ca531cac18a631..f91b0d2fb04d1393bab62174ebaad43ff84561ab 100644 --- a/PROTOCOL-VERSIONS.md +++ b/PROTOCOL-VERSIONS.md @@ -10,6 +10,7 @@ ### Consumer Protocol | Release | used by client | Supported by server | Status | | ------------ | ------------------- | -------------------- | ---------------- | +| v0.5 | | | In development | | v0.4 | 21.06.0 - 21.09.0 | 21.06.0 - 21.09.0 | Current version | | v0.3 | 21.03.3 - 21.03.3 | 21.03.3 - 21.09.0 | Deprecates from 01.07.2022 | | v0.2 | 21.03.2 - 21.03.2 | 21.03.2 - 21.09.0 | Deprecates from 01.06.2022 | diff --git a/VERSIONS.md b/VERSIONS.md index 05d9509333bb681295b0a3f1a6c4f852e62c49d7..438a72968942edef808eba10d776fe10aa0df84a 100644 --- a/VERSIONS.md +++ b/VERSIONS.md @@ -2,7 +2,8 @@ | Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment| | ------------ | ----------- | -------- | ------------------------- | --------------------- | ------- | -| 21.09.0 | No | v0.4 | 21.09.0/21.09.0 | current version |beamline token for raw | +| 21.12.0 | No | v0.4 | 21.12.0/21.12.0 | in development | | +| 21.09.0 | No | v0.4 | 21.09.0/21.09.0 | current version |beamline token for raw | | 21.06.0 | Yes | v0.3 | 21.06.0/21.09.0 | deprecates 01.09.2022 |arbitrary characters| | 21.03.3 | No | v0.2 | 21.03.2/21.09.0 | deprecates 01.07.2022 |bugfix in server| | 21.03.2 | Yes | v0.2 | 21.03.2/21.09.0 | deprecates 01.07.2022 |bugfixes, add delete_stream| @@ -13,6 +14,7 @@ | Release | API changed\*\* | Protocol | Supported by server from/to | Status |Comment| | ------------ | ----------- | --------- | ------------------------- | ---------------- | ------- | +| 21.12.0 | Yes | v0.5 | 21.12.0/21.12.0 | in development | | | 21.09.0 | No | v0.4 | 21.06.0/21.09.0 | current version | | | 21.06.0 | Yes | v0.4 | 21.06.0/21.09.0 | |arbitrary characters, bugfixes | | 21.03.3 | Yes | v0.3 | 21.03.3/21.09.0 | deprecates 01.06.2022 |bugfix in server, error type for dublicated ack| diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 37eb03008fd1ab8df9b4f8c187e5542459ce5c11..0291da19f76a6235dbfcc947306038414a7b53e4 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -65,6 +65,9 @@ const inprocess_collection_name_prefix = "inprocess_" const meta_collection_name = "meta" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" +const last_message_collection_name = "last_messages" +const last_message_field_name = "last_message" + const no_session_msg = "database client not created" const already_connected_msg = "already connected" @@ -74,6 +77,19 @@ const stream_filter_all = "all" const stream_filter_finished = "finished" const stream_filter_unfinished = "unfinished" +const ( + field_op_inc int = iota + field_op_set +) + +type fieldChangeRequest struct { + collectionName string + fieldName string + op int + max_ind int + val int +} + var dbSessionLock sync.Mutex var dbClientLock sync.RWMutex @@ -217,20 +233,26 @@ func (db *Mongodb) setCounter(request Request, ind int) (err error) { return } -func (db *Mongodb) errorWhenCannotIncrementField(request Request, max_ind int) (err error) { +func (db *Mongodb) errorWhenCannotSetField(request Request, max_ind int) error { if res, err := db.getRecordFromDb(request, max_ind, max_ind); err == nil { - if err := checkStreamFinished(request, max_ind, max_ind, res); err != nil { - return err + if err2 := checkStreamFinished(request, max_ind, max_ind, res); err2 != nil { + return err2 } } return &DBError{utils.StatusNoData, encodeAnswer(max_ind, max_ind, "")} } -func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) (err error) { - update := bson.M{"$inc": bson.M{pointer_field_name: 1}} +func (db *Mongodb) changeField(request Request, change fieldChangeRequest, res interface{}) (err error) { + var update bson.M + if change.op == field_op_inc { + update = bson.M{"$inc": bson.M{change.fieldName: 1}} + } else if change.op == field_op_set { + update = bson.M{"$set": bson.M{change.fieldName: change.val}} + } + opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After) - q := bson.M{"_id": request.GroupId + "_" + request.Stream, pointer_field_name: bson.M{"$lt": max_ind}} - c := db.client.Database(request.DbName).Collection(pointer_collection_name) + q := bson.M{"_id": request.GroupId + "_" + request.Stream, change.fieldName: bson.M{"$lt": change.max_ind}} + c := db.client.Database(request.DbName).Collection(change.collectionName) err = c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res) if err != nil { @@ -242,7 +264,7 @@ func (db *Mongodb) incrementField(request Request, max_ind int, res interface{}) if err2 := c.FindOneAndUpdate(context.TODO(), q, update, opts).Decode(res); err2 == nil { return nil } - return db.errorWhenCannotIncrementField(request, max_ind) + return db.errorWhenCannotSetField(request, change.max_ind) } return &DBError{utils.StatusTransactionInterrupted, err.Error()} } @@ -421,7 +443,11 @@ func (db *Mongodb) getCurrentPointer(request Request) (LocationPointer, int, err } var curPointer LocationPointer - err = db.incrementField(request, max_ind, &curPointer) + err = db.changeField(request, fieldChangeRequest{ + collectionName: pointer_collection_name, + fieldName: pointer_field_name, + op: field_op_inc, + max_ind: max_ind}, &curPointer) if err != nil { return LocationPointer{}, 0, err } @@ -631,6 +657,26 @@ func (db *Mongodb) getLastRecord(request Request) ([]byte, error) { return db.getRecordByIDRaw(request, max_ind, max_ind) } +func (db *Mongodb) getLastRecordInGroup(request Request) ([]byte, error) { + max_ind, err := db.getMaxIndex(request, false) + if err != nil { + return nil, err + } + + var res map[string]interface{} + err = db.changeField(request, fieldChangeRequest{ + collectionName: last_message_collection_name, + fieldName: last_message_field_name, + op: field_op_set, + max_ind: max_ind, + val: max_ind, + }, &res) + if err != nil { + return nil, err + } + return db.getRecordByIDRaw(request, max_ind, max_ind) +} + func getSizeFilter(request Request) bson.M { filter := bson.M{} if request.ExtraParam == "false" { // do not return incomplete datasets @@ -704,7 +750,7 @@ func (db *Mongodb) getMeta(request Request) ([]byte, error) { logger.Debug(log_str) return nil, &DBError{utils.StatusNoData, err.Error()} } - userMeta,ok:=res["meta"] + userMeta, ok := res["meta"] if !ok { log_str := "error getting meta for " + id + " in " + request.DbName + " : cannot parse database response" logger.Error(log_str) @@ -1040,6 +1086,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.getRecordByID(request) case "last": return db.getLastRecord(request) + case "groupedlast": + return db.getLastRecordInGroup(request) case "resetcounter": return db.resetCounter(request) case "size": diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index c81813611b03dbed3b7124ca1fa8227ce88ca897..09bf8ab1cb9f16605f5261ba17e29929b08b6e08 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -445,6 +445,60 @@ func TestMongoDBGetNextAfterGetLastCorrect(t *testing.T) { } +func TestMongoDBGetGetLastInGroupCorrect(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "next"}) // to check it does not influence groupedlast + + res, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.Nil(t, err) + assert.Equal(t, string(rec1_expect), string(res)) + +// first record - ok, then error + res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.NotNil(t, err) + if err != nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_stream\":\"\"}", err.(*DBError).Message) + } +// second record - ok, then error + db.insertRecord(dbname, collection, &rec2) + res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.Nil(t, err) + assert.Equal(t, string(rec2_expect), string(res)) + res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.NotNil(t, err) + +// stream finished - immediately error + db.insertRecord(dbname, collection, &rec_finished3) + res, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.NotNil(t, err) + if err != nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next1\"}", err.(*DBError).Message) + } + +} + +func TestMongoDBGetGetLastInGroupImmediateErrorOnFinishStream(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.insertRecord(dbname, collection, &rec1) + db.insertRecord(dbname, collection, &rec2) + db.insertRecord(dbname, collection, &rec_finished3) + _, err := db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.NotNil(t, err) + _, err = db.ProcessRequest(Request{DbName: dbname, Stream: collection, GroupId: groupId, Op: "groupedlast", ExtraParam: ""}) + assert.NotNil(t, err) + if err != nil { + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_stream\":\"next1\"}", err.(*DBError).Message) + } +} + + + func TestMongoDBGetSize(t *testing.T) { db.Connect(dbaddress) defer cleanup() diff --git a/broker/src/asapo_broker/server/get_commands_test.go b/broker/src/asapo_broker/server/get_commands_test.go index 3f34750b20451d69d8fc62f456070e905f1327b7..17f3aea5a32a1308f3ec4e7e49f5d894cd6c2aea 100644 --- a/broker/src/asapo_broker/server/get_commands_test.go +++ b/broker/src/asapo_broker/server/get_commands_test.go @@ -47,6 +47,7 @@ var testsGetCommand = []struct { {"id", expectedSource,expectedStream, "", expectedStream + "/0/1","","1"}, {"meta", expectedSource,"default", "", "default/0/meta/0","","0"}, {"nacks",expectedSource, expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/nacks","","0_0"}, + {"groupedlast", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/groupedlast","",""}, {"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","",""}, {"next", expectedSource,expectedStream, expectedGroupID, expectedStream + "/" + expectedGroupID + "/next","&resend_nacks=true&delay_ms=10000&resend_attempts=3","10000_3"}, diff --git a/broker/src/asapo_broker/server/get_groupedlast.go b/broker/src/asapo_broker/server/get_groupedlast.go new file mode 100644 index 0000000000000000000000000000000000000000..747d6fcf0938dd1e1cb9ed78c9b8599f084bb002 --- /dev/null +++ b/broker/src/asapo_broker/server/get_groupedlast.go @@ -0,0 +1,9 @@ +package server + +import ( + "net/http" +) + +func routeGetGroupedLast(w http.ResponseWriter, r *http.Request) { + processRequest(w, r, "groupedlast", "", true) +} diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index c6064e117787ebc6a1f97e69a2a4734e34db69c5..3c99b1a8a85f5a6dd4599df35d7e80f143df807a 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -35,6 +35,12 @@ var listRoutes = utils.Routes{ "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/0/last", routeGetLast, }, + utils.Route{ + "GetGroupedLast", + "Get", + "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/{groupid}/groupedlast", + routeGetGroupedLast, + }, utils.Route{ "GetLastAck", "Get", diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index bc73c17e2ec3c7f8e9207e068c0549ca315dc5b2..dcab928d981c79f63a417c02174af91080388163 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -134,6 +134,11 @@ AsapoDataSetHandle asapo_consumer_get_last_dataset(AsapoConsumerHandle consumer, uint64_t min_size, const char* stream, AsapoErrorHandle* error); +AsapoDataSetHandle asapo_consumer_get_last_dataset_ingroup(AsapoConsumerHandle consumer, + AsapoStringHandle group_id, + uint64_t min_size, + const char* stream, + AsapoErrorHandle* error); AsapoDataSetHandle asapo_consumer_get_dataset_by_id(AsapoConsumerHandle consumer, uint64_t id, uint64_t min_size, @@ -144,11 +149,17 @@ int asapo_consumer_get_by_id(AsapoConsumerHandle consumer, AsapoMessageMetaHandle* info, AsapoMessageDataHandle* data, const char* stream, AsapoErrorHandle* error); - int asapo_consumer_get_last(AsapoConsumerHandle consumer, AsapoMessageMetaHandle* info, AsapoMessageDataHandle* data, const char* stream, AsapoErrorHandle* error); + +int asapo_consumer_get_last_ingroup(AsapoConsumerHandle consumer, + AsapoStringHandle group_id, + AsapoMessageMetaHandle* info, + AsapoMessageDataHandle* data, + const char* stream, AsapoErrorHandle* error); + int asapo_consumer_get_next(AsapoConsumerHandle consumer, AsapoStringHandle group_id, AsapoMessageMetaHandle* info, diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index f7a645f467a4d20feae40bbd045fe92eb0d5f74a..026d4c324cd583068173a6c9eb65428bb3711c54 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -88,7 +88,7 @@ class Consumer { virtual NetworkConnectionType CurrentConnectionType() const = 0; //! Get list of streams with filter, set from to "" to get all streams - virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; + virtual StreamInfos GetStreamList(std::string from, StreamFilter filter, Error* err) = 0; //! Delete stream /*! @@ -177,6 +177,17 @@ class Consumer { */ virtual DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) = 0; + //! Receive last available unique dataset which has min_size messages within a group. Will not return same dataset twice but EndOfStream error. + /*! + \param group_id - group id to use. + \param id - dataset id + \param err - will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise. + \param min_size - wait until dataset has min_size messages (0 for maximum size) + \param stream - stream to use + \return DataSet - information about the dataset + */ + virtual DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) = 0; + //! Receive dataset by id. /*! \param id - dataset id @@ -214,6 +225,16 @@ class Consumer { \return Error if both pointers are nullptr or data cannot be read, nullptr otherwise. */ virtual Error GetLast(MessageMeta* info, MessageData* data, std::string stream) = 0; + //! Receive last available unique message within group. Will not return same message twice but EndOfStream error + /*! + \param group_id - group id to use. + \param info - where to store message metadata. Can be set to nullptr only message data is needed. + \param data - where to store message data. Can be set to nullptr only message metadata is needed. + \param stream - stream to use + \return Error if both pointers are nullptr, no data or data cannot be read, nullptr otherwise. + */ + virtual Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) = 0; + //! Get all messages matching the query. /*! diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 969804d432dfcfc09a82ab6ddc0913a606ea3546..83b83d2a1235ad4db8d02afa6fab63c1b1fa4f69 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -365,6 +365,21 @@ extern "C" { return handle_or_null_t(result, error, std::move(err), &asapo::ConsumerErrorTemplates::kPartialData); } +//! wraps asapo::Consumer::GetLastDataset() +/// \copydoc asapo::Consumer::GetLastDataset() +/// \param[in] consumer the consumer that is acted upon +/// the returned data set must be freed with asapo_free_handle() after use. + AsapoDataSetHandle asapo_consumer_get_last_dataset_ingroup(AsapoConsumerHandle consumer, + AsapoStringHandle group_id, + uint64_t min_size, + const char* stream, + AsapoErrorHandle* error) { + asapo::Error err; + auto result = new asapo::DataSet(consumer->handle->GetLastDataset(*group_id->handle, min_size, stream, &err)); + return handle_or_null_t(result, error, std::move(err), + &asapo::ConsumerErrorTemplates::kPartialData); + } + //! wraps asapo::Consumer::GetLastAcknowledgedMessage() /// \copydoc asapo::Consumer::GetLastAcknowledgedMessage() @@ -429,6 +444,22 @@ extern "C" { return process_error(error, std::move(err)); } +//! wraps asapo::Consumer::GetLast() +/// \copydoc asapo::Consumer::GetLast() +/// \param[in] consumer the consumer that is acted upon +/// if data are retrieved (data != NULL) they must be freed with asapo_free_handle() + int asapo_consumer_get_last_ingroup(AsapoConsumerHandle consumer, + AsapoStringHandle group_id, + AsapoMessageMetaHandle* info, + AsapoMessageDataHandle* data, + const char* stream, + AsapoErrorHandle* error) { + dataGetterStart; + auto err = consumer->handle->GetLast(*group_id->handle, fi, data ? &d : nullptr, stream); + dataGetterStop; + return process_error(error, std::move(err)); + } + //! wraps asapo::Consumer::GetNext() /// \copydoc asapo::Consumer::GetNext() /// \param[in] consumer the consumer that is acted upon diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 659ad07a261974ce1cf0dba75cddc107cdb463a7..02cd9b9818c012c4c69326d694a999beca43c90d 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -352,6 +352,16 @@ Error ConsumerImpl::GetNext(std::string group_id, MessageMeta* info, MessageData data); } +Error ConsumerImpl::GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) { + return GetMessageFromServer(GetMessageServerOperation::GetLastInGroup, + 0, + std::move(group_id), + std::move(stream), + info, + data); +} + + Error ConsumerImpl::GetLast(MessageMeta* info, MessageData* data, std::string stream) { return GetMessageFromServer(GetMessageServerOperation::GetLast, 0, @@ -365,10 +375,12 @@ std::string ConsumerImpl::OpToUriCmd(GetMessageServerOperation op) { switch (op) { case GetMessageServerOperation::GetNext: return "next"; + case GetMessageServerOperation::GetLastInGroup: + return "groupedlast"; case GetMessageServerOperation::GetLast: return "last"; default: - return "last"; + return "error"; } } @@ -683,6 +695,12 @@ DataSet ConsumerImpl::GetLastDataset(uint64_t min_size, std::string stream, Erro return GetDatasetFromServer(GetMessageServerOperation::GetLast, 0, "0", std::move(stream), min_size, err); } +DataSet ConsumerImpl::GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) { + return GetDatasetFromServer(GetMessageServerOperation::GetLastInGroup, 0, std::move(group_id), std::move(stream), + min_size, err); +} + + DataSet ConsumerImpl::GetDatasetFromServer(GetMessageServerOperation op, uint64_t id, std::string group_id, std::string stream, diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index b4d228021c5297cdf2573db47d67eaf3a58f1f09..e8228e60aacdb135a0bb560266269fe50578020c 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -14,7 +14,8 @@ namespace asapo { enum class GetMessageServerOperation { GetNext, GetLast, - GetID + GetID, + GetLastInGroup, }; enum class OutputDataMode { @@ -51,7 +52,6 @@ Error ProcessRequestResponce(const RequestInfo& request, const Error& server_err Error ConsumerErrorFromNoDataResponse(const std::string& response); Error ConsumerErrorFromPartialDataResponse(const std::string& response); DataSet DecodeDatasetFromResponse(std::string response, Error* err); - class ConsumerImpl final : public asapo::Consumer { public: explicit ConsumerImpl(std::string server_uri, std::string source_path, bool has_filesystem, @@ -76,6 +76,7 @@ class ConsumerImpl final : public asapo::Consumer { Error GetNext(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; Error GetLast(MessageMeta* info, MessageData* data, std::string stream) override; + Error GetLast(std::string group_id, MessageMeta* info, MessageData* data, std::string stream) override; std::string GenerateNewGroupId(Error* err) override; std::string GetBeamtimeMeta(Error* err) override; @@ -98,6 +99,7 @@ class ConsumerImpl final : public asapo::Consumer { DataSet GetNextDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override; DataSet GetLastDataset(uint64_t min_size, std::string stream, Error* err) override; + DataSet GetLastDataset(std::string group_id, uint64_t min_size, std::string stream, Error* err) override; DataSet GetDatasetById(uint64_t id, uint64_t min_size, std::string stream, Error* err) override; diff --git a/consumer/api/cpp/unittests/test_consumer_impl.cpp b/consumer/api/cpp/unittests/test_consumer_impl.cpp index 0b783b42bf055819ae9f07860b4bdb3d34414c66..ef3ea07bf8e8abcd633ae2833c5d5748137c286e 100644 --- a/consumer/api/cpp/unittests/test_consumer_impl.cpp +++ b/consumer/api/cpp/unittests/test_consumer_impl.cpp @@ -153,7 +153,8 @@ class ConsumerImplTests : public Test { } void MockGetServiceUri(std::string service, std::string result) { EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/" + service + "?token=" - + expected_token + "&protocol=" + expected_consumer_protocol), _, + + expected_token + "&protocol=" + expected_consumer_protocol), + _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::OK), SetArgPointee<2>(nullptr), @@ -239,22 +240,39 @@ TEST_F(ConsumerImplTests, DefaultStreamIsDetector) { TEST_F(ConsumerImplTests, GetNextUsesCorrectUriWithStream) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token=" - + expected_token, _, - _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + expected_group_id_encoded + "/next?token=" + + expected_token, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); consumer->GetNext(expected_group_id, &info, nullptr, expected_stream); } +TEST_F(ConsumerImplTests, GetLastOnceUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + expected_stream_encoded + + "/" + expected_group_id_encoded + "/groupedlast?token=" + + expected_token, _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + consumer->GetLast(expected_group_id, &info, nullptr, expected_stream); +} + TEST_F(ConsumerImplTests, GetLastUsesCorrectUri) { MockGetBrokerUri(); EXPECT_CALL(mock_http_client, - Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + expected_stream_encoded + + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + expected_stream_encoded + "/0/last?token=" + expected_token, _, _)).WillOnce(DoAll( @@ -373,7 +391,6 @@ TEST_F(ConsumerImplTests, GetMessageReturnsUnsupportedClient) { ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnsupportedClient)); } - TEST_F(ConsumerImplTests, GetMessageReturnsIfBrokerUriEmpty) { EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/v0.1/asapo-broker"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( @@ -603,7 +620,6 @@ TEST_F(ConsumerImplTests, GetMessageCallsRetriesReadFromFile) { consumer->GetNext(expected_group_id, &info, &data, expected_stream); } - TEST_F(ConsumerImplTests, GenerateNewGroupIdReturnsErrorCreateGroup) { MockGetBrokerUri(); @@ -654,14 +670,15 @@ TEST_F(ConsumerImplTests, ResetCounterUsesCorrectUri) { MockGetBrokerUri(); consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + - expected_group_id_encoded + - "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, + Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + + expected_group_id_encoded + + "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return(""))); auto err = consumer->SetLastReadMarker(expected_group_id, 10, expected_stream); ASSERT_THAT(err, Eq(nullptr)); } @@ -670,13 +687,14 @@ TEST_F(ConsumerImplTests, GetCurrentSizeUsesCorrectUri) { MockGetBrokerUri(); consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/size?token=" - + expected_token, _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return("{\"size\":10}"))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/size?token=" + + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"size\":10}"))); asapo::Error err; auto size = consumer->GetCurrentSize(expected_stream, &err); ASSERT_THAT(err, Eq(nullptr)); @@ -950,12 +968,11 @@ TEST_F(ConsumerImplTests, GetNextDatasetUsesCorrectUri) { } TEST_F(ConsumerImplTests, GetNextErrorOnEmptyStream) { - MessageData data; + MessageData data; auto err = consumer->GetNext(expected_group_id, &info, &data, ""); ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput)); } - TEST_F(ConsumerImplTests, GetDataSetReturnsMessageMetas) { asapo::Error err; MockGetBrokerUri(); @@ -1073,18 +1090,36 @@ TEST_F(ConsumerImplTests, GetDataSetReturnsParseError) { TEST_F(ConsumerImplTests, GetLastDatasetUsesCorrectUri) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/0/last?token=" - + expected_token + "&dataset=true&minsize=1", _, - _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/0/last?token=" + + expected_token + "&dataset=true&minsize=1", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); asapo::Error err; consumer->GetLastDataset(1, expected_stream, &err); } +TEST_F(ConsumerImplTests, GetLastDatasetInGroupUsesCorrectUri) { + MockGetBrokerUri(); + + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + expected_group_id_encoded + "/groupedlast?token=" + + expected_token + "&dataset=true&minsize=1", _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(""))); + asapo::Error err; + consumer->GetLastDataset(expected_group_id, 1, expected_stream, &err); +} + + TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { MockGetBrokerUri(); @@ -1103,14 +1138,15 @@ TEST_F(ConsumerImplTests, GetDatasetByIdUsesCorrectUri) { TEST_F(ConsumerImplTests, DeleteStreamUsesCorrectUri) { MockGetBrokerUri(); std::string expected_delete_stream_query_string = "{\"ErrorOnNotExist\":true,\"DeleteMeta\":true}"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + expected_stream_encoded + "/delete" - + "?token=" + expected_token, _, - expected_delete_stream_query_string, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return("") - )); + EXPECT_CALL(mock_http_client, + Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + expected_stream_encoded + "/delete" + + "?token=" + expected_token, _, + expected_delete_stream_query_string, _, _)).WillOnce(DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return("") + )); asapo::DeleteStreamOptions opt; opt.delete_meta = true; @@ -1291,15 +1327,16 @@ TEST_F(ConsumerImplTests, GetMessageTriesToGetTokenAgainIfTransferFailed) { TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { MockGetBrokerUri(); auto expected_acknowledge_command = "{\"Op\":\"ackmessage\"}"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + - expected_group_id_encoded - + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, + Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + + expected_group_id_encoded + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return(""))); auto err = consumer->Acknowledge(expected_group_id, expected_dataset_id, expected_stream); @@ -1308,13 +1345,16 @@ TEST_F(ConsumerImplTests, AcknowledgeUsesCorrectUri) { void ConsumerImplTests::ExpectIdList(bool error) { MockGetBrokerUri(); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + - expected_group_id_encoded + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(error ? "" : "{\"unacknowledged\":[1,2,3]}"))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + + expected_group_id_encoded + "/nacks?token=" + expected_token + "&from=1&to=0", + _, + _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(error ? "" : "{\"unacknowledged\":[1,2,3]}"))); } TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) { @@ -1327,13 +1367,14 @@ TEST_F(ConsumerImplTests, GetUnAcknowledgedListReturnsIds) { } void ConsumerImplTests::ExpectLastAckId(bool empty_response) { - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + - expected_group_id_encoded + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}"))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + + expected_group_id_encoded + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}"))); } TEST_F(ConsumerImplTests, GetLastAcknowledgeUsesOk) { @@ -1382,16 +1423,17 @@ TEST_F(ConsumerImplTests, ResendNacks) { TEST_F(ConsumerImplTests, NegativeAcknowledgeUsesCorrectUri) { MockGetBrokerUri(); auto expected_neg_acknowledge_command = R"({"Op":"negackmessage","Params":{"DelayMs":10000}})"; - EXPECT_CALL(mock_http_client, Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/" + - expected_group_id_encoded - + "/" + std::to_string(expected_dataset_id) + "?token=" - + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce( - DoAll( - SetArgPointee<3>(HttpCode::OK), - SetArgPointee<4>(nullptr), - Return(""))); + EXPECT_CALL(mock_http_client, + Post_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/" + + expected_group_id_encoded + + "/" + std::to_string(expected_dataset_id) + "?token=" + + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce( + DoAll( + SetArgPointee<3>(HttpCode::OK), + SetArgPointee<4>(nullptr), + Return(""))); auto err = consumer->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10000, expected_stream); @@ -1424,25 +1466,24 @@ TEST_F(ConsumerImplTests, CanInterruptOperation) { } - TEST_F(ConsumerImplTests, GetCurrentDataSetCounteUsesCorrectUri) { MockGetBrokerUri(); consumer->SetTimeout(100); - EXPECT_CALL(mock_http_client, Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" - + - expected_stream_encoded + "/size?token=" - + expected_token + "&incomplete=true", _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::OK), - SetArgPointee<2>(nullptr), - Return("{\"size\":10}"))); + EXPECT_CALL(mock_http_client, + Get_t(expected_broker_api + "/beamtime/beamtime_id/" + expected_data_source_encoded + "/" + + + expected_stream_encoded + "/size?token=" + + expected_token + "&incomplete=true", _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::OK), + SetArgPointee<2>(nullptr), + Return("{\"size\":10}"))); asapo::Error err; auto size = consumer->GetCurrentDatasetCount(expected_stream, true, &err); ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(size, Eq(10)); } - TEST_F(ConsumerImplTests, GetVersionInfoClientOnly) { std::string client_info; auto err = consumer->GetVersionInfo(&client_info, nullptr, nullptr); @@ -1470,5 +1511,4 @@ TEST_F(ConsumerImplTests, GetVersionInfoWithServer) { ASSERT_THAT(server_info, HasSubstr("v0.2")); } - } diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 1347d886237eb27133ff20d87f2a11d3e1e7a54f..f9b808d0ea360edcfa2fa40bd857f0c048d81edf 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -71,6 +71,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: NetworkConnectionType CurrentConnectionType() Error GetNext(string group_id, MessageMeta* info, MessageData* data,string stream) Error GetLast(MessageMeta* info, MessageData* data, string stream) + Error GetLast(string group_id, MessageMeta* info, MessageData* data, string stream) Error GetById(uint64_t id, MessageMeta* info, MessageData* data, string stream) uint64_t GetCurrentSize(string stream, Error* err) uint64_t GetCurrentDatasetCount(string stream, bool include_incomplete, Error* err) @@ -86,6 +87,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: MessageMetas QueryMessages(string query, string stream, Error* err) DataSet GetNextDataset(string group_id, uint64_t min_size, string stream, Error* err) DataSet GetLastDataset(uint64_t min_size, string stream, Error* err) + DataSet GetLastDataset(string group_id, uint64_t min_size, string stream, Error* err) DataSet GetDatasetById(uint64_t id, uint64_t min_size, string stream, Error* err) Error RetrieveData(MessageMeta* info, MessageData* data) vector[StreamInfo] GetStreamList(string from_stream, StreamFilter filter, Error* err) diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index f4232d7c74a914ee65d19a50f82d9782ac7fc977..4c7c8eeee53f32053ea5160f8c964e0c36908927 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -130,9 +130,12 @@ cdef class PyConsumer: if op == "next": with nogil: err = self.c_consumer.get().GetNext(b_group_id, &info, p_data,b_stream) - elif op == "last": + elif op == "last" and group_id == "": with nogil: err = self.c_consumer.get().GetLast(&info, p_data, b_stream) + elif op == "last": + with nogil: + err = self.c_consumer.get().GetLast(b_group_id,&info, p_data, b_stream) elif op == "id": with nogil: err = self.c_consumer.get().GetById(id, &info, p_data, b_stream) @@ -149,8 +152,8 @@ cdef class PyConsumer: return arr,meta def get_next(self, group_id, meta_only = True, stream = "default"): return self._op("next",group_id,stream,meta_only,0) - def get_last(self, meta_only = True, stream = "default"): - return self._op("last","",stream,meta_only,0) + def get_last(self, meta_only = True, stream = "default", ingroup = ""): + return self._op("last",ingroup,stream,meta_only,0) def get_by_id(self,uint64_t id,meta_only = True, stream = "default"): return self._op("id","",stream,meta_only,id) def retrieve_data(self,meta): @@ -349,9 +352,12 @@ cdef class PyConsumer: if op == "next": with nogil: dataset = self.c_consumer.get().GetNextDataset(b_group_id, min_size,b_stream, &err) - elif op == "last": + elif op == "last" and group_id == "": with nogil: dataset = self.c_consumer.get().GetLastDataset(min_size,b_stream, &err) + elif op == "last": + with nogil: + dataset = self.c_consumer.get().GetLastDataset(b_group_id, min_size,b_stream, &err) elif op == "id": with nogil: dataset = self.c_consumer.get().GetDatasetById(id, min_size,b_stream, &err) @@ -364,8 +370,8 @@ cdef class PyConsumer: return res def get_next_dataset(self, group_id, min_size = 0, stream = "default"): return self._op_dataset("next",group_id,stream,min_size,0) - def get_last_dataset(self, min_size = 0, stream = "default"): - return self._op_dataset("last","0",stream,min_size,0) + def get_last_dataset(self, min_size = 0, stream = "default", ingroup = ""): + return self._op_dataset("last",ingroup,stream,min_size,0) def get_dataset_by_id(self, uint64_t id, min_size = 0, stream = "default"): return self._op_dataset("id","0",stream,min_size,id) def get_beamtime_meta(self): diff --git a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go index 8c91240a3c47dfa0c0c492e08e15645632f10150..ec0a5e03ad4835f25cf891f25ae66cc53f58c844 100644 --- a/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go +++ b/discovery/src/asapo_discovery/protocols/hard_coded_consumer.go @@ -12,6 +12,14 @@ func getTimefromDate(date string) time.Time{ func GetSupportedConsumerProtocols() []Protocol { return []Protocol{ + Protocol{"v0.5", + map[string]string{ + "Discovery": "v0.1", + "Authorizer": "v0.2", + "Broker": "v0.5", + "File Transfer": "v0.2", + "Data cache service": "v0.1", + }, &protocolValidatorCurrent{}}, Protocol{"v0.4", map[string]string{ "Discovery": "v0.1", @@ -19,7 +27,7 @@ func GetSupportedConsumerProtocols() []Protocol { "Broker": "v0.4", "File Transfer": "v0.2", "Data cache service": "v0.1", - }, &protocolValidatorCurrent{}}, + }, &protocolValidatorDeprecated{getTimefromDate("2022-12-01")}}, Protocol{"v0.3", map[string]string{ "Discovery": "v0.1", diff --git a/discovery/src/asapo_discovery/protocols/protocol_test.go b/discovery/src/asapo_discovery/protocols/protocol_test.go index fbf1bc205d66e7a0241b83f75dfcda8134046171..f7933b7a848af1257f9ae8c0073c8047093fdb2e 100644 --- a/discovery/src/asapo_discovery/protocols/protocol_test.go +++ b/discovery/src/asapo_discovery/protocols/protocol_test.go @@ -15,7 +15,8 @@ type protocolTest struct { var protocolTests = []protocolTest{ // consumer - {"consumer", "v0.4", true, "current", "v0.4"}, + {"consumer", "v0.5", true, "current", "v0.5"}, + {"consumer", "v0.4", true, "deprecates", "v0.4"}, {"consumer", "v0.3", true, "deprecates", "v0.3"}, {"consumer", "v0.2", true, "deprecates", "v0.2"}, {"consumer", "v0.1", true, "deprecates", "v0.1"}, diff --git a/docs/site/examples/cpp/acknowledgements.cpp b/docs/site/examples/cpp/acknowledgements.cpp index 1f63d3f10c2780e1a4b942d3d33cbbb3cb453df7..d3613147dd2db1faf8ab03f34d5e0ad2ddf8935a 100644 --- a/docs/site/examples/cpp/acknowledgements.cpp +++ b/docs/site/examples/cpp/acknowledgements.cpp @@ -124,5 +124,5 @@ int main(int argc, char* argv[]) { std::cout << "file name: " << mm.name << std::endl; } - return EXIT_SUCCESS; + return EXIT_SUCCESS; } diff --git a/docs/site/examples/cpp/consume.cpp b/docs/site/examples/cpp/consume.cpp index a802034c998ea78b11a2829ff7e3d21372bba7b1..aa55634693e10446dbe03f6313cec75215836272 100644 --- a/docs/site/examples/cpp/consume.cpp +++ b/docs/site/examples/cpp/consume.cpp @@ -14,7 +14,7 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - + // test token. In production it is created during the start of the beamtime auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl" "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN" @@ -26,28 +26,26 @@ int main(int argc, char* argv[]) { //set it according to your configuration. auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; - auto credentials = asapo::SourceCredentials - { - asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS - beamtime, // the folder should exist - "", // can be empty or "auto", if beamtime_id is given - "test_source", // source - token // athorization token - }; + auto credentials = asapo::SourceCredentials { + asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS + beamtime, // the folder should exist + "", // can be empty or "auto", if beamtime_id is given + "test_source", // source + token // athorization token + }; auto consumer = asapo::ConsumerFactory::CreateConsumer - (endpoint, - path_to_files, - true, // True if the path_to_files is accessible locally, False otherwise - credentials, // same as for producer - &err); + (endpoint, + path_to_files, + true, // True if the path_to_files is accessible locally, False otherwise + credentials, // same as for producer + &err); exit_if_error("Cannot create consumer", err); consumer->SetTimeout(5000); // How long do you want to wait on non-finished stream for a message. // you can get info about the streams in the beamtime - for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) - { + for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) { std::cout << "Stream name: " << stream.name << std::endl; std::cout << "LastId: " << stream.last_id << std::endl; std::cout << "Stream finished: " << stream.finished << std::endl; @@ -60,24 +58,24 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - + do { // GetNext is the main function to get messages from streams. You would normally call it in loop. // you can either manually compare the mm.id to the stream.last_id, or wait for the error to happen err = consumer->GetNext(group_id, &mm, &data, "default"); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { // all the messages in the stream were processed std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { // not-finished stream timeout, or wrong or empty stream std::cout << "stream ended" << std::endl; break; } - + exit_if_error("Cannot get next record", err); std::cout << "id: " << mm.id << std::endl; diff --git a/docs/site/examples/cpp/consume_dataset.cpp b/docs/site/examples/cpp/consume_dataset.cpp index a5f8be96630de3d2713b863972dd9b33b187470e..f869b9cd121231d4ad8ff3dd28c0ce5867242ab1 100644 --- a/docs/site/examples/cpp/consume_dataset.cpp +++ b/docs/site/examples/cpp/consume_dataset.cpp @@ -14,7 +14,7 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - + auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl" "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN" "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG" @@ -35,15 +35,15 @@ int main(int argc, char* argv[]) { asapo::DataSet ds; asapo::MessageData data; - + do { ds = consumer->GetNextDataset(group_id, 0, "default", &err); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { std::cout << "stream ended" << std::endl; break; @@ -51,9 +51,8 @@ int main(int argc, char* argv[]) { exit_if_error("Cannot get next record", err); std::cout << "Dataset Id: " << ds.id << std::endl; - - for(int i = 0; i < ds.content.size(); i++) - { + + for(int i = 0; i < ds.content.size(); i++) { err = consumer->RetrieveData(&ds.content[i], &data); exit_if_error("Cannot get dataset content", err); diff --git a/docs/site/examples/cpp/metadata.cpp b/docs/site/examples/cpp/metadata.cpp index b64351fbe7433d8b6946cf52f75a2d25e7d5ced5..2a9e5dda2e07c42852dd8fb41fc7b24fe3550d11 100644 --- a/docs/site/examples/cpp/metadata.cpp +++ b/docs/site/examples/cpp/metadata.cpp @@ -45,15 +45,15 @@ int main(int argc, char* argv[]) { // sample beamtime metadata. You can add any data you want, with any level of complexity // in this example we use strings and ints, and one nested structure auto beamtime_metadata = "{" - " \"name\": \"beamtime name\"," - " \"condition\": \"beamtime condition\"," - " \"intvalue1\": 5," - " \"intvalue2\": 10," - " \"structure\": {" - " \"structint1\": 20," - " \"structint2\": 30" - " }" - "}"; + " \"name\": \"beamtime name\"," + " \"condition\": \"beamtime condition\"," + " \"intvalue1\": 5," + " \"intvalue2\": 10," + " \"structure\": {" + " \"structint1\": 20," + " \"structint2\": 30" + " }" + "}"; // send the metadata // with this call the new metadata will completely replace the one that's already there @@ -62,9 +62,9 @@ int main(int argc, char* argv[]) { // we can update the existing metadata if we want, by modifying the existing fields, or adding new ones auto beamtime_metadata_update = "{" - " \"condition\": \"updated beamtime condition\"," - " \"newintvalue\": 15" - "}"; + " \"condition\": \"updated beamtime condition\"," + " \"newintvalue\": 15" + "}"; // send the metadata in the 'kUpdate' mode err = producer->SendBeamtimeMetadata(beamtime_metadata_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, &ProcessAfterSend); @@ -72,22 +72,23 @@ int main(int argc, char* argv[]) { // sample stream metadata auto stream_metadata = "{" - " \"name\": \"stream name\"," - " \"condition\": \"stream condition\"," - " \"intvalue\": 44" - "}"; + " \"name\": \"stream name\"," + " \"condition\": \"stream condition\"," + " \"intvalue\": 44" + "}"; // works the same way: for the initial set we use 'kReplace' the stream metadata, but update is also possible // update works exactly the same as for beamtime, but here we will only do 'kReplace' - err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, "default", &ProcessAfterSend); + err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, + "default", &ProcessAfterSend); exit_if_error("Cannot send metadata", err); // sample message metadata auto message_metadata = "{" - " \"name\": \"message name\"," - " \"condition\": \"message condition\"," - " \"somevalue\": 55" - "}"; + " \"name\": \"message name\"," + " \"condition\": \"message condition\"," + " \"somevalue\": 55" + "}"; std::string data_string = "hello"; auto send_size = data_string.size() + 1; diff --git a/docs/site/examples/cpp/next_stream.cpp b/docs/site/examples/cpp/next_stream.cpp index 85ae4602f3694b6c4eaa68914f16d366910fb40b..964689eec3a88a37285b833224dd8209b3bcbfb8 100644 --- a/docs/site/examples/cpp/next_stream.cpp +++ b/docs/site/examples/cpp/next_stream.cpp @@ -93,7 +93,9 @@ int main(int argc, char* argv[]) { // when the stream finishes, we look for the info on the next stream auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err); // first, we find the stream with our name in the list of streams - auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo& s) { return s.name == stream_name; }); + auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo & s) { + return s.name == stream_name; + }); // then we look if the field 'nextStream' is set and not empty if (stream != streams.end() && !stream->next_stream.empty()) { @@ -117,5 +119,5 @@ int main(int argc, char* argv[]) { std::cout << "Message #" << mm.id << ", message content: " << reinterpret_cast<char const*>(data.get()) << std::endl; } while (1); - return EXIT_SUCCESS; + return EXIT_SUCCESS; } diff --git a/docs/site/examples/cpp/pipeline.cpp b/docs/site/examples/cpp/pipeline.cpp index e399e6fe5dd62463f3eaf0e818067644ef4e528f..d1022cd8c1f7bf20afd1bbc846b7fc079e595286 100644 --- a/docs/site/examples/cpp/pipeline.cpp +++ b/docs/site/examples/cpp/pipeline.cpp @@ -51,42 +51,43 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - + do { // we expect the message to be in the 'default' stream already err = consumer->GetNext(group_id, &mm, &data, "default"); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { std::cout << "stream ended" << std::endl; break; } exit_if_error("Cannot get next record", err); - + // work on our data auto processed_string = std::string(reinterpret_cast<char const*>(data.get())) + " processed"; auto send_size = processed_string.size() + 1; auto buffer = asapo::MessageData(new uint8_t[send_size]); memcpy(buffer.get(), processed_string.c_str(), send_size); - + // you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase asapo::MessageHeader message_header{mm.id, send_size, std::string("processed/test_file_") + std::to_string(mm.id)}; - err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, &ProcessAfterSend); + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, + &ProcessAfterSend); exit_if_error("Cannot send message", err); } while (1); - err = producer->WaitRequestsFinished(2000); + err = producer->WaitRequestsFinished(2000); exit_if_error("Producer exit on timeout", err); // the meta from the last iteration corresponds to the last message auto last_id = mm.id; - - err = producer->SendStreamFinishedFlag("pipelined",last_id, "", &ProcessAfterSend); + + err = producer->SendStreamFinishedFlag("pipelined", last_id, "", &ProcessAfterSend); exit_if_error("Cannot finish stream", err); // you can remove the source stream if you do not need it anymore diff --git a/docs/site/examples/cpp/produce.cpp b/docs/site/examples/cpp/produce.cpp index 9faa61106eff6e55c502df383854e9275b1fe111..aee2a9b74db8f5ae583b331f593032cceafc3c68 100644 --- a/docs/site/examples/cpp/produce.cpp +++ b/docs/site/examples/cpp/produce.cpp @@ -29,14 +29,13 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - auto credentials = asapo::SourceCredentials - { - asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS - beamtime, // the folder should exist - "", // can be empty or "auto", if beamtime_id is given - "test_source", // source - "" // athorization token - }; + auto credentials = asapo::SourceCredentials { + asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS + beamtime, // the folder should exist + "", // can be empty or "auto", if beamtime_id is given + "test_source", // source + "" // athorization token + }; auto producer = asapo::Producer::Create(endpoint, 1, // number of threads. Increase, if the sending speed seems slow @@ -63,7 +62,7 @@ int main(int argc, char* argv[]) { // add the following at the end of the script err = producer->WaitRequestsFinished(2000); // will synchronously wait for all the data to be sent. - // Use it when no more data is expected. + // Use it when no more data is expected. exit_if_error("Producer exit on timeout", err); // you may want to mark the stream as finished diff --git a/docs/site/examples/cpp/produce_dataset.cpp b/docs/site/examples/cpp/produce_dataset.cpp index e2ea32e77bd291cafeae3ed1d9febdbed9f57ba4..aaefeed40380abe86134488700ae632f4cf09f44 100644 --- a/docs/site/examples/cpp/produce_dataset.cpp +++ b/docs/site/examples/cpp/produce_dataset.cpp @@ -27,7 +27,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}; - + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err); exit_if_error("Cannot start producer", err); diff --git a/docs/site/examples/cpp/query.cpp b/docs/site/examples/cpp/query.cpp index 945b5211a1f0a3c41603617f78b25f7d8aa118fc..243e8bd36b062f04307e83d34b370e590fac1f36 100644 --- a/docs/site/examples/cpp/query.cpp +++ b/docs/site/examples/cpp/query.cpp @@ -61,9 +61,9 @@ int main(int argc, char* argv[]) { // let's start with producing some messages with metadata for (uint64_t i = 1; i <= 10; i++) { auto message_metadata = "{" - " \"condition\": \"condition #" + std::to_string(i) + "\"," - " \"somevalue\": " + std::to_string(i * 10) + - "}"; + " \"condition\": \"condition #" + std::to_string(i) + "\"," + " \"somevalue\": " + std::to_string(i * 10) + + "}"; std::string to_send = "message#" + std::to_string(i); auto send_size = to_send.size() + 1; @@ -102,10 +102,13 @@ int main(int argc, char* argv[]) { std::cout << "Message with 30 < somevalue < 60" << std::endl; PrintMessages(metadatas, consumer); - auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); - auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - std::chrono::minutes(15)).time_since_epoch()).count(); + auto now = std::chrono::duration_cast<std::chrono::nanoseconds> + (std::chrono::system_clock::now().time_since_epoch()).count(); + auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - + std::chrono::minutes(15)).time_since_epoch()).count(); std::cout << now << " " << fifteen_minutes_ago << std::endl; - metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string(fifteen_minutes_ago), "default", &err); + metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string( + fifteen_minutes_ago), "default", &err); exit_if_error("Cannot query messages", err); std::cout << "Messages in the last 15 minutes" << std::endl; PrintMessages(metadatas, consumer); diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp index 1f63d3f10c2780e1a4b942d3d33cbbb3cb453df7..d3613147dd2db1faf8ab03f34d5e0ad2ddf8935a 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/acknowledgements.cpp @@ -124,5 +124,5 @@ int main(int argc, char* argv[]) { std::cout << "file name: " << mm.name << std::endl; } - return EXIT_SUCCESS; + return EXIT_SUCCESS; } diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp index a802034c998ea78b11a2829ff7e3d21372bba7b1..aa55634693e10446dbe03f6313cec75215836272 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/consume.cpp @@ -14,7 +14,7 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - + // test token. In production it is created during the start of the beamtime auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl" "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN" @@ -26,28 +26,26 @@ int main(int argc, char* argv[]) { //set it according to your configuration. auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; - auto credentials = asapo::SourceCredentials - { - asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS - beamtime, // the folder should exist - "", // can be empty or "auto", if beamtime_id is given - "test_source", // source - token // athorization token - }; + auto credentials = asapo::SourceCredentials { + asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS + beamtime, // the folder should exist + "", // can be empty or "auto", if beamtime_id is given + "test_source", // source + token // athorization token + }; auto consumer = asapo::ConsumerFactory::CreateConsumer - (endpoint, - path_to_files, - true, // True if the path_to_files is accessible locally, False otherwise - credentials, // same as for producer - &err); + (endpoint, + path_to_files, + true, // True if the path_to_files is accessible locally, False otherwise + credentials, // same as for producer + &err); exit_if_error("Cannot create consumer", err); consumer->SetTimeout(5000); // How long do you want to wait on non-finished stream for a message. // you can get info about the streams in the beamtime - for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) - { + for (const auto& stream : consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err)) { std::cout << "Stream name: " << stream.name << std::endl; std::cout << "LastId: " << stream.last_id << std::endl; std::cout << "Stream finished: " << stream.finished << std::endl; @@ -60,24 +58,24 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - + do { // GetNext is the main function to get messages from streams. You would normally call it in loop. // you can either manually compare the mm.id to the stream.last_id, or wait for the error to happen err = consumer->GetNext(group_id, &mm, &data, "default"); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { // all the messages in the stream were processed std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { // not-finished stream timeout, or wrong or empty stream std::cout << "stream ended" << std::endl; break; } - + exit_if_error("Cannot get next record", err); std::cout << "id: " << mm.id << std::endl; diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp index a5f8be96630de3d2713b863972dd9b33b187470e..f869b9cd121231d4ad8ff3dd28c0ce5867242ab1 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/consume_dataset.cpp @@ -14,7 +14,7 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - + auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJl" "eHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmN" "DNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdG" @@ -35,15 +35,15 @@ int main(int argc, char* argv[]) { asapo::DataSet ds; asapo::MessageData data; - + do { ds = consumer->GetNextDataset(group_id, 0, "default", &err); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { std::cout << "stream ended" << std::endl; break; @@ -51,9 +51,8 @@ int main(int argc, char* argv[]) { exit_if_error("Cannot get next record", err); std::cout << "Dataset Id: " << ds.id << std::endl; - - for(int i = 0; i < ds.content.size(); i++) - { + + for(int i = 0; i < ds.content.size(); i++) { err = consumer->RetrieveData(&ds.content[i], &data); exit_if_error("Cannot get dataset content", err); diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp index b64351fbe7433d8b6946cf52f75a2d25e7d5ced5..2a9e5dda2e07c42852dd8fb41fc7b24fe3550d11 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/metadata.cpp @@ -45,15 +45,15 @@ int main(int argc, char* argv[]) { // sample beamtime metadata. You can add any data you want, with any level of complexity // in this example we use strings and ints, and one nested structure auto beamtime_metadata = "{" - " \"name\": \"beamtime name\"," - " \"condition\": \"beamtime condition\"," - " \"intvalue1\": 5," - " \"intvalue2\": 10," - " \"structure\": {" - " \"structint1\": 20," - " \"structint2\": 30" - " }" - "}"; + " \"name\": \"beamtime name\"," + " \"condition\": \"beamtime condition\"," + " \"intvalue1\": 5," + " \"intvalue2\": 10," + " \"structure\": {" + " \"structint1\": 20," + " \"structint2\": 30" + " }" + "}"; // send the metadata // with this call the new metadata will completely replace the one that's already there @@ -62,9 +62,9 @@ int main(int argc, char* argv[]) { // we can update the existing metadata if we want, by modifying the existing fields, or adding new ones auto beamtime_metadata_update = "{" - " \"condition\": \"updated beamtime condition\"," - " \"newintvalue\": 15" - "}"; + " \"condition\": \"updated beamtime condition\"," + " \"newintvalue\": 15" + "}"; // send the metadata in the 'kUpdate' mode err = producer->SendBeamtimeMetadata(beamtime_metadata_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, &ProcessAfterSend); @@ -72,22 +72,23 @@ int main(int argc, char* argv[]) { // sample stream metadata auto stream_metadata = "{" - " \"name\": \"stream name\"," - " \"condition\": \"stream condition\"," - " \"intvalue\": 44" - "}"; + " \"name\": \"stream name\"," + " \"condition\": \"stream condition\"," + " \"intvalue\": 44" + "}"; // works the same way: for the initial set we use 'kReplace' the stream metadata, but update is also possible // update works exactly the same as for beamtime, but here we will only do 'kReplace' - err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, "default", &ProcessAfterSend); + err = producer->SendStreamMetadata(stream_metadata, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, true}, + "default", &ProcessAfterSend); exit_if_error("Cannot send metadata", err); // sample message metadata auto message_metadata = "{" - " \"name\": \"message name\"," - " \"condition\": \"message condition\"," - " \"somevalue\": 55" - "}"; + " \"name\": \"message name\"," + " \"condition\": \"message condition\"," + " \"somevalue\": 55" + "}"; std::string data_string = "hello"; auto send_size = data_string.size() + 1; diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp index 85ae4602f3694b6c4eaa68914f16d366910fb40b..964689eec3a88a37285b833224dd8209b3bcbfb8 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/next_stream.cpp @@ -93,7 +93,9 @@ int main(int argc, char* argv[]) { // when the stream finishes, we look for the info on the next stream auto streams = consumer->GetStreamList("", asapo::StreamFilter::kAllStreams, &err); // first, we find the stream with our name in the list of streams - auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo& s) { return s.name == stream_name; }); + auto stream = std::find_if(streams.begin(), streams.end(), [&stream_name](const asapo::StreamInfo & s) { + return s.name == stream_name; + }); // then we look if the field 'nextStream' is set and not empty if (stream != streams.end() && !stream->next_stream.empty()) { @@ -117,5 +119,5 @@ int main(int argc, char* argv[]) { std::cout << "Message #" << mm.id << ", message content: " << reinterpret_cast<char const*>(data.get()) << std::endl; } while (1); - return EXIT_SUCCESS; + return EXIT_SUCCESS; } diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp index e399e6fe5dd62463f3eaf0e818067644ef4e528f..d1022cd8c1f7bf20afd1bbc846b7fc079e595286 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/pipeline.cpp @@ -51,42 +51,43 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - + do { // we expect the message to be in the 'default' stream already err = consumer->GetNext(group_id, &mm, &data, "default"); - + if (err && err == asapo::ConsumerErrorTemplates::kStreamFinished) { std::cout << "stream finished" << std::endl; break; } - + if (err && err == asapo::ConsumerErrorTemplates::kEndOfStream) { std::cout << "stream ended" << std::endl; break; } exit_if_error("Cannot get next record", err); - + // work on our data auto processed_string = std::string(reinterpret_cast<char const*>(data.get())) + " processed"; auto send_size = processed_string.size() + 1; auto buffer = asapo::MessageData(new uint8_t[send_size]); memcpy(buffer.get(), processed_string.c_str(), send_size); - + // you may use the same filename, if you want to rewrite the source file. This will result in warning, but it is a valid usecase asapo::MessageHeader message_header{mm.id, send_size, std::string("processed/test_file_") + std::to_string(mm.id)}; - err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, &ProcessAfterSend); + err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, pipelined_stream_name, + &ProcessAfterSend); exit_if_error("Cannot send message", err); } while (1); - err = producer->WaitRequestsFinished(2000); + err = producer->WaitRequestsFinished(2000); exit_if_error("Producer exit on timeout", err); // the meta from the last iteration corresponds to the last message auto last_id = mm.id; - - err = producer->SendStreamFinishedFlag("pipelined",last_id, "", &ProcessAfterSend); + + err = producer->SendStreamFinishedFlag("pipelined", last_id, "", &ProcessAfterSend); exit_if_error("Cannot finish stream", err); // you can remove the source stream if you do not need it anymore diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp index 9faa61106eff6e55c502df383854e9275b1fe111..aee2a9b74db8f5ae583b331f593032cceafc3c68 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/produce.cpp @@ -29,14 +29,13 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; auto beamtime = "asapo_test"; - auto credentials = asapo::SourceCredentials - { - asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS - beamtime, // the folder should exist - "", // can be empty or "auto", if beamtime_id is given - "test_source", // source - "" // athorization token - }; + auto credentials = asapo::SourceCredentials { + asapo::SourceType::kProcessed, // should be kProcessed or kRaw, kProcessed writes to the core FS + beamtime, // the folder should exist + "", // can be empty or "auto", if beamtime_id is given + "test_source", // source + "" // athorization token + }; auto producer = asapo::Producer::Create(endpoint, 1, // number of threads. Increase, if the sending speed seems slow @@ -63,7 +62,7 @@ int main(int argc, char* argv[]) { // add the following at the end of the script err = producer->WaitRequestsFinished(2000); // will synchronously wait for all the data to be sent. - // Use it when no more data is expected. + // Use it when no more data is expected. exit_if_error("Producer exit on timeout", err); // you may want to mark the stream as finished diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp index e2ea32e77bd291cafeae3ed1d9febdbed9f57ba4..aaefeed40380abe86134488700ae632f4cf09f44 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/produce_dataset.cpp @@ -27,7 +27,7 @@ int main(int argc, char* argv[]) { auto beamtime = "asapo_test"; auto credentials = asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}; - + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, credentials, 60000, &err); exit_if_error("Cannot start producer", err); diff --git a/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp b/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp index 945b5211a1f0a3c41603617f78b25f7d8aa118fc..243e8bd36b062f04307e83d34b370e590fac1f36 100644 --- a/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp +++ b/docs/site/versioned_examples/version-21.09.0/cpp/query.cpp @@ -61,9 +61,9 @@ int main(int argc, char* argv[]) { // let's start with producing some messages with metadata for (uint64_t i = 1; i <= 10; i++) { auto message_metadata = "{" - " \"condition\": \"condition #" + std::to_string(i) + "\"," - " \"somevalue\": " + std::to_string(i * 10) + - "}"; + " \"condition\": \"condition #" + std::to_string(i) + "\"," + " \"somevalue\": " + std::to_string(i * 10) + + "}"; std::string to_send = "message#" + std::to_string(i); auto send_size = to_send.size() + 1; @@ -102,10 +102,13 @@ int main(int argc, char* argv[]) { std::cout << "Message with 30 < somevalue < 60" << std::endl; PrintMessages(metadatas, consumer); - auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); - auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - std::chrono::minutes(15)).time_since_epoch()).count(); + auto now = std::chrono::duration_cast<std::chrono::nanoseconds> + (std::chrono::system_clock::now().time_since_epoch()).count(); + auto fifteen_minutes_ago = std::chrono::duration_cast<std::chrono::nanoseconds>((std::chrono::system_clock::now() - + std::chrono::minutes(15)).time_since_epoch()).count(); std::cout << now << " " << fifteen_minutes_ago << std::endl; - metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string(fifteen_minutes_ago), "default", &err); + metadatas = consumer->QueryMessages("timestamp < " + std::to_string(now) + " AND timestamp > " + std::to_string( + fifteen_minutes_ago), "default", &err); exit_if_error("Cannot query messages", err); std::cout << "Messages in the last 15 minutes" << std::endl; PrintMessages(metadatas, consumer); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.c b/tests/automatic/consumer/consumer_api/consumer_api.c index 0ec0990021aa97fa665e1ec7770ed50ad13f687d..d9996dddedfb6461620e0c0a267275efbd311742 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.c +++ b/tests/automatic/consumer/consumer_api/consumer_api.c @@ -29,6 +29,15 @@ void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { asapo_free_handle(&md); asapo_free_handle(&dataset); +// get last in group + dataset = asapo_consumer_get_last_dataset_ingroup(consumer,group_id, 0, "default", &err); + EXIT_IF_ERROR("asapo_consumer_get_last_dataset_ingroup", err); + asapo_free_handle(&dataset); + AsapoDataSetHandle ds_ig = asapo_consumer_get_last_dataset_ingroup(consumer,group_id, 0, "default", &err); + ASSERT_TRUE(ds_ig == NULL,"returns null in case of error"); + ASSERT_TRUE(asapo_error_get_type(err) == kEndOfStream,"asapo_consumer_get_last_dataset_ingroup second time end of stream error"); + + // get by id dataset = asapo_consumer_get_dataset_by_id(consumer, 8,0, "default", &err); EXIT_IF_ERROR("asapo_consumer_get_last_dataset", err); @@ -114,6 +123,12 @@ void test_single(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { ASSERT_EQ_INT(10,asapo_message_meta_get_id(md),"id"); ASSERT_EQ_STRING("10",asapo_message_meta_get_name(md),"id"); +//last in group + asapo_consumer_get_last_ingroup(consumer, group_id, &md, NULL, "default",&err); + EXIT_IF_ERROR("asapo_consumer_get_last_ingroup", err); + asapo_consumer_get_last_ingroup(consumer, group_id, &md, NULL, "default",&err); + ASSERT_TRUE(asapo_error_get_type(err) == kEndOfStream,"asapo_consumer_get_last_ingroup second time end of stream error"); + //id asapo_consumer_get_by_id(consumer,8, &md, NULL, "default",&err); EXIT_IF_ERROR("asapo_consumer_get_by_id", err); diff --git a/tests/automatic/consumer/consumer_api/consumer_api.cpp b/tests/automatic/consumer/consumer_api/consumer_api.cpp index 9107d44ac36aadc981ca3f08537077c464479e37..4e1f99ec0320f10b265715a2c3632bffad4c5f6e 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.cpp +++ b/tests/automatic/consumer/consumer_api/consumer_api.cpp @@ -58,6 +58,13 @@ void TestSingle(const std::unique_ptr<asapo::Consumer>& consumer, const std::str M_AssertTrue(fi.name == "10", "GetLast filename"); M_AssertTrue(fi.metadata == "{\"test\":10}", "GetLast metadata"); + err = consumer->GetLast(group_id, &fi, nullptr, "default"); + M_AssertTrue(err == nullptr, "GetLast inside group no error"); + M_AssertTrue(fi.name == "10", "GetLast inside group filename"); + + err = consumer->GetLast(group_id, &fi, nullptr, "default"); + M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLast inside group error second time"); + err = consumer->GetNext(group_id, &fi, nullptr, "default"); M_AssertTrue(err == nullptr, "GetNext2 no error"); M_AssertTrue(fi.name == "2", "GetNext2 filename"); @@ -254,6 +261,11 @@ void TestDataset(const std::unique_ptr<asapo::Consumer>& consumer, const std::st M_AssertTrue(dataset.content[0].name == "10_1", "GetLastDataset filename"); M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetLastDataset metadata"); + consumer->GetLastDataset(group_id, 0, "default", &err); + M_AssertTrue(err == nullptr, "GetLastDataset in group no error"); + consumer->GetLastDataset(group_id, 0, "default", &err); + M_AssertTrue(err == asapo::ConsumerErrorTemplates::kEndOfStream, "GetLastDataset in group error second time"); + dataset = consumer->GetNextDataset(group_id, 0, "default", &err); M_AssertTrue(err == nullptr, "GetNextDataset2 no error"); M_AssertTrue(dataset.content[0].name == "2_1", "GetNextDataSet2 filename"); diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py index 18a8dd3814210a20761dd626bf46325519fdb00c..4235be0a2bc3a69df91ba4fcad2a22a202be4181 100644 --- a/tests/automatic/consumer/consumer_api_python/consumer_api.py +++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py @@ -76,6 +76,16 @@ def check_single(consumer, group_id): assert_metaname(meta, "5", "get last1") assert_usermetadata(meta, "get last1") +# get last in group + _, meta = consumer.get_last(meta_only=True,ingroup=group_id) + assert_metaname(meta, "5", "get last in group") + try: + consumer.get_last(meta_only=True,ingroup=group_id) + except asapo_consumer.AsapoEndOfStreamError: + pass + else: + exit_on_noerr("get last in group error second time") + try: consumer.get_by_id(30, meta_only=True) except asapo_consumer.AsapoEndOfStreamError: @@ -326,6 +336,17 @@ def check_dataset(consumer, group_id): assert_eq(res['expected_size'], 3, "get_last_dataset1 size ") assert_metaname(res['content'][2], "10_3", "get get_last_dataset1 name3") +# get last dataset in group + res = consumer.get_last_dataset(ingroup=group_id) + assert_eq(res['id'], 10, "get_last_dataset in group") + try: + consumer.get_last_dataset(ingroup=group_id) + except asapo_consumer.AsapoEndOfStreamError: + pass + else: + exit_on_noerr("get last dataset in group error second time") + + res = consumer.get_next_dataset(group_id) assert_eq(res['id'], 3, "get_next_dataset3")