diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 7a9d2c13ddd66cab74161cca616a3f4dc77c0946..939c7997728b770d1e825870356bd07a47239b4e 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -60,6 +60,11 @@ type LocationPointer struct { Value int `bson:"current_pointer"` } +type PersistedStreamsList struct { + ID int `json:"_id"` + Streams []string `json:"persisted_streams"` +} + const data_collection_name_prefix = "data_" const acks_collection_name_prefix = "acks_" const inprocess_collection_name_prefix = "inprocess_" @@ -71,6 +76,7 @@ const last_message_field_name = "last_message" const no_session_msg = "database client not created" const already_connected_msg = "already connected" +const too_many_persisted_streams = "too many persisted streams" const finish_stream_keyword = "asapo_finish_stream" const no_next_stream_keyword = "asapo_no_next" @@ -969,6 +975,30 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) { return nil, err } +func (db *Mongodb) persistStream(request Request) ([]byte, error) { + if db.client == nil { + return nil, &DBError{utils.StatusServiceUnavailable, no_session_msg} + } + + maxNum, _ := strconv.Atoi(request.ExtraParam) + + c := db.client.Database(request.DbName()).Collection(meta_collection_name) + q := bson.M{"_id": "persist"} + l := PersistedStreamsList{} + err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&l) + if err != nil && err != mongo.ErrNoDocuments { + return nil, err + } + if len(l.Streams) >= maxNum { + return nil, &DBError{utils.StatusWrongInput, too_many_persisted_streams} + } + l.Streams = append(l.Streams, request.Stream) + + _, err = c.InsertOne(context.TODO(), l) + + return nil, err +} + func (db *Mongodb) lastAck(request Request) ([]byte, error) { c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) @@ -1098,6 +1128,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.lastAck(request) case "delete_stream": return db.deleteStream(request) + case "persist_stream": + return db.persistStream(request) } return nil, errors.New("Wrong db operation: " + request.Op) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 3c99b1a8a85f5a6dd4599df35d7e80f143df807a..885bb180077d127f4238adc0f026ba2a24d62b9a 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -29,6 +29,12 @@ var listRoutes = utils.Routes{ "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/delete", routeDeleteStream, }, + utils.Route{ + "PersistStream", + "Post", + "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/persist", + routePersistStream, + }, utils.Route{ "GetLast", "Get", diff --git a/broker/src/asapo_broker/server/post_persist_stream.go b/broker/src/asapo_broker/server/post_persist_stream.go new file mode 100644 index 0000000000000000000000000000000000000000..11bc6f8c974270008f58d0bb84c770f0b3387b84 --- /dev/null +++ b/broker/src/asapo_broker/server/post_persist_stream.go @@ -0,0 +1,11 @@ +package server + +import ( + "net/http" + "strconv" +) + +func routePersistStream(w http.ResponseWriter, r *http.Request) { + maxNum := strconv.Itoa(settings.GetMaxNumPersistedStreams()) + processRequest(w, r, "persist_stream", maxNum, false) +} diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index 51800b2c592c4e2cfa28d5b5ece809430a83169f..27ab64fb9a9d13b6233e8d28d00be7a1b5411d94 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -10,6 +10,7 @@ import ( const kDefaultresendInterval = 10 const kDefaultStreamCacheUpdateIntervalMs = 100 +const kDefaultMaxNumPersistedStreams = 100 const kDefaultTokenCacheUpdateIntervalMs = 60000 var db database.Agent @@ -20,7 +21,7 @@ type serverSettings struct { PerformanceDbServer string PerformanceDbName string MonitoringServerUrl string - MonitorPerformance bool + MonitorPerformance bool AuthorizationServer string Port int LogLevel string @@ -28,6 +29,7 @@ type serverSettings struct { CheckResendInterval *int StreamCacheUpdateIntervalMs *int TokenCacheUpdateIntervalMs *int + MaxNumPersistedStreams *int } func (s *serverSettings) GetTokenCacheUpdateInterval() int { @@ -59,6 +61,14 @@ func (s *serverSettings) GetDatabaseServer() string { } } +func (s *serverSettings) GetMaxNumPersistedStreams() int { + if s.MaxNumPersistedStreams == nil { + return kDefaultMaxNumPersistedStreams + } else { + return *s.MaxNumPersistedStreams + } +} + var settings serverSettings var statistics serverStatistics var monitoring brokerMonitoring @@ -93,7 +103,7 @@ func InitDB(dbAgent database.Agent) (err error) { } func CreateDiscoveryService() { - discoveryService = discovery.CreateDiscoveryService(&http.Client{},"http://" + settings.DiscoveryServer) + discoveryService = discovery.CreateDiscoveryService(&http.Client{}, "http://"+settings.DiscoveryServer) } func CleanupDB() { diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h index d814a88df4db17d0f9181f46ae5cb0ec6f133da3..be3dfc31d6878dd0f6dc7bfecd6b1118819e4cdc 100644 --- a/common/cpp/include/asapo/common/networking.h +++ b/common/cpp/include/asapo/common/networking.h @@ -31,6 +31,7 @@ enum Opcode : uint8_t { kOpcodeDeleteStream, kOpcodeGetMeta, kOpcodeCount, + kOpcodePersistStream }; inline std::string OpcodeToString(uint8_t code) { @@ -44,6 +45,7 @@ inline std::string OpcodeToString(uint8_t code) { case kOpcodeTransferMetaData:return "transfer metadata"; case kOpcodeDeleteStream:return "delete stream"; case kOpcodeGetMeta:return "get meta"; + case kOpcodePersistStream:return "persist stream"; default:return "unknown op"; } } diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h index 61c32e0dcec5a7867d0f6d9c87aff8fb3846657c..b6acc8cdd3cb2a7965c3d208c6b4d29190474e6e 100644 --- a/common/cpp/include/asapo/database/database.h +++ b/common/cpp/include/asapo/database/database.h @@ -28,6 +28,8 @@ class Database { virtual Error GetLastStream(StreamInfo* info) const = 0; virtual Error DeleteStream(const std::string& stream) const = 0; virtual Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const = 0; + virtual Error GetPersistedStreamsNumber(int* res) const = 0; + virtual Error PersistStream(const std::string& stream_name) const = 0; virtual ~Database() = default; }; diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h index 887653507910802a51fe56d66037dd4ec99db07e..97e733dec4d0caeaab30d02b7fea92fcb67d4da0 100644 --- a/common/cpp/include/asapo/unittests/MockDatabase.h +++ b/common/cpp/include/asapo/unittests/MockDatabase.h @@ -70,6 +70,18 @@ class MockDatabase : public Database { return Error{DeleteStream_t(stream)}; } + MOCK_METHOD(ErrorInterface *, GetPersistedStreamsNumber_t, (int*), (const)); + + Error GetPersistedStreamsNumber(int* res) const override { + return Error{GetPersistedStreamsNumber_t(res)}; + } + + MOCK_METHOD(ErrorInterface *, PersistStream_t, (const std::string&), (const)); + + Error PersistStream(const std::string& stream_name) const override { + return Error{PersistStream_t(stream_name)}; + } + MOCK_METHOD(ErrorInterface *, GetLastStream_t, (StreamInfo*), (const)); diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index bfb0d8ffbb6cf083740edbe149962206840251b9..c15dda36068bbb29866111bc28855998d9dc061b 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -819,6 +819,7 @@ Error MongoDBClient::DeleteStream(const std::string& stream) const { DeleteDocumentsInCollection("current_location", querystr); } DeleteDocumentsInCollection("meta", "^" + EscapeQuery(stream_encoded) + "$"); + ModifyPersistedStreamsList(stream, false); return err; } @@ -837,4 +838,56 @@ Error MongoDBClient::GetMetaFromDb(const std::string& collection, const std::str return nullptr; } +Error MongoDBClient::GetPersistedStreamsNumber(int* res) const { + UpdateCurrentCollectionIfNeeded("meta"); + //db.meta.aggregate([{$match: {"_id":"persist"}},{$project: {size: {$size: "$persisted_streams"}}}]) + auto pipeline = BCON_NEW("pipeline", "[", + "{", "$match", "{", "_id", "persist", "}", "}", + "{", "$project", "{", "size", "{", "$size", "$persisted_streams", "}", "}", "}", + "]"); + auto cursor = mongoc_collection_aggregate(current_collection_, MONGOC_QUERY_NONE, pipeline, NULL, NULL); + *res = 0; + const bson_t* doc; + bson_iter_t iter; + while (mongoc_cursor_next(cursor, &doc)) { + if (bson_iter_init_find(&iter, doc, "size") && BSON_ITER_HOLDS_INT32 (&iter)) { + *res = bson_iter_int32(&iter); + } + } + + Error err = nullptr; + bson_error_t mongo_err; + if (mongoc_cursor_error(cursor, &mongo_err)) { + err = DBErrorTemplates::kDBError.Generate(mongo_err.message); + } + + mongoc_cursor_destroy(cursor); + bson_destroy(pipeline); + + return err; +} + +Error MongoDBClient::PersistStream(const std::string& stream_name) const { + return ModifyPersistedStreamsList(stream_name, true); +} + +Error MongoDBClient::ModifyPersistedStreamsList(const std::string& stream_name, bool add) const { + UpdateCurrentCollectionIfNeeded("meta"); + //db.meta.updateOne({"_id" : "persist"},{$push : {"persisted_streams" : "default"}},{"upsert": true}) + bson_t* selector = BCON_NEW ("_id", "persist"); + bson_t* update = BCON_NEW (add ? "$addToSet" : "$pull", "{", "persisted_streams", BCON_UTF8(stream_name.c_str()), "}"); + bson_t* opts = BCON_NEW ("upsert", BCON_BOOL(add)); + + bson_error_t mongo_err; + Error err = nullptr; + if (!mongoc_collection_update_one(current_collection_, selector, update, opts, NULL, &mongo_err)) { + err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); + } + bson_destroy(selector); + bson_destroy(update); + bson_destroy(opts); + + return err; +} + } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 90fbe73a6b4a6bd60ced7236964001557fe86393..d2a0621f9e382c9540f0c1cf3f7dc76986055616 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -59,6 +59,8 @@ class MongoDBClient final : public Database { Error GetLastStream(StreamInfo* info) const override; Error DeleteStream(const std::string& stream) const override; Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override; + Error GetPersistedStreamsNumber(int* res) const override; + Error PersistStream(const std::string& stream_name) const override; Error GetNextId(const std::string& collection, uint64_t* id) const; ~MongoDBClient() override; private: @@ -86,6 +88,7 @@ class MongoDBClient final : public Database { Error DeleteCollections(const std::string& prefix) const; Error DeleteDocumentsInCollection(const std::string& collection_name, const std::string& querystr) const; Error InsertWithAutoId(const MessageMeta& file, uint64_t* id_inserted) const; + Error ModifyPersistedStreamsList(const std::string& stream_name, bool add) const; }; struct TransactionContext { diff --git a/config/nomad/authorizer.nmd.in b/config/nomad/authorizer.nmd.in index a8c06827f070904beff4543a861089a245eaf968..60fe8253876d0052b151ae550e1b365d015b38dd 100644 --- a/config/nomad/authorizer.nmd.in +++ b/config/nomad/authorizer.nmd.in @@ -6,6 +6,12 @@ job "authorizer" { group "group" { count = 1 + network { + port "authorizer" { + static = "5007" + } + } + task "authorizer" { driver = "raw_exec" @@ -17,11 +23,6 @@ job "authorizer" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "authorizer" { - static = "5007" - } - } } service { diff --git a/config/nomad/broker.nmd.in b/config/nomad/broker.nmd.in index debdc08802983b6b80b84219614f3b908bddb2fc..45fefdecfeea4659c6275f0fce3c24b89d8a09ef 100644 --- a/config/nomad/broker.nmd.in +++ b/config/nomad/broker.nmd.in @@ -6,6 +6,11 @@ job "broker" { group "group" { count = 1 + network { + port "broker" { + } + } + task "broker" { driver = "raw_exec" @@ -17,10 +22,6 @@ job "broker" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "broker" { - } - } } service { diff --git a/config/nomad/file_transfer.nmd.in b/config/nomad/file_transfer.nmd.in index 90f1833fef8b7dc1e74e068ab93839862f2af85b..554eb9d004f86ecb5aef8f479aa29e7bca7d491d 100644 --- a/config/nomad/file_transfer.nmd.in +++ b/config/nomad/file_transfer.nmd.in @@ -6,6 +6,12 @@ job "file_transfer" { group "group" { count = 1 + network { + port "file_transfer" { + static = "5008" + } + } + task "file_transfer" { driver = "raw_exec" @@ -17,11 +23,6 @@ job "file_transfer" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "file_transfer" { - static = "5008" - } - } } service { diff --git a/config/nomad/monitoring.nmd.in b/config/nomad/monitoring.nmd.in index e60d814c32698621bacb23e95aa549d57b552c6a..aa8c52fdf95f01a94ddb4d8d1095d014d7bc1e03 100644 --- a/config/nomad/monitoring.nmd.in +++ b/config/nomad/monitoring.nmd.in @@ -6,6 +6,12 @@ job "monitoring" { group "monitoring" { count = 1 + network { + port "monitoring_server" { + static = "5009" + } + } + task "monitoring-server" { driver = "raw_exec" @@ -17,11 +23,6 @@ job "monitoring" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "monitoring_server" { - static = "5009" - } - } } service { diff --git a/config/nomad/receiver_fabric.nmd.in b/config/nomad/receiver_fabric.nmd.in index a37759b5f33653f858d1a961ccb3e6ea569c4133..058e5ffeadeced14ba280a5dd0a5474a73180760 100644 --- a/config/nomad/receiver_fabric.nmd.in +++ b/config/nomad/receiver_fabric.nmd.in @@ -6,6 +6,12 @@ job "receiver" { group "group" { count = 1 + network { + port "recv" {} + port "recv_ds" {} + port "recv_metrics" {} + } + task "receiver" { driver = "raw_exec" @@ -17,11 +23,6 @@ job "receiver" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "recv" {} - port "recv_ds" {} - port "recv_metrics" {} - } } service { diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in index 6b1047d39dfdb3be4db58dbb568fdf0f6bda3e02..1d736d213115da10287f89407a82476826462249 100644 --- a/config/nomad/receiver_kafka.nmd.in +++ b/config/nomad/receiver_kafka.nmd.in @@ -10,6 +10,12 @@ job "receiver" { group "group" { count = 1 + network { + port "recv" {} + port "recv_ds" {} + port "recv_metrics" {} + } + task "receiver" { driver = "raw_exec" @@ -21,11 +27,6 @@ job "receiver" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "recv" {} - port "recv_ds" {} - port "recv_metrics" {} - } } service { diff --git a/config/nomad/receiver_tcp.nmd.in b/config/nomad/receiver_tcp.nmd.in index dcf9f9176917df2090e72be2185128b3b76e7bfb..b24b121106017a01e9f25c2fb55f2aeec74b4b8d 100644 --- a/config/nomad/receiver_tcp.nmd.in +++ b/config/nomad/receiver_tcp.nmd.in @@ -6,6 +6,12 @@ job "receiver" { group "group" { count = 1 + network { + port "recv" {} + port "recv_ds" {} + port "recv_metrics" {} + } + task "receiver" { driver = "raw_exec" @@ -17,11 +23,6 @@ job "receiver" { resources { cpu = 50 # 50 MHz memory = 256 # 256MB - network { - port "recv" {} - port "recv_ds" {} - port "recv_metrics" {} - } } service { diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index 80c1f841bcf17b09888fcd49a6f6208f3af2810e..1c12207cb4afd0bc5dfe8efef4b1ba2a06c4e27c 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -116,6 +116,9 @@ int asapo_consumer_delete_stream(AsapoConsumerHandle consumer, const char* stream, AsapoBool delete_meta, AsapoBool error_on_not_exist, AsapoErrorHandle* error); +int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer, + const char* stream, + AsapoErrorHandle* error); int64_t asapo_consumer_get_current_size(AsapoConsumerHandle consumer, const char* stream, AsapoErrorHandle* error); diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 2527ca21c2058c32bfec3afed4c64292fe481b11..2715705605dfa8665d7ba98612dce8de528ed073 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -101,6 +101,12 @@ class Consumer { */ virtual Error DeleteStream(std::string stream, DeleteStreamOptions options) = 0; + //! Marks stream persistent + /*! + \param stream - Name of the stream to be marked persistent + \return Error - Will be nullptr on success + */ + virtual Error SetStreamPersistent(std::string stream) = 0 ; //! Get current number of messages in stream /*! diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 1fb7f6dee01f71bc3ecf89b594d8b45b228c7072..1374b97ace0b1a2233c180d343975366ce399634 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -286,6 +286,18 @@ extern "C" { auto err = consumer->handle->DeleteStream(stream, opt); return process_error(error, std::move(err)); } +//! wraps asapo::Consumer::SetStreamsPersistent() +/// \copydoc asapo::Consumer::SetStreamsPersistent() +/// \param[in] consumer the consumer that is acted upon +/// \param[in] stream the name of the stream to be persisted +/// \param[out] error NULL or pointer to error handle to be set +/// \return 0 if completed successfully, -1 in case of error. +int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer, + const char* stream, + AsapoErrorHandle* error) { + auto err = consumer->handle->SetStreamPersistent(stream); + return process_error(error, std::move(err)); + } //! wraps asapo::Consumer::GetCurrentSize() /// \copydoc asapo::Consumer::GetCurrentSize() /// \param[in] consumer the consumer that is acted upon diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 76f536cbff432ae4d78b6f493b970b357b526587..a6cfacf60ff6c718ba8785f48d207b60cea85b0f 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -1060,6 +1060,15 @@ Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options return err; } +Error ConsumerImpl::SetStreamPersistent(std::string stream) { + RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "persist"); + ri.post = true; + Error err; + BrokerRequestWithTimeout(ri, &err); + return err; +} + + RequestInfo ConsumerImpl::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const { auto stream_encoded = httpclient__->UrlEscape(std::move(stream)); auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : ""; diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 6928e7cf2631971a5dc250f1c4001cb2f956c048..8ac94d281f337bbc1ef910ecaaa35b9810e1a5d6 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -89,6 +89,7 @@ class ConsumerImpl final : public asapo::Consumer { Error GetVersionInfo(std::string* client_info, std::string* server_info, bool* supported) override; Error DeleteStream(std::string stream, DeleteStreamOptions options) override; + Error SetStreamPersistent(std::string stream) override; void SetTimeout(uint64_t timeout_ms) override; void ForceNoRdma() override; Error DisableMonitoring(bool disable) override; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 140e8950eef5ed830f4a9e218792baa059809a92..1715bf9930b84cccaf8f337541ad0ffca68f5644 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -98,6 +98,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void InterruptCurrentOperation() Error GetVersionInfo(string* client_info,string* server_info, bool* supported) Error DeleteStream(string stream, DeleteStreamOptions options) + Error SetStreamPersistent(string stream) cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: cdef cppclass ConsumerFactory: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index de490719b58ed31df7d570ad3b226b060498c57d..cee15bdbdce9d141c8b88696646ae459ac822690 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -328,6 +328,22 @@ cdef class PyConsumer: if err: throw_exception(err) + def set_stream_persistent(self, stream = 'default'): + """ + :param stream: stream name + :type stream: string + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string b_stream = _bytes(stream) + with nogil: + err = self.c_consumer.get().SetStreamPersistent(b_stream) + if err: + throw_exception(err) + def get_unacknowledged_messages(self, group_id, uint64_t from_id = 0, uint64_t to_id = 0, stream = "default"): cdef Error err cdef string b_group_id = _bytes(group_id) diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index 7ae6db9d0e5f29539e8f706f8a5c7e248e9772ed..fdb6b9d2fb71bd233df91baddcb7ab6ef5de3403 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -25,6 +25,7 @@ "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, "LogLevel": "info", + "MaxNumPersistedStreams": 100, "Kafka": { "Enabled": true, "KafkaClient": { diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl index 5ca82a26dc5c0f4d6861d7b9c5fc9fed58e19f0f..5997fe677e07a33052a7211c299ca26ac76cb8f5 100644 --- a/deploy/asapo_services/scripts/receiver.json.tpl +++ b/deploy/asapo_services/scripts/receiver.json.tpl @@ -26,6 +26,7 @@ "Tag": "{{ env "attr.unique.hostname" }}", "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }}, "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", + "MaxNumPersistedStreams": 100, "Kafka": { "Enabled": {{ env "NOMAD_META_receiver_kafka_enabled" }}, "KafkaClient": { diff --git a/discovery/src/asapo_discovery/protocols/protocol_test.go b/discovery/src/asapo_discovery/protocols/protocol_test.go index fff6749cce93bf2e3f832e1eb9e938ac28d5d325..0a6e3adfe98d5a1064df01d3ba82339af27f4955 100644 --- a/discovery/src/asapo_discovery/protocols/protocol_test.go +++ b/discovery/src/asapo_discovery/protocols/protocol_test.go @@ -14,24 +14,22 @@ type protocolTest struct { } var protocolTests = []protocolTest{ -// consumer + // consumer {"consumer", "v0.6", true, "current", "v0.6"}, {"consumer", "v0.5", true, "deprecates", "v0.5"}, {"consumer", "v0.4", true, "deprecates", "v0.4"}, - {"consumer", "v0.3", true, "deprecates", "v0.3"}, - {"consumer", "v0.2", true, "deprecates", "v0.2"}, - {"consumer", "v0.1", true, "deprecates", "v0.1"}, + {"consumer", "v0.3", false, "deprecated", "v0.3"}, + {"consumer", "v0.2", false, "deprecated", "v0.2"}, + {"consumer", "v0.1", false, "deprecated", "v0.1"}, {"consumer", "v1000.2", false, "unknown", "unknown protocol"}, - - -// producer + // producer {"producer", "v0.6", true, "current", "v0.6"}, {"producer", "v0.5", true, "deprecates", "v0.5"}, {"producer", "v0.4", true, "deprecates", "v0.4"}, {"producer", "v0.3", true, "deprecates", "v0.3"}, - {"producer", "v0.2", true, "deprecates", "v0.2"}, - {"producer", "v0.1", true, "deprecates", "v0.1"}, + {"producer", "v0.2", false, "deprecated", "v0.2"}, + {"producer", "v0.1", false, "deprecated", "v0.1"}, {"producer", "v1000.2", false, "unknown", "unknown protocol"}, } diff --git a/discovery/src/asapo_discovery/server/get_version_test.go b/discovery/src/asapo_discovery/server/get_version_test.go index 74c1516234b8ba5bbf94256118a174901174c8ee..1dec9d205abb51fe784994f36bc3b3ce967c5815 100644 --- a/discovery/src/asapo_discovery/server/get_version_test.go +++ b/discovery/src/asapo_discovery/server/get_version_test.go @@ -19,31 +19,31 @@ var versionTests = []struct { message string }{ {"", versionInfo{ - SoftwareVersion: coreVer, - ClientProtocol: protocols.ProtocolInfo{}, - ClientSupported: "", + SoftwareVersion: coreVer, + ClientProtocol: protocols.ProtocolInfo{}, + ClientSupported: "", }, http.StatusOK, "no client"}, {"?client=consumer", versionInfo{ SoftwareVersion: coreVer, - ClientProtocol: protocols.ProtocolInfo{"", nil}, - ClientSupported: "no", + ClientProtocol: protocols.ProtocolInfo{"", nil}, + ClientSupported: "no", }, http.StatusOK, "consumer client, no protocol"}, - {"?client=consumer&protocol=v0.1", versionInfo{ + {"?client=consumer&protocol=v0.6", versionInfo{ SoftwareVersion: coreVer, - ClientProtocol: protocols.ProtocolInfo{"v0.1 (deprecates at 2022-06-01 00:00:00 +0000 UTC)", - map[string]string{"Authorizer":"v0.1", "Broker":"v0.1", "Data cache service":"v0.1", "Discovery":"v0.1", "File Transfer":"v0.1"}}, - ClientSupported: "yes", + ClientProtocol: protocols.ProtocolInfo{"v0.6 (current)", + map[string]string{"Authorizer": "v0.2", "Broker": "v0.6", "Data cache service": "v0.1", "Discovery": "v0.1", "File Transfer": "v0.2"}}, + ClientSupported: "yes", }, http.StatusOK, "consumer client"}, - {"?client=producer&protocol=v0.1", versionInfo{ - SoftwareVersion: coreVer, - ClientProtocol: protocols.ProtocolInfo{"v0.1 (deprecates at 2022-06-01 00:00:00 +0000 UTC)",map[string]string{"Discovery":"v0.1", "Receiver":"v0.1"}}, - ClientSupported: "yes", + {"?client=producer&protocol=v0.6", versionInfo{ + SoftwareVersion: coreVer, + ClientProtocol: protocols.ProtocolInfo{"v0.6 (current)", map[string]string{"Discovery": "v0.1", "Receiver": "v0.6"}}, + ClientSupported: "yes", }, http.StatusOK, "producer client"}, {"?client=producer&protocol=v1000.2", versionInfo{ - SoftwareVersion: coreVer, - ClientProtocol: protocols.ProtocolInfo{"v1000.2 (unknown protocol)",nil}, - ClientSupported: "no", + SoftwareVersion: coreVer, + ClientProtocol: protocols.ProtocolInfo{"v1000.2 (unknown protocol)", nil}, + ClientSupported: "no", }, http.StatusOK, "producer client unknown"}, } @@ -55,9 +55,9 @@ func TestVersionTests(t *testing.T) { var info versionInfo json.Unmarshal(w.Body.Bytes(), &info) fmt.Println(w.Body.String()) - assert.Equal(t, test.result.ClientProtocol,info.ClientProtocol, test.message) - if test.message!="no client" { - assert.Equal(t, true,len(info.SupportedProtocols)>0, test.message) + assert.Equal(t, test.result.ClientProtocol, info.ClientProtocol, test.message) + if test.message != "no client" { + assert.Equal(t, true, len(info.SupportedProtocols) > 0, test.message) } } } diff --git a/discovery/src/asapo_discovery/server/routes_test.go b/discovery/src/asapo_discovery/server/routes_test.go index b98a83a0d7d1688e88c66d3c785ef0a4f12d7d76..30675c91c13661cdc953361014b7cf5ff4fa6fd7 100644 --- a/discovery/src/asapo_discovery/server/routes_test.go +++ b/discovery/src/asapo_discovery/server/routes_test.go @@ -68,7 +68,7 @@ message string var receiverTests = []requestTest { {"/" + version.GetDiscoveryApiVersion()+"/asapo-receiver",http.StatusBadRequest,"protocol missing"}, {"/" + version.GetDiscoveryApiVersion()+"/asapo-receiver?protocol=v1000.2",http.StatusUnsupportedMediaType,"wrong protocol"}, - {"/" + version.GetDiscoveryApiVersion()+"/asapo-receiver?protocol=v0.2",http.StatusOK,"ok"}, + {"/" + version.GetDiscoveryApiVersion()+"/asapo-receiver?protocol=v0.6",http.StatusOK,"ok"}, } func (suite *GetServicesTestSuite) TestGetReceivers() { @@ -94,7 +94,7 @@ func (suite *GetServicesTestSuite) TestGetReceivers() { var brokerTests = []requestTest { {"/" + version.GetDiscoveryApiVersion()+"/asapo-broker",http.StatusBadRequest,"protocol missing"}, {"/" + version.GetDiscoveryApiVersion()+"/asapo-broker?protocol=v1000.2",http.StatusUnsupportedMediaType,"wrong protocol"}, - {"/" + version.GetDiscoveryApiVersion()+"/asapo-broker?protocol=v0.2",http.StatusOK,"ok"}, + {"/" + version.GetDiscoveryApiVersion()+"/asapo-broker?protocol=v0.6",http.StatusOK,"ok"}, } func (suite *GetServicesTestSuite) TestGetBroker() { for _,test:= range brokerTests { @@ -131,7 +131,7 @@ func (suite *GetServicesTestSuite) TestGetFts() { logger.MockLog.On("WithFields", mock.Anything) logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("request"))) - w := doRequest("/" + version.GetDiscoveryApiVersion()+"/asapo-file-transfer?protocol=v0.1") + w := doRequest("/" + version.GetDiscoveryApiVersion()+"/asapo-file-transfer?protocol=v0.6") suite.Equal(http.StatusOK, w.Code, "code ok") suite.Equal(w.Body.String(), "ip_fts", "result") diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h index ec90bdcb771017debb0241d10e3a9c971ddb6039..34ca22bb40bd97ae883f6ad4b2919ef7ff1d9bca 100644 --- a/producer/api/c/include/asapo/producer_c.h +++ b/producer/api/c/include/asapo/producer_c.h @@ -31,7 +31,8 @@ enum AsapoOpcode { kOpcodeTransferMetaData, kOpcodeDeleteStream, kOpcodeGetMeta, - kOpcodeCount + kOpcodeCount, + kOpcodePersistStream }; diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 0dfdf60e4355eaa9f03a53315c9cceefe9d70f4e..e3290b4a191e28b17d29d1cbcf0198ff15e68f53 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -111,7 +111,7 @@ class Producer { //! Marks stream finished /*! - \param stream - Name of the stream to makr finished + \param stream - Name of the stream to mark finished \param last_id - ID of the last message in stream \param next_stream - Name of the next stream (empty if not set) \return Error - Will be nullptr on success @@ -119,6 +119,13 @@ class Producer { virtual Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) = 0 ; + //! Marks stream persistent + /*! + \param stream - Name of the stream to be marked persistent + \param timeout_ms - operation timeout in milliseconds + \return Error - Will be nullptr on success + */ + virtual Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) = 0 ; //! Sends beamtime metadata to the receiver /*! diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index 40920cb4a583a94de9592ce7ef7f6f5f9ba3fe64..f478eb184ec4eb1a7d0e1ac2fa81403e830bd3aa 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -26,7 +26,8 @@ extern "C" { kOpcodeTransferMetaData == asapo::Opcode::kOpcodeTransferMetaData&& kOpcodeDeleteStream == asapo::Opcode::kOpcodeDeleteStream&& kOpcodeGetMeta == asapo::Opcode::kOpcodeGetMeta&& - kOpcodeCount == asapo::Opcode::kOpcodeCount, + kOpcodeCount == asapo::Opcode::kOpcodeCount&& + kOpcodePersistStream == asapo::Opcode::kOpcodePersistStream, "incompatible bit reps between c++ and c for asapo::OpCode"); static_assert(kTcp == asapo::RequestHandlerType:: kTcp&& kFilesystem == asapo::RequestHandlerType:: kFilesystem, @@ -109,6 +110,13 @@ extern "C" { auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err); return handle_or_null_t(result, error, std::move(err)); } + int asapo_producer_set_stream_persistent(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + auto err = producer->handle->SetStreamPersistent(stream, timeout_ms); + return process_error(error, std::move(err)); + } int asapo_producer_delete_stream(AsapoProducerHandle producer, const char* stream, uint64_t timeout_ms, diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index abe31f8faa5e19624ed7763e9e1dfe9116a4cef2..1c0a13a8e767fcde98823a6bb326b8dd2d975033 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -240,6 +240,14 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id, return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true); } +Error ProducerImpl::SetStreamPersistent(std::string stream, uint64_t timeout_ms) { + GenericRequestHeader request_header{kOpcodePersistStream, 0, 0, 0, + stream + ".persist", stream}; + Error err; + BlockingRequest(std::move(request_header), timeout_ms, &err); + return err; +} + void ProducerImpl::SetLogLevel(LogLevel level) { log__->SetLogLevel(level); } diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 3e20d453b2747a00ecba4fb6119ed8eed5cc1459..b9182a9833d9b4f708c1ea0d0b48862791ed0961 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -57,6 +57,7 @@ class ProducerImpl : public Producer { RequestCallback callback) override; Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) override; + Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) override; Error DeleteStream(std::string stream, uint64_t timeout_ms, DeleteStreamOptions options) const override; diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 00898e56192f2cda4e36aebfc1f226aa3aac4996..47b30ff30222a4dcb455e69a4d68b1c1f1b338c6 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -120,6 +120,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: void SetRequestsQueueLimits(uint64_t size, uint64_t volume) Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) + Error SetStreamPersistent(string stream, uint64_t timeout_ms) StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) Error GetVersionInfo(string* client_info,string* server_info, bool* supported) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 34670d1df9b1c240260cd44e64bf34edf59283cb..ef9942af1639567b05b649e2ed3b49c09ff3d4bf 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -283,6 +283,24 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) + def set_stream_persistent(self, stream = 'default', uint64_t timeout_ms = 1000): + """ + :param stream: stream name + :type stream: string + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string b_stream = _bytes(stream) + with nogil: + err = self.c_producer.get().SetStreamPersistent(b_stream,timeout_ms) + if err: + throw_exception(err) + def delete_stream(self, stream = 'default', uint64_t timeout_ms = 1000,bool error_on_not_exist = True): """ :param stream: stream name diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index f64ef8866b5c05a30ffcda2440d33764734a85a0..ce93366bdfe80392a8af1bd31bb9369d223567aa 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -23,6 +23,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_secondary_authorization.cpp src/request_handler/authorization_client.cpp src/request_handler/request_handler_db_meta_write.cpp + src/request_handler/request_handler_db_persist_stream.cpp src/request_handler/request_handler_db_stream_info.cpp src/request_handler/request_handler_db_last_stream.cpp src/request_handler/request_handler_receive_metadata.cpp diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 6466d39a040f82cd6f8f60f64f44a0520d5a2959..39e40d9259cf0d1c6222171ab86e9952e5d645b3 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -41,6 +41,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) || (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) || (err = parser.GetString("LogLevel", &log_level)) || + (err = parser.GetUInt64("MaxNumPersistedStreams", &config.max_num_persisted_streams)) || (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled)); if (err) { diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 728f6d81c698f4bfd41e98477f1c943dfd3bf334..0a195650649e04948d03ec865307fb6a6d2c121b 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -30,6 +30,7 @@ struct ReceiverConfig { ReceiverMetricsConfig metrics; std::string discovery_server; KafkaClientConfig kafka_config; + uint64_t max_num_persisted_streams = 0; }; class ReceiverConfigManager { diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index b40207a5b030bc104cf2ed28cb0eb8e0c1754e03..8d6a816cbaa1d5c673228151a4f34561c239e817 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -104,6 +104,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, request->AddHandler(&request_handler_db_get_meta_); break; } + case Opcode::kOpcodePersistStream: { + request->AddHandler(&request_handler_persist_stream_); + break; + } default: return ReceiverErrorTemplates::kInvalidOpCode.Generate(); } diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 75c39aa91c08427175d2154d866feac8380a7291..ca01644421dc08785e81f761eb51c687ddb3e238 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -16,6 +16,7 @@ #include "request_handler_initial_authorization.h" #include "request_handler_secondary_authorization.h" #include "request_handler_db_meta_write.h" +#include "request_handler_db_persist_stream.h" #include "request_handler_receive_data.h" #include "request_handler_receive_metadata.h" #include "request_handler_db_check_request.h" @@ -48,6 +49,7 @@ class RequestFactory { RequestHandlerMonitoring request_handler_monitoring_; RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix}; RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix}; + RequestHandlerDbPersistStream request_handler_persist_stream_{kDBMetaCollectionName}; RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; RequestHandlerDbGetMeta request_handler_db_get_meta_{kDBMetaCollectionName}; diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.cpp b/receiver/src/request_handler/request_handler_db_persist_stream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4aafc2f8b23e4384cf1484d4a2d8eeeef14b472b --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_persist_stream.cpp @@ -0,0 +1,42 @@ +#include "request_handler_db_persist_stream.h" +#include <asapo/json_parser/json_parser.h> +#include <asapo/database/db_error.h> + +#include "../receiver_logger.h" +#include "../receiver_config.h" +#include <sstream> + +namespace asapo { + + RequestHandlerDbPersistStream::RequestHandlerDbPersistStream(std::string collection_name_prefix) : RequestHandlerDb( + std::move(collection_name_prefix)) { + MaxNumPersistedStreams = (int)GetReceiverConfig()->max_num_persisted_streams; + } + + Error RequestHandlerDbPersistStream::ProcessRequest(Request* request) const { + if (auto err = RequestHandlerDb::ProcessRequest(request) ) { + return err; + } + + int stream_num; + auto err = db_client__->GetPersistedStreamsNumber(&stream_num); + + if (err) { + return DBErrorToReceiverError(std::move(err)); + } + + if (stream_num >= MaxNumPersistedStreams) { + return ReceiverErrorTemplates::kBadRequest.Generate("too many persisted streams"); + } + + err = db_client__->PersistStream(request->GetStream()); + + if (err) { + return DBErrorToReceiverError(std::move(err)); + } + + return nullptr; + } + + +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.h b/receiver/src/request_handler/request_handler_db_persist_stream.h new file mode 100644 index 0000000000000000000000000000000000000000..c0f094e151d751cc81de0f78e8f40d40dd6792cc --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_persist_stream.h @@ -0,0 +1,19 @@ +#ifndef ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H +#define ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H + +#include "request_handler_db.h" +#include "../request.h" + +namespace asapo { + +class RequestHandlerDbPersistStream: public RequestHandlerDb { +public: + RequestHandlerDbPersistStream(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; +private: + int MaxNumPersistedStreams; +}; + +} + +#endif //ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 026eeb66891154a9eb1eb3298b9a63a931ce89bf..6ed9dada5d130a7c199f195b08acdbd53f2f6f1f 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -82,6 +82,7 @@ Error SetReceiverConfigWithError (const ReceiverConfig& config, std::string erro config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; + config_string += "," + Key("MaxNumPersistedStreams", error_field) + std::to_string(config.max_num_persisted_streams); config_string += "," + Key("Kafka", error_field) + "{"; config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false") ; config_string += "}"; diff --git a/tests/automatic/broker/check_monitoring/check_linux.sh b/tests/automatic/broker/check_monitoring/check_linux.sh index d0b81c6ee5bfeaaf635fa83be46c8238ba98a7e2..f48c9671cfcb6d83f9677f5aea9c921d45fbe6c9 100644 --- a/tests/automatic/broker/check_monitoring/check_linux.sh +++ b/tests/automatic/broker/check_monitoring/check_linux.sh @@ -17,7 +17,7 @@ Cleanup() { token=$BT_DATA_TOKEN -broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3` +broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6` echo found broker at $broker groupid=`curl -d '' --silent $broker/v0.2/creategroup` diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh index f3cfc8b57ad4ffc548d82a90a618c686e30a03d8..6f50ea116a40be683a75de25dfc07d3d5f0b7b4e 100644 --- a/tests/automatic/broker/get_last/check_linux.sh +++ b/tests/automatic/broker/get_last/check_linux.sh @@ -17,7 +17,7 @@ echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name} token=$BT_DATA_TOKEN -broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3` +broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6` echo found broker at $broker groupid=`curl -d '' --silent $broker/v0.2/creategroup` diff --git a/tests/automatic/broker/get_last/check_windows.bat b/tests/automatic/broker/get_last/check_windows.bat index dacb2771c01cd7bd51af8e337ed1598a792fc70f..2148f3a4abe96aa30071d731562a46fcbc9252f9 100644 --- a/tests/automatic/broker/get_last/check_windows.bat +++ b/tests/automatic/broker/get_last/check_windows.bat @@ -6,7 +6,7 @@ echo db.data_default.insert({"_id":2}) | %mongo_exe% %database_name% || goto :e set token=%BT_DATA_TOKEN% -curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3 > broker +curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6 > broker set /P broker=< broker C:\Curl\curl.exe -d '' --silent %broker%/v0.2/creategroup > groupid diff --git a/tests/automatic/broker/get_meta/check_linux.sh b/tests/automatic/broker/get_meta/check_linux.sh index c1d9e4d45302696d854bf333747a1945f3c1f4d1..75f3ca1daf27faa0d438502af10de9cb01b4de1d 100644 --- a/tests/automatic/broker/get_meta/check_linux.sh +++ b/tests/automatic/broker/get_meta/check_linux.sh @@ -16,7 +16,7 @@ echo 'db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}})' | mongo ${dat token=$BT_TEST_TOKEN -broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3` +broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6` echo found broker at $broker diff --git a/tests/automatic/broker/get_meta/check_windows.bat b/tests/automatic/broker/get_meta/check_windows.bat index 0631edfaec38a1bd5057408d94cd274826d3b378..885986a8adc6dfa29227e83443ab2b305f0d814f 100644 --- a/tests/automatic/broker/get_meta/check_windows.bat +++ b/tests/automatic/broker/get_meta/check_windows.bat @@ -4,7 +4,7 @@ SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" echo db.meta.insert({"_id":"bt","meta":{"data":"test_bt"}}) | %mongo_exe% %database_name% || goto :error echo db.meta.insert({"_id":"st_test","meta":{"data":"test_st"}}) | %mongo_exe% %database_name% || goto :error -curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3 > broker +curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6 > broker set /P broker=< broker set token=%BT_DATA_TOKEN% diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index bcef1d69dbff830086679c027c77828f680dc117..840a0f12d7583792b841b8954309b553daacae3f 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -17,7 +17,7 @@ echo "db.data_${stream}.insert({"_id":1})" | mongo ${database_name} token=$BT_DATA_TOKEN -broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3` +broker=`curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6` echo found broker at $broker diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat index ba8760fa73377ce49e913da2a9f09f683871d393..3ca4ab9208c931a14631a614fdd593518c807613 100644 --- a/tests/automatic/broker/get_next/check_windows.bat +++ b/tests/automatic/broker/get_next/check_windows.bat @@ -6,7 +6,7 @@ echo db.data_default.insert({"_id":2}) | %mongo_exe% %database_name% || goto :e set token=%BT_DATA_TOKEN% -curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.3 > broker +curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6 > broker set /P broker=< broker diff --git a/tests/automatic/common_scripts/start_services.bat b/tests/automatic/common_scripts/start_services.bat index 459fa99a075ae4bc19e2a6cf155524330576be90..32eab2fe62e2cfd8b8a49b8e5b84c4f6d0fea00d 100644 --- a/tests/automatic/common_scripts/start_services.bat +++ b/tests/automatic/common_scripts/start_services.bat @@ -21,9 +21,9 @@ if %started% EQU 0 ( set started=1 ping 192.0.2.1 -n 1 -w 3000 > nul ) -curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | findstr 127.0.0.1 || goto :repeat -curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.1 --stderr - | findstr 127.0.0.1 || goto :repeat -curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-file-transfer?protocol=v0.1 --stderr - | findstr 127.0.0.1 || goto :repeat +curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | findstr 127.0.0.1 || goto :repeat +curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6 --stderr - | findstr 127.0.0.1 || goto :repeat +curl --silent --fail 127.0.0.1:8400/asapo-discovery/v0.1/asapo-file-transfer?protocol=v0.6 --stderr - | findstr 127.0.0.1 || goto :repeat echo services ready diff --git a/tests/automatic/common_scripts/start_services.sh b/tests/automatic/common_scripts/start_services.sh index f321f2cdb2bbc82fa40236b828d3153fd094aecd..19f9e6a2fb5e130cdd4b434bf5143b2ca73598e9 100755 --- a/tests/automatic/common_scripts/start_services.sh +++ b/tests/automatic/common_scripts/start_services.sh @@ -21,15 +21,15 @@ while true do sleep 1 echo starting services - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-broker?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo broker started - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-file-transfer?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-file-transfer?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo file transfer started curl --silent 127.0.0.1:8400/asapo-discovery/asapo-monitoring --stderr - | grep 127.0.0.1 || continue echo monitoring started break done -echo services ready \ No newline at end of file +echo services ready diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh index 8bab44a7dbcffb73650feb0a9443c74d94d7e369..f1969878d26e6c52eabf0debe2a1fb6c42f22d1e 100644 --- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/check_linux.sh @@ -34,7 +34,7 @@ Cleanup() { while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done @@ -65,7 +65,7 @@ if [[ $network_type == "fabric" ]]; then while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done diff --git a/tests/automatic/full_chain/two_beamlines/check_linux.sh b/tests/automatic/full_chain/two_beamlines/check_linux.sh index f2fbb99e0c030d7c7a304a0920d79976614ee460..61271989b8d81888f246bd7b2758f3dc9c3160f9 100644 --- a/tests/automatic/full_chain/two_beamlines/check_linux.sh +++ b/tests/automatic/full_chain/two_beamlines/check_linux.sh @@ -35,7 +35,7 @@ Cleanup() { while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done @@ -64,7 +64,7 @@ if [[ $network_type == "fabric" ]]; then while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done diff --git a/tests/automatic/high_avail/broker_mongo_restart/check_linux.sh b/tests/automatic/high_avail/broker_mongo_restart/check_linux.sh index a3a9895ec010fbec8dd0afe573e7c6950571fca5..ebb05c5bcc2784927ff7df8f842883ac31b1b985 100755 --- a/tests/automatic/high_avail/broker_mongo_restart/check_linux.sh +++ b/tests/automatic/high_avail/broker_mongo_restart/check_linux.sh @@ -41,6 +41,7 @@ function kill_mongo { function start_mongo { + mkdir -p /tmp/mongo mongod --dbpath /tmp/mongo --port $1 --logpath /tmp/mongolog --fork } diff --git a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh index 4d7a281e16dc7281a6dda26d4850ee64719d6fbf..f9ddcf2ba576fcd1c10ac88b86ac62e2cf33a10e 100644 --- a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh +++ b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh @@ -24,6 +24,7 @@ function kill_mongo { function start_mongo { + mkdir -p /tmp/mongo mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork } diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh index 6af384ac67285ab62fff4094197c7bea4bcdf572..119354a5d92c13145ecd225061d586b5cd88a77c 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -21,7 +21,7 @@ Cleanup() { while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done @@ -53,7 +53,7 @@ nomad run -var receiver_kafka_metadata_broker_list="${BOOTSTRAP}" receiver_kafka while true do sleep 1 - curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.6 --stderr - | grep 127.0.0.1 || continue echo recevier started break done diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in index ff23cda05e003c5f20dfee4a716a61743bf8892e..e2c63ebc2004d57df20bbbc1a86c74b2ccb1a971 100644 --- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in index d7bd04f56d72552911b1f5bb46f1fa4993d4a0c3..1a176526551a493214dba2b18e1ea72450cf0b85 100644 --- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : true, diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in index 386a9db3cd48193d1d335f938f0a16272670e99e..d8c372ba31e79587b8d5d8b72e531efa62181c87 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in index de39191ca2c7fb1ed0219d28dd5c47187d811429..fa9150fc001f34b5ea7180d6614184f019aae196 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in @@ -25,6 +25,7 @@ }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false