diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 7a9d2c13ddd66cab74161cca616a3f4dc77c0946..939c7997728b770d1e825870356bd07a47239b4e 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -60,6 +60,11 @@ type LocationPointer struct { Value int `bson:"current_pointer"` } +type PersistedStreamsList struct { + ID int `json:"_id"` + Streams []string `json:"persisted_streams"` +} + const data_collection_name_prefix = "data_" const acks_collection_name_prefix = "acks_" const inprocess_collection_name_prefix = "inprocess_" @@ -71,6 +76,7 @@ const last_message_field_name = "last_message" const no_session_msg = "database client not created" const already_connected_msg = "already connected" +const too_many_persisted_streams = "too many persisted streams" const finish_stream_keyword = "asapo_finish_stream" const no_next_stream_keyword = "asapo_no_next" @@ -969,6 +975,30 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) { return nil, err } +func (db *Mongodb) persistStream(request Request) ([]byte, error) { + if db.client == nil { + return nil, &DBError{utils.StatusServiceUnavailable, no_session_msg} + } + + maxNum, _ := strconv.Atoi(request.ExtraParam) + + c := db.client.Database(request.DbName()).Collection(meta_collection_name) + q := bson.M{"_id": "persist"} + l := PersistedStreamsList{} + err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&l) + if err != nil && err != mongo.ErrNoDocuments { + return nil, err + } + if len(l.Streams) >= maxNum { + return nil, &DBError{utils.StatusWrongInput, too_many_persisted_streams} + } + l.Streams = append(l.Streams, request.Stream) + + _, err = c.InsertOne(context.TODO(), l) + + return nil, err +} + func (db *Mongodb) lastAck(request Request) ([]byte, error) { c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId) opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true) @@ -1098,6 +1128,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { return db.lastAck(request) case "delete_stream": return db.deleteStream(request) + case "persist_stream": + return db.persistStream(request) } return nil, errors.New("Wrong db operation: " + request.Op) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 3c99b1a8a85f5a6dd4599df35d7e80f143df807a..885bb180077d127f4238adc0f026ba2a24d62b9a 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -29,6 +29,12 @@ var listRoutes = utils.Routes{ "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/delete", routeDeleteStream, }, + utils.Route{ + "PersistStream", + "Post", + "/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/persist", + routePersistStream, + }, utils.Route{ "GetLast", "Get", diff --git a/broker/src/asapo_broker/server/post_persist_stream.go b/broker/src/asapo_broker/server/post_persist_stream.go new file mode 100644 index 0000000000000000000000000000000000000000..11bc6f8c974270008f58d0bb84c770f0b3387b84 --- /dev/null +++ b/broker/src/asapo_broker/server/post_persist_stream.go @@ -0,0 +1,11 @@ +package server + +import ( + "net/http" + "strconv" +) + +func routePersistStream(w http.ResponseWriter, r *http.Request) { + maxNum := strconv.Itoa(settings.GetMaxNumPersistedStreams()) + processRequest(w, r, "persist_stream", maxNum, false) +} diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go index 51800b2c592c4e2cfa28d5b5ece809430a83169f..27ab64fb9a9d13b6233e8d28d00be7a1b5411d94 100644 --- a/broker/src/asapo_broker/server/server.go +++ b/broker/src/asapo_broker/server/server.go @@ -10,6 +10,7 @@ import ( const kDefaultresendInterval = 10 const kDefaultStreamCacheUpdateIntervalMs = 100 +const kDefaultMaxNumPersistedStreams = 100 const kDefaultTokenCacheUpdateIntervalMs = 60000 var db database.Agent @@ -20,7 +21,7 @@ type serverSettings struct { PerformanceDbServer string PerformanceDbName string MonitoringServerUrl string - MonitorPerformance bool + MonitorPerformance bool AuthorizationServer string Port int LogLevel string @@ -28,6 +29,7 @@ type serverSettings struct { CheckResendInterval *int StreamCacheUpdateIntervalMs *int TokenCacheUpdateIntervalMs *int + MaxNumPersistedStreams *int } func (s *serverSettings) GetTokenCacheUpdateInterval() int { @@ -59,6 +61,14 @@ func (s *serverSettings) GetDatabaseServer() string { } } +func (s *serverSettings) GetMaxNumPersistedStreams() int { + if s.MaxNumPersistedStreams == nil { + return kDefaultMaxNumPersistedStreams + } else { + return *s.MaxNumPersistedStreams + } +} + var settings serverSettings var statistics serverStatistics var monitoring brokerMonitoring @@ -93,7 +103,7 @@ func InitDB(dbAgent database.Agent) (err error) { } func CreateDiscoveryService() { - discoveryService = discovery.CreateDiscoveryService(&http.Client{},"http://" + settings.DiscoveryServer) + discoveryService = discovery.CreateDiscoveryService(&http.Client{}, "http://"+settings.DiscoveryServer) } func CleanupDB() { diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h index d814a88df4db17d0f9181f46ae5cb0ec6f133da3..548372db34fd179d5a1ce78e67a8f88fea21312e 100644 --- a/common/cpp/include/asapo/common/networking.h +++ b/common/cpp/include/asapo/common/networking.h @@ -28,6 +28,7 @@ enum Opcode : uint8_t { kOpcodeGetBufferData, kOpcodeAuthorize, kOpcodeTransferMetaData, + kOpcodePersistStream, kOpcodeDeleteStream, kOpcodeGetMeta, kOpcodeCount, @@ -43,6 +44,7 @@ inline std::string OpcodeToString(uint8_t code) { case kOpcodeAuthorize:return "authorize"; case kOpcodeTransferMetaData:return "transfer metadata"; case kOpcodeDeleteStream:return "delete stream"; + case kOpcodePersistStream:return "persist stream"; case kOpcodeGetMeta:return "get meta"; default:return "unknown op"; } diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h index 61c32e0dcec5a7867d0f6d9c87aff8fb3846657c..b6acc8cdd3cb2a7965c3d208c6b4d29190474e6e 100644 --- a/common/cpp/include/asapo/database/database.h +++ b/common/cpp/include/asapo/database/database.h @@ -28,6 +28,8 @@ class Database { virtual Error GetLastStream(StreamInfo* info) const = 0; virtual Error DeleteStream(const std::string& stream) const = 0; virtual Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const = 0; + virtual Error GetPersistedStreamsNumber(int* res) const = 0; + virtual Error PersistStream(const std::string& stream_name) const = 0; virtual ~Database() = default; }; diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h index 887653507910802a51fe56d66037dd4ec99db07e..97e733dec4d0caeaab30d02b7fea92fcb67d4da0 100644 --- a/common/cpp/include/asapo/unittests/MockDatabase.h +++ b/common/cpp/include/asapo/unittests/MockDatabase.h @@ -70,6 +70,18 @@ class MockDatabase : public Database { return Error{DeleteStream_t(stream)}; } + MOCK_METHOD(ErrorInterface *, GetPersistedStreamsNumber_t, (int*), (const)); + + Error GetPersistedStreamsNumber(int* res) const override { + return Error{GetPersistedStreamsNumber_t(res)}; + } + + MOCK_METHOD(ErrorInterface *, PersistStream_t, (const std::string&), (const)); + + Error PersistStream(const std::string& stream_name) const override { + return Error{PersistStream_t(stream_name)}; + } + MOCK_METHOD(ErrorInterface *, GetLastStream_t, (StreamInfo*), (const)); diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index bfb0d8ffbb6cf083740edbe149962206840251b9..c15dda36068bbb29866111bc28855998d9dc061b 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -819,6 +819,7 @@ Error MongoDBClient::DeleteStream(const std::string& stream) const { DeleteDocumentsInCollection("current_location", querystr); } DeleteDocumentsInCollection("meta", "^" + EscapeQuery(stream_encoded) + "$"); + ModifyPersistedStreamsList(stream, false); return err; } @@ -837,4 +838,56 @@ Error MongoDBClient::GetMetaFromDb(const std::string& collection, const std::str return nullptr; } +Error MongoDBClient::GetPersistedStreamsNumber(int* res) const { + UpdateCurrentCollectionIfNeeded("meta"); + //db.meta.aggregate([{$match: {"_id":"persist"}},{$project: {size: {$size: "$persisted_streams"}}}]) + auto pipeline = BCON_NEW("pipeline", "[", + "{", "$match", "{", "_id", "persist", "}", "}", + "{", "$project", "{", "size", "{", "$size", "$persisted_streams", "}", "}", "}", + "]"); + auto cursor = mongoc_collection_aggregate(current_collection_, MONGOC_QUERY_NONE, pipeline, NULL, NULL); + *res = 0; + const bson_t* doc; + bson_iter_t iter; + while (mongoc_cursor_next(cursor, &doc)) { + if (bson_iter_init_find(&iter, doc, "size") && BSON_ITER_HOLDS_INT32 (&iter)) { + *res = bson_iter_int32(&iter); + } + } + + Error err = nullptr; + bson_error_t mongo_err; + if (mongoc_cursor_error(cursor, &mongo_err)) { + err = DBErrorTemplates::kDBError.Generate(mongo_err.message); + } + + mongoc_cursor_destroy(cursor); + bson_destroy(pipeline); + + return err; +} + +Error MongoDBClient::PersistStream(const std::string& stream_name) const { + return ModifyPersistedStreamsList(stream_name, true); +} + +Error MongoDBClient::ModifyPersistedStreamsList(const std::string& stream_name, bool add) const { + UpdateCurrentCollectionIfNeeded("meta"); + //db.meta.updateOne({"_id" : "persist"},{$push : {"persisted_streams" : "default"}},{"upsert": true}) + bson_t* selector = BCON_NEW ("_id", "persist"); + bson_t* update = BCON_NEW (add ? "$addToSet" : "$pull", "{", "persisted_streams", BCON_UTF8(stream_name.c_str()), "}"); + bson_t* opts = BCON_NEW ("upsert", BCON_BOOL(add)); + + bson_error_t mongo_err; + Error err = nullptr; + if (!mongoc_collection_update_one(current_collection_, selector, update, opts, NULL, &mongo_err)) { + err = DBErrorTemplates::kInsertError.Generate(mongo_err.message); + } + bson_destroy(selector); + bson_destroy(update); + bson_destroy(opts); + + return err; +} + } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 90fbe73a6b4a6bd60ced7236964001557fe86393..d2a0621f9e382c9540f0c1cf3f7dc76986055616 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -59,6 +59,8 @@ class MongoDBClient final : public Database { Error GetLastStream(StreamInfo* info) const override; Error DeleteStream(const std::string& stream) const override; Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override; + Error GetPersistedStreamsNumber(int* res) const override; + Error PersistStream(const std::string& stream_name) const override; Error GetNextId(const std::string& collection, uint64_t* id) const; ~MongoDBClient() override; private: @@ -86,6 +88,7 @@ class MongoDBClient final : public Database { Error DeleteCollections(const std::string& prefix) const; Error DeleteDocumentsInCollection(const std::string& collection_name, const std::string& querystr) const; Error InsertWithAutoId(const MessageMeta& file, uint64_t* id_inserted) const; + Error ModifyPersistedStreamsList(const std::string& stream_name, bool add) const; }; struct TransactionContext { diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index 80c1f841bcf17b09888fcd49a6f6208f3af2810e..1c12207cb4afd0bc5dfe8efef4b1ba2a06c4e27c 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -116,6 +116,9 @@ int asapo_consumer_delete_stream(AsapoConsumerHandle consumer, const char* stream, AsapoBool delete_meta, AsapoBool error_on_not_exist, AsapoErrorHandle* error); +int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer, + const char* stream, + AsapoErrorHandle* error); int64_t asapo_consumer_get_current_size(AsapoConsumerHandle consumer, const char* stream, AsapoErrorHandle* error); diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h index 2527ca21c2058c32bfec3afed4c64292fe481b11..2715705605dfa8665d7ba98612dce8de528ed073 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer.h @@ -101,6 +101,12 @@ class Consumer { */ virtual Error DeleteStream(std::string stream, DeleteStreamOptions options) = 0; + //! Marks stream persistent + /*! + \param stream - Name of the stream to be marked persistent + \return Error - Will be nullptr on success + */ + virtual Error SetStreamPersistent(std::string stream) = 0 ; //! Get current number of messages in stream /*! diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 1fb7f6dee01f71bc3ecf89b594d8b45b228c7072..1374b97ace0b1a2233c180d343975366ce399634 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -286,6 +286,18 @@ extern "C" { auto err = consumer->handle->DeleteStream(stream, opt); return process_error(error, std::move(err)); } +//! wraps asapo::Consumer::SetStreamsPersistent() +/// \copydoc asapo::Consumer::SetStreamsPersistent() +/// \param[in] consumer the consumer that is acted upon +/// \param[in] stream the name of the stream to be persisted +/// \param[out] error NULL or pointer to error handle to be set +/// \return 0 if completed successfully, -1 in case of error. +int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer, + const char* stream, + AsapoErrorHandle* error) { + auto err = consumer->handle->SetStreamPersistent(stream); + return process_error(error, std::move(err)); + } //! wraps asapo::Consumer::GetCurrentSize() /// \copydoc asapo::Consumer::GetCurrentSize() /// \param[in] consumer the consumer that is acted upon diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp index 76f536cbff432ae4d78b6f493b970b357b526587..a6cfacf60ff6c718ba8785f48d207b60cea85b0f 100644 --- a/consumer/api/cpp/src/consumer_impl.cpp +++ b/consumer/api/cpp/src/consumer_impl.cpp @@ -1060,6 +1060,15 @@ Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options return err; } +Error ConsumerImpl::SetStreamPersistent(std::string stream) { + RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "persist"); + ri.post = true; + Error err; + BrokerRequestWithTimeout(ri, &err); + return err; +} + + RequestInfo ConsumerImpl::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const { auto stream_encoded = httpclient__->UrlEscape(std::move(stream)); auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : ""; diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h index 6928e7cf2631971a5dc250f1c4001cb2f956c048..8ac94d281f337bbc1ef910ecaaa35b9810e1a5d6 100644 --- a/consumer/api/cpp/src/consumer_impl.h +++ b/consumer/api/cpp/src/consumer_impl.h @@ -89,6 +89,7 @@ class ConsumerImpl final : public asapo::Consumer { Error GetVersionInfo(std::string* client_info, std::string* server_info, bool* supported) override; Error DeleteStream(std::string stream, DeleteStreamOptions options) override; + Error SetStreamPersistent(std::string stream) override; void SetTimeout(uint64_t timeout_ms) override; void ForceNoRdma() override; Error DisableMonitoring(bool disable) override; diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd index 140e8950eef5ed830f4a9e218792baa059809a92..1715bf9930b84cccaf8f337541ad0ffca68f5644 100644 --- a/consumer/api/python/asapo_consumer.pxd +++ b/consumer/api/python/asapo_consumer.pxd @@ -98,6 +98,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: void InterruptCurrentOperation() Error GetVersionInfo(string* client_info,string* server_info, bool* supported) Error DeleteStream(string stream, DeleteStreamOptions options) + Error SetStreamPersistent(string stream) cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil: cdef cppclass ConsumerFactory: diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in index de490719b58ed31df7d570ad3b226b060498c57d..cee15bdbdce9d141c8b88696646ae459ac822690 100644 --- a/consumer/api/python/asapo_consumer.pyx.in +++ b/consumer/api/python/asapo_consumer.pyx.in @@ -328,6 +328,22 @@ cdef class PyConsumer: if err: throw_exception(err) + def set_stream_persistent(self, stream = 'default'): + """ + :param stream: stream name + :type stream: string + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string b_stream = _bytes(stream) + with nogil: + err = self.c_consumer.get().SetStreamPersistent(b_stream) + if err: + throw_exception(err) + def get_unacknowledged_messages(self, group_id, uint64_t from_id = 0, uint64_t to_id = 0, stream = "default"): cdef Error err cdef string b_group_id = _bytes(group_id) diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json index 7ae6db9d0e5f29539e8f706f8a5c7e248e9772ed..fdb6b9d2fb71bd233df91baddcb7ab6ef5de3403 100644 --- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json +++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json @@ -25,6 +25,7 @@ "Tag": "receiver", "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }}, "LogLevel": "info", + "MaxNumPersistedStreams": 100, "Kafka": { "Enabled": true, "KafkaClient": { diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl index 5ca82a26dc5c0f4d6861d7b9c5fc9fed58e19f0f..5997fe677e07a33052a7211c299ca26ac76cb8f5 100644 --- a/deploy/asapo_services/scripts/receiver.json.tpl +++ b/deploy/asapo_services/scripts/receiver.json.tpl @@ -26,6 +26,7 @@ "Tag": "{{ env "attr.unique.hostname" }}", "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }}, "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}", + "MaxNumPersistedStreams": 100, "Kafka": { "Enabled": {{ env "NOMAD_META_receiver_kafka_enabled" }}, "KafkaClient": { diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h index ec90bdcb771017debb0241d10e3a9c971ddb6039..02f62eb1bbce1f491a8c3b88f2a5cebc587c40cf 100644 --- a/producer/api/c/include/asapo/producer_c.h +++ b/producer/api/c/include/asapo/producer_c.h @@ -29,6 +29,7 @@ enum AsapoOpcode { kOpcodeGetBufferData, kOpcodeAuthorize, kOpcodeTransferMetaData, + kOpcodePersistStream, kOpcodeDeleteStream, kOpcodeGetMeta, kOpcodeCount diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h index 0dfdf60e4355eaa9f03a53315c9cceefe9d70f4e..e3290b4a191e28b17d29d1cbcf0198ff15e68f53 100644 --- a/producer/api/cpp/include/asapo/producer/producer.h +++ b/producer/api/cpp/include/asapo/producer/producer.h @@ -111,7 +111,7 @@ class Producer { //! Marks stream finished /*! - \param stream - Name of the stream to makr finished + \param stream - Name of the stream to mark finished \param last_id - ID of the last message in stream \param next_stream - Name of the next stream (empty if not set) \return Error - Will be nullptr on success @@ -119,6 +119,13 @@ class Producer { virtual Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) = 0 ; + //! Marks stream persistent + /*! + \param stream - Name of the stream to be marked persistent + \param timeout_ms - operation timeout in milliseconds + \return Error - Will be nullptr on success + */ + virtual Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) = 0 ; //! Sends beamtime metadata to the receiver /*! diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index 40920cb4a583a94de9592ce7ef7f6f5f9ba3fe64..ba65c92268835301f25218dc6965554a458a60a8 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -24,6 +24,7 @@ extern "C" { kOpcodeGetBufferData == asapo::Opcode::kOpcodeGetBufferData&& kOpcodeAuthorize == asapo::Opcode::kOpcodeAuthorize&& kOpcodeTransferMetaData == asapo::Opcode::kOpcodeTransferMetaData&& + kOpcodePersistStream == asapo::Opcode::kOpcodePersistStream&& kOpcodeDeleteStream == asapo::Opcode::kOpcodeDeleteStream&& kOpcodeGetMeta == asapo::Opcode::kOpcodeGetMeta&& kOpcodeCount == asapo::Opcode::kOpcodeCount, @@ -109,6 +110,13 @@ extern "C" { auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err); return handle_or_null_t(result, error, std::move(err)); } + int asapo_producer_set_stream_persistent(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + auto err = producer->handle->SetStreamPersistent(stream, timeout_ms); + return process_error(error, std::move(err)); + } int asapo_producer_delete_stream(AsapoProducerHandle producer, const char* stream, uint64_t timeout_ms, diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp index abe31f8faa5e19624ed7763e9e1dfe9116a4cef2..1c0a13a8e767fcde98823a6bb326b8dd2d975033 100644 --- a/producer/api/cpp/src/producer_impl.cpp +++ b/producer/api/cpp/src/producer_impl.cpp @@ -240,6 +240,14 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id, return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true); } +Error ProducerImpl::SetStreamPersistent(std::string stream, uint64_t timeout_ms) { + GenericRequestHeader request_header{kOpcodePersistStream, 0, 0, 0, + stream + ".persist", stream}; + Error err; + BlockingRequest(std::move(request_header), timeout_ms, &err); + return err; +} + void ProducerImpl::SetLogLevel(LogLevel level) { log__->SetLogLevel(level); } diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 3e20d453b2747a00ecba4fb6119ed8eed5cc1459..b9182a9833d9b4f708c1ea0d0b48862791ed0961 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -57,6 +57,7 @@ class ProducerImpl : public Producer { RequestCallback callback) override; Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream, RequestCallback callback) override; + Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) override; Error DeleteStream(std::string stream, uint64_t timeout_ms, DeleteStreamOptions options) const override; diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd index 00898e56192f2cda4e36aebfc1f226aa3aac4996..47b30ff30222a4dcb455e69a4d68b1c1f1b338c6 100644 --- a/producer/api/python/asapo_producer.pxd +++ b/producer/api/python/asapo_producer.pxd @@ -120,6 +120,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil: void SetRequestsQueueLimits(uint64_t size, uint64_t volume) Error WaitRequestsFinished(uint64_t timeout_ms) Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback) + Error SetStreamPersistent(string stream, uint64_t timeout_ms) StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err) StreamInfo GetLastStream(uint64_t timeout_ms, Error* err) Error GetVersionInfo(string* client_info,string* server_info, bool* supported) diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in index 34670d1df9b1c240260cd44e64bf34edf59283cb..ef9942af1639567b05b649e2ed3b49c09ff3d4bf 100644 --- a/producer/api/python/asapo_producer.pyx.in +++ b/producer/api/python/asapo_producer.pyx.in @@ -283,6 +283,24 @@ cdef class PyProducer: if callback != None: Py_XINCREF(<PyObject*>callback) + def set_stream_persistent(self, stream = 'default', uint64_t timeout_ms = 1000): + """ + :param stream: stream name + :type stream: string + :param timeout_ms: timeout in milliseconds + :type timeout_ms: int + :raises: + AsapoWrongInputError: wrong input (authorization, ...) + AsapoTimeoutError: request not finished for a given timeout + AsapoProducerError: other errors + """ + cdef Error err + cdef string b_stream = _bytes(stream) + with nogil: + err = self.c_producer.get().SetStreamPersistent(b_stream,timeout_ms) + if err: + throw_exception(err) + def delete_stream(self, stream = 'default', uint64_t timeout_ms = 1000,bool error_on_not_exist = True): """ :param stream: stream name diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index f64ef8866b5c05a30ffcda2440d33764734a85a0..ce93366bdfe80392a8af1bd31bb9369d223567aa 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -23,6 +23,7 @@ set(RECEIVER_CORE_FILES src/request_handler/request_handler_secondary_authorization.cpp src/request_handler/authorization_client.cpp src/request_handler/request_handler_db_meta_write.cpp + src/request_handler/request_handler_db_persist_stream.cpp src/request_handler/request_handler_db_stream_info.cpp src/request_handler/request_handler_db_last_stream.cpp src/request_handler/request_handler_receive_metadata.cpp diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index 6466d39a040f82cd6f8f60f64f44a0520d5a2959..39e40d9259cf0d1c6222171ab86e9952e5d645b3 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -41,6 +41,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) || (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) || (err = parser.GetString("LogLevel", &log_level)) || + (err = parser.GetUInt64("MaxNumPersistedStreams", &config.max_num_persisted_streams)) || (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled)); if (err) { diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 728f6d81c698f4bfd41e98477f1c943dfd3bf334..654133b48b425296c3d17d525bd4f57601db5eaa 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -30,6 +30,7 @@ struct ReceiverConfig { ReceiverMetricsConfig metrics; std::string discovery_server; KafkaClientConfig kafka_config; + uint64_t max_num_persisted_streams; }; class ReceiverConfigManager { diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp index b40207a5b030bc104cf2ed28cb0eb8e0c1754e03..2b32c2992f556e8c309d54fa5cb349ef53e91c40 100644 --- a/receiver/src/request_handler/request_factory.cpp +++ b/receiver/src/request_handler/request_factory.cpp @@ -96,6 +96,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request, request->AddHandler(&request_handler_delete_stream_); break; } + case Opcode::kOpcodePersistStream: { + request->AddHandler(&request_handler_persist_stream_); + break; + } case Opcode::kOpcodeLastStream: { request->AddHandler(&request_handler_db_last_stream_); break; diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h index 75c39aa91c08427175d2154d866feac8380a7291..ca01644421dc08785e81f761eb51c687ddb3e238 100644 --- a/receiver/src/request_handler/request_factory.h +++ b/receiver/src/request_handler/request_factory.h @@ -16,6 +16,7 @@ #include "request_handler_initial_authorization.h" #include "request_handler_secondary_authorization.h" #include "request_handler_db_meta_write.h" +#include "request_handler_db_persist_stream.h" #include "request_handler_receive_data.h" #include "request_handler_receive_metadata.h" #include "request_handler_db_check_request.h" @@ -48,6 +49,7 @@ class RequestFactory { RequestHandlerMonitoring request_handler_monitoring_; RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix}; RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix}; + RequestHandlerDbPersistStream request_handler_persist_stream_{kDBMetaCollectionName}; RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix}; RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName}; RequestHandlerDbGetMeta request_handler_db_get_meta_{kDBMetaCollectionName}; diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.cpp b/receiver/src/request_handler/request_handler_db_persist_stream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4aafc2f8b23e4384cf1484d4a2d8eeeef14b472b --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_persist_stream.cpp @@ -0,0 +1,42 @@ +#include "request_handler_db_persist_stream.h" +#include <asapo/json_parser/json_parser.h> +#include <asapo/database/db_error.h> + +#include "../receiver_logger.h" +#include "../receiver_config.h" +#include <sstream> + +namespace asapo { + + RequestHandlerDbPersistStream::RequestHandlerDbPersistStream(std::string collection_name_prefix) : RequestHandlerDb( + std::move(collection_name_prefix)) { + MaxNumPersistedStreams = (int)GetReceiverConfig()->max_num_persisted_streams; + } + + Error RequestHandlerDbPersistStream::ProcessRequest(Request* request) const { + if (auto err = RequestHandlerDb::ProcessRequest(request) ) { + return err; + } + + int stream_num; + auto err = db_client__->GetPersistedStreamsNumber(&stream_num); + + if (err) { + return DBErrorToReceiverError(std::move(err)); + } + + if (stream_num >= MaxNumPersistedStreams) { + return ReceiverErrorTemplates::kBadRequest.Generate("too many persisted streams"); + } + + err = db_client__->PersistStream(request->GetStream()); + + if (err) { + return DBErrorToReceiverError(std::move(err)); + } + + return nullptr; + } + + +} \ No newline at end of file diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.h b/receiver/src/request_handler/request_handler_db_persist_stream.h new file mode 100644 index 0000000000000000000000000000000000000000..c0f094e151d751cc81de0f78e8f40d40dd6792cc --- /dev/null +++ b/receiver/src/request_handler/request_handler_db_persist_stream.h @@ -0,0 +1,19 @@ +#ifndef ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H +#define ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H + +#include "request_handler_db.h" +#include "../request.h" + +namespace asapo { + +class RequestHandlerDbPersistStream: public RequestHandlerDb { +public: + RequestHandlerDbPersistStream(std::string collection_name_prefix); + Error ProcessRequest(Request* request) const override; +private: + int MaxNumPersistedStreams; +}; + +} + +#endif //ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 026eeb66891154a9eb1eb3298b9a63a931ce89bf..64b531bacfa72705596f79b12f545fe291a91285 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -82,6 +82,7 @@ Error SetReceiverConfigWithError (const ReceiverConfig& config, std::string erro config_string += "," + Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\""; config_string += "," + Key("LogLevel", error_field) + "\"" + log_level + "\""; config_string += "," + Key("Tag", error_field) + "\"" + config.tag + "\""; + config_string += "," + Key("MaxNumPersistedStreams", error_field) + "\"" + std::to_string(config.max_num_persisted_streams) + "\""; config_string += "," + Key("Kafka", error_field) + "{"; config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false") ; config_string += "}"; diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in index ff23cda05e003c5f20dfee4a716a61743bf8892e..e2c63ebc2004d57df20bbbc1a86c74b2ccb1a971 100644 --- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in index d7bd04f56d72552911b1f5bb46f1fa4993d4a0c3..1a176526551a493214dba2b18e1ea72450cf0b85 100644 --- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : true, diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in index 386a9db3cd48193d1d335f938f0a16272670e99e..d8c372ba31e79587b8d5d8b72e531efa62181c87 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in @@ -25,6 +25,7 @@ "ListenPort": {{ env "NOMAD_PORT_recv" }}, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in index de39191ca2c7fb1ed0219d28dd5c47187d811429..fa9150fc001f34b5ea7180d6614184f019aae196 100644 --- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in +++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in @@ -25,6 +25,7 @@ }, "Tag": "{{ env "NOMAD_ADDR_recv" }}", "ReceiveToDiskThresholdMB":50, + "MaxNumPersistedStreams":100, "LogLevel" : "debug", "Kafka" : { "Enabled" : false