From 6598cbc92d8ef814cc2c77dfbaff5bf37ae34e58 Mon Sep 17 00:00:00 2001
From: George Sedov <radist.morse@gmail.com>
Date: Wed, 10 Aug 2022 20:00:15 +0200
Subject: [PATCH] first draft of persisted streams

---
 broker/src/asapo_broker/database/mongodb.go   | 32 +++++++++++
 broker/src/asapo_broker/server/listroutes.go  |  6 +++
 .../server/post_persist_stream.go             | 11 ++++
 broker/src/asapo_broker/server/server.go      | 14 ++++-
 common/cpp/include/asapo/common/networking.h  |  2 +
 common/cpp/include/asapo/database/database.h  |  2 +
 .../include/asapo/unittests/MockDatabase.h    | 12 +++++
 common/cpp/src/database/mongodb_client.cpp    | 53 +++++++++++++++++++
 common/cpp/src/database/mongodb_client.h      |  3 ++
 consumer/api/c/include/asapo/consumer_c.h     |  3 ++
 .../api/cpp/include/asapo/consumer/consumer.h |  6 +++
 consumer/api/cpp/src/consumer_c_glue.cpp      | 12 +++++
 consumer/api/cpp/src/consumer_impl.cpp        |  9 ++++
 consumer/api/cpp/src/consumer_impl.h          |  1 +
 consumer/api/python/asapo_consumer.pxd        |  1 +
 consumer/api/python/asapo_consumer.pyx.in     | 16 ++++++
 .../asapo/configs/asapo-receiver.json         |  1 +
 .../asapo_services/scripts/receiver.json.tpl  |  1 +
 producer/api/c/include/asapo/producer_c.h     |  1 +
 .../api/cpp/include/asapo/producer/producer.h |  9 +++-
 producer/api/cpp/src/producer_c_glue.cpp      |  8 +++
 producer/api/cpp/src/producer_impl.cpp        |  8 +++
 producer/api/cpp/src/producer_impl.h          |  1 +
 producer/api/python/asapo_producer.pxd        |  1 +
 producer/api/python/asapo_producer.pyx.in     | 18 +++++++
 receiver/CMakeLists.txt                       |  1 +
 receiver/src/receiver_config.cpp              |  1 +
 receiver/src/receiver_config.h                |  1 +
 .../src/request_handler/request_factory.cpp   |  4 ++
 .../src/request_handler/request_factory.h     |  2 +
 .../request_handler_db_persist_stream.cpp     | 42 +++++++++++++++
 .../request_handler_db_persist_stream.h       | 19 +++++++
 receiver/unittests/mock_receiver_config.cpp   |  1 +
 .../settings/receiver_fabric.json.tpl.lin.in  |  1 +
 .../settings/receiver_kafka.json.tpl.lin.in   |  1 +
 .../settings/receiver_tcp.json.tpl.lin.in     |  1 +
 .../settings/receiver_tcp.json.tpl.win.in     |  1 +
 37 files changed, 303 insertions(+), 3 deletions(-)
 create mode 100644 broker/src/asapo_broker/server/post_persist_stream.go
 create mode 100644 receiver/src/request_handler/request_handler_db_persist_stream.cpp
 create mode 100644 receiver/src/request_handler/request_handler_db_persist_stream.h

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 7a9d2c13d..939c79977 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -60,6 +60,11 @@ type LocationPointer struct {
 	Value   int    `bson:"current_pointer"`
 }
 
+type PersistedStreamsList struct {
+	ID      int      `json:"_id"`
+	Streams []string `json:"persisted_streams"`
+}
+
 const data_collection_name_prefix = "data_"
 const acks_collection_name_prefix = "acks_"
 const inprocess_collection_name_prefix = "inprocess_"
@@ -71,6 +76,7 @@ const last_message_field_name = "last_message"
 
 const no_session_msg = "database client not created"
 const already_connected_msg = "already connected"
+const too_many_persisted_streams = "too many persisted streams"
 
 const finish_stream_keyword = "asapo_finish_stream"
 const no_next_stream_keyword = "asapo_no_next"
@@ -969,6 +975,30 @@ func (db *Mongodb) deleteStream(request Request) ([]byte, error) {
 	return nil, err
 }
 
+func (db *Mongodb) persistStream(request Request) ([]byte, error) {
+	if db.client == nil {
+		return nil, &DBError{utils.StatusServiceUnavailable, no_session_msg}
+	}
+
+	maxNum, _ := strconv.Atoi(request.ExtraParam)
+
+	c := db.client.Database(request.DbName()).Collection(meta_collection_name)
+	q := bson.M{"_id": "persist"}
+	l := PersistedStreamsList{}
+	err := c.FindOne(context.TODO(), q, options.FindOne()).Decode(&l)
+	if err != nil && err != mongo.ErrNoDocuments {
+		return nil, err
+	}
+	if len(l.Streams) >= maxNum {
+		return nil, &DBError{utils.StatusWrongInput, too_many_persisted_streams}
+	}
+	l.Streams = append(l.Streams, request.Stream)
+
+	_, err = c.InsertOne(context.TODO(), l)
+
+	return nil, err
+}
+
 func (db *Mongodb) lastAck(request Request) ([]byte, error) {
 	c := db.client.Database(request.DbName()).Collection(acks_collection_name_prefix + request.Stream + "_" + request.GroupId)
 	opts := options.FindOne().SetSort(bson.M{"_id": -1}).SetReturnKey(true)
@@ -1098,6 +1128,8 @@ func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) {
 		return db.lastAck(request)
 	case "delete_stream":
 		return db.deleteStream(request)
+	case "persist_stream":
+		return db.persistStream(request)
 	}
 
 	return nil, errors.New("Wrong db operation: " + request.Op)
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index 3c99b1a8a..885bb1800 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -29,6 +29,12 @@ var listRoutes = utils.Routes{
 		"/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/delete",
 		routeDeleteStream,
 	},
+	utils.Route{
+		"PersistStream",
+		"Post",
+		"/{apiver}/beamtime/{beamtime}/{datasource}/{stream}/persist",
+		routePersistStream,
+	},
 	utils.Route{
 		"GetLast",
 		"Get",
diff --git a/broker/src/asapo_broker/server/post_persist_stream.go b/broker/src/asapo_broker/server/post_persist_stream.go
new file mode 100644
index 000000000..11bc6f8c9
--- /dev/null
+++ b/broker/src/asapo_broker/server/post_persist_stream.go
@@ -0,0 +1,11 @@
+package server
+
+import (
+	"net/http"
+	"strconv"
+)
+
+func routePersistStream(w http.ResponseWriter, r *http.Request) {
+	maxNum := strconv.Itoa(settings.GetMaxNumPersistedStreams())
+	processRequest(w, r, "persist_stream", maxNum, false)
+}
diff --git a/broker/src/asapo_broker/server/server.go b/broker/src/asapo_broker/server/server.go
index 51800b2c5..27ab64fb9 100644
--- a/broker/src/asapo_broker/server/server.go
+++ b/broker/src/asapo_broker/server/server.go
@@ -10,6 +10,7 @@ import (
 
 const kDefaultresendInterval = 10
 const kDefaultStreamCacheUpdateIntervalMs = 100
+const kDefaultMaxNumPersistedStreams = 100
 const kDefaultTokenCacheUpdateIntervalMs = 60000
 
 var db database.Agent
@@ -20,7 +21,7 @@ type serverSettings struct {
 	PerformanceDbServer         string
 	PerformanceDbName           string
 	MonitoringServerUrl         string
-	MonitorPerformance 			bool
+	MonitorPerformance          bool
 	AuthorizationServer         string
 	Port                        int
 	LogLevel                    string
@@ -28,6 +29,7 @@ type serverSettings struct {
 	CheckResendInterval         *int
 	StreamCacheUpdateIntervalMs *int
 	TokenCacheUpdateIntervalMs  *int
+	MaxNumPersistedStreams      *int
 }
 
 func (s *serverSettings) GetTokenCacheUpdateInterval() int {
@@ -59,6 +61,14 @@ func (s *serverSettings) GetDatabaseServer() string {
 	}
 }
 
+func (s *serverSettings) GetMaxNumPersistedStreams() int {
+	if s.MaxNumPersistedStreams == nil {
+		return kDefaultMaxNumPersistedStreams
+	} else {
+		return *s.MaxNumPersistedStreams
+	}
+}
+
 var settings serverSettings
 var statistics serverStatistics
 var monitoring brokerMonitoring
@@ -93,7 +103,7 @@ func InitDB(dbAgent database.Agent) (err error) {
 }
 
 func CreateDiscoveryService() {
-	discoveryService = discovery.CreateDiscoveryService(&http.Client{},"http://" + settings.DiscoveryServer)
+	discoveryService = discovery.CreateDiscoveryService(&http.Client{}, "http://"+settings.DiscoveryServer)
 }
 
 func CleanupDB() {
diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h
index d814a88df..548372db3 100644
--- a/common/cpp/include/asapo/common/networking.h
+++ b/common/cpp/include/asapo/common/networking.h
@@ -28,6 +28,7 @@ enum Opcode : uint8_t {
   kOpcodeGetBufferData,
   kOpcodeAuthorize,
   kOpcodeTransferMetaData,
+  kOpcodePersistStream,
   kOpcodeDeleteStream,
   kOpcodeGetMeta,
   kOpcodeCount,
@@ -43,6 +44,7 @@ inline std::string OpcodeToString(uint8_t code) {
         case kOpcodeAuthorize:return "authorize";
         case kOpcodeTransferMetaData:return "transfer metadata";
         case kOpcodeDeleteStream:return "delete stream";
+        case kOpcodePersistStream:return "persist stream";
         case kOpcodeGetMeta:return "get meta";
         default:return "unknown op";
     }
diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h
index 61c32e0dc..b6acc8cdd 100644
--- a/common/cpp/include/asapo/database/database.h
+++ b/common/cpp/include/asapo/database/database.h
@@ -28,6 +28,8 @@ class Database {
     virtual Error GetLastStream(StreamInfo* info) const  = 0;
     virtual Error DeleteStream(const std::string& stream) const = 0;
     virtual Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const = 0;
+    virtual Error GetPersistedStreamsNumber(int* res) const = 0;
+    virtual Error PersistStream(const std::string& stream_name) const = 0;
 
     virtual ~Database() = default;
 };
diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h
index 887653507..97e733dec 100644
--- a/common/cpp/include/asapo/unittests/MockDatabase.h
+++ b/common/cpp/include/asapo/unittests/MockDatabase.h
@@ -70,6 +70,18 @@ class MockDatabase : public Database {
         return Error{DeleteStream_t(stream)};
     }
 
+    MOCK_METHOD(ErrorInterface *, GetPersistedStreamsNumber_t, (int*), (const));
+
+    Error GetPersistedStreamsNumber(int* res) const override {
+        return Error{GetPersistedStreamsNumber_t(res)};
+    }
+
+    MOCK_METHOD(ErrorInterface *, PersistStream_t, (const std::string&), (const));
+
+    Error PersistStream(const std::string& stream_name) const override {
+        return Error{PersistStream_t(stream_name)};
+    }
+
     MOCK_METHOD(ErrorInterface *, GetLastStream_t, (StreamInfo*), (const));
 
 
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index bfb0d8ffb..c15dda360 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -819,6 +819,7 @@ Error MongoDBClient::DeleteStream(const std::string& stream) const {
         DeleteDocumentsInCollection("current_location", querystr);
     }
     DeleteDocumentsInCollection("meta", "^" + EscapeQuery(stream_encoded) + "$");
+    ModifyPersistedStreamsList(stream, false);
     return err;
 }
 
@@ -837,4 +838,56 @@ Error MongoDBClient::GetMetaFromDb(const std::string& collection, const std::str
     return nullptr;
 }
 
+Error MongoDBClient::GetPersistedStreamsNumber(int* res) const {
+    UpdateCurrentCollectionIfNeeded("meta");
+    //db.meta.aggregate([{$match: {"_id":"persist"}},{$project: {size: {$size: "$persisted_streams"}}}])
+    auto pipeline = BCON_NEW("pipeline", "[",
+                                              "{", "$match", "{", "_id", "persist", "}", "}",
+                                              "{", "$project", "{", "size", "{", "$size", "$persisted_streams", "}", "}", "}",
+                                         "]");
+    auto cursor = mongoc_collection_aggregate(current_collection_, MONGOC_QUERY_NONE, pipeline, NULL, NULL);
+    *res = 0;
+    const bson_t* doc;
+    bson_iter_t iter;
+    while (mongoc_cursor_next(cursor, &doc)) {
+        if (bson_iter_init_find(&iter, doc, "size") && BSON_ITER_HOLDS_INT32 (&iter)) {
+            *res = bson_iter_int32(&iter);
+        }
+    }
+
+    Error err = nullptr;
+    bson_error_t mongo_err;
+    if (mongoc_cursor_error(cursor, &mongo_err)) {
+        err = DBErrorTemplates::kDBError.Generate(mongo_err.message);
+    }
+
+    mongoc_cursor_destroy(cursor);
+    bson_destroy(pipeline);
+
+    return err;
+}
+
+Error MongoDBClient::PersistStream(const std::string& stream_name) const {
+    return ModifyPersistedStreamsList(stream_name, true);
+}
+
+Error MongoDBClient::ModifyPersistedStreamsList(const std::string& stream_name, bool add) const {
+    UpdateCurrentCollectionIfNeeded("meta");
+    //db.meta.updateOne({"_id" : "persist"},{$push : {"persisted_streams" : "default"}},{"upsert": true})
+    bson_t* selector = BCON_NEW ("_id", "persist");
+    bson_t* update = BCON_NEW (add ? "$addToSet" : "$pull", "{", "persisted_streams", BCON_UTF8(stream_name.c_str()), "}");
+    bson_t* opts = BCON_NEW ("upsert", BCON_BOOL(add));
+
+    bson_error_t mongo_err;
+    Error err = nullptr;
+    if (!mongoc_collection_update_one(current_collection_, selector, update, opts, NULL, &mongo_err)) {
+        err = DBErrorTemplates::kInsertError.Generate(mongo_err.message);
+    }
+    bson_destroy(selector);
+    bson_destroy(update);
+    bson_destroy(opts);
+
+    return err;
+}
+
 }
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index 90fbe73a6..d2a0621f9 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -59,6 +59,8 @@ class MongoDBClient final : public Database {
     Error GetLastStream(StreamInfo* info) const override;
     Error DeleteStream(const std::string& stream) const override;
     Error GetMetaFromDb(const std::string& collection, const std::string& id, std::string* res) const override;
+    Error GetPersistedStreamsNumber(int* res) const override;
+    Error PersistStream(const std::string& stream_name) const override;
     Error GetNextId(const std::string& collection, uint64_t* id) const;
     ~MongoDBClient() override;
   private:
@@ -86,6 +88,7 @@ class MongoDBClient final : public Database {
     Error DeleteCollections(const std::string& prefix) const;
     Error DeleteDocumentsInCollection(const std::string& collection_name, const std::string& querystr) const;
     Error InsertWithAutoId(const MessageMeta& file, uint64_t* id_inserted) const;
+    Error ModifyPersistedStreamsList(const std::string& stream_name, bool add) const;
 };
 
 struct TransactionContext {
diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h
index 80c1f841b..1c12207cb 100644
--- a/consumer/api/c/include/asapo/consumer_c.h
+++ b/consumer/api/c/include/asapo/consumer_c.h
@@ -116,6 +116,9 @@ int asapo_consumer_delete_stream(AsapoConsumerHandle consumer,
                                  const char* stream,
                                  AsapoBool delete_meta,
                                  AsapoBool error_on_not_exist, AsapoErrorHandle* error);
+int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer,
+                                         const char* stream,
+                                         AsapoErrorHandle* error);
 int64_t asapo_consumer_get_current_size(AsapoConsumerHandle consumer,
                                         const char* stream,
                                         AsapoErrorHandle* error);
diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h
index 2527ca21c..271570560 100644
--- a/consumer/api/cpp/include/asapo/consumer/consumer.h
+++ b/consumer/api/cpp/include/asapo/consumer/consumer.h
@@ -101,6 +101,12 @@ class Consumer {
     */
     virtual Error DeleteStream(std::string stream, DeleteStreamOptions options) = 0;
 
+    //! Marks stream persistent
+    /*!
+      \param stream - Name of the stream to be marked persistent
+      \return Error - Will be nullptr on success
+    */
+    virtual Error SetStreamPersistent(std::string stream) = 0 ;
 
     //! Get current number of messages in stream
     /*!
diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp
index 1fb7f6dee..1374b97ac 100644
--- a/consumer/api/cpp/src/consumer_c_glue.cpp
+++ b/consumer/api/cpp/src/consumer_c_glue.cpp
@@ -286,6 +286,18 @@ extern "C" {
         auto err = consumer->handle->DeleteStream(stream, opt);
         return process_error(error, std::move(err));
     }
+//! wraps asapo::Consumer::SetStreamsPersistent()
+/// \copydoc asapo::Consumer::SetStreamsPersistent()
+/// \param[in] consumer the consumer that is acted upon
+/// \param[in] stream the name of the stream to be persisted
+/// \param[out] error NULL or pointer to error handle to be set
+/// \return 0 if completed successfully, -1 in case of error.
+int asapo_consumer_set_stream_persistent(AsapoConsumerHandle consumer,
+                                         const char* stream,
+                                         AsapoErrorHandle* error) {
+        auto err = consumer->handle->SetStreamPersistent(stream);
+        return process_error(error, std::move(err));
+    }
 //! wraps asapo::Consumer::GetCurrentSize()
 /// \copydoc asapo::Consumer::GetCurrentSize()
 /// \param[in] consumer the consumer that is acted upon
diff --git a/consumer/api/cpp/src/consumer_impl.cpp b/consumer/api/cpp/src/consumer_impl.cpp
index 76f536cbf..a6cfacf60 100644
--- a/consumer/api/cpp/src/consumer_impl.cpp
+++ b/consumer/api/cpp/src/consumer_impl.cpp
@@ -1060,6 +1060,15 @@ Error ConsumerImpl::DeleteStream(std::string stream, DeleteStreamOptions options
     return err;
 }
 
+Error ConsumerImpl::SetStreamPersistent(std::string stream) {
+    RequestInfo ri = CreateBrokerApiRequest(std::move(stream), "", "persist");
+    ri.post = true;
+    Error err;
+    BrokerRequestWithTimeout(ri, &err);
+    return err;
+}
+
+
 RequestInfo ConsumerImpl::CreateBrokerApiRequest(std::string stream, std::string group, std::string suffix) const {
     auto stream_encoded = httpclient__->UrlEscape(std::move(stream));
     auto group_encoded = group.size() > 0 ? httpclient__->UrlEscape(std::move(group)) : "";
diff --git a/consumer/api/cpp/src/consumer_impl.h b/consumer/api/cpp/src/consumer_impl.h
index 6928e7cf2..8ac94d281 100644
--- a/consumer/api/cpp/src/consumer_impl.h
+++ b/consumer/api/cpp/src/consumer_impl.h
@@ -89,6 +89,7 @@ class ConsumerImpl final : public asapo::Consumer {
 
     Error GetVersionInfo(std::string* client_info, std::string* server_info, bool* supported) override;
     Error DeleteStream(std::string stream, DeleteStreamOptions options) override;
+    Error SetStreamPersistent(std::string stream) override;
     void SetTimeout(uint64_t timeout_ms) override;
     void ForceNoRdma() override;
     Error DisableMonitoring(bool disable) override;
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 140e8950e..1715bf993 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -98,6 +98,7 @@ cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
         void InterruptCurrentOperation()
         Error GetVersionInfo(string* client_info,string* server_info, bool* supported)
         Error DeleteStream(string stream, DeleteStreamOptions options)
+        Error SetStreamPersistent(string stream)
 
 cdef extern from "asapo/asapo_consumer.h" namespace "asapo" nogil:
     cdef cppclass ConsumerFactory:
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index de490719b..cee15bdbd 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -328,6 +328,22 @@ cdef class PyConsumer:
         if err:
             throw_exception(err)
 
+    def set_stream_persistent(self, stream = 'default'):
+        """
+         :param stream: stream name
+         :type stream: string
+         :raises:
+            AsapoWrongInputError: wrong input (authorization, ...)
+            AsapoTimeoutError: request not finished for a given timeout
+            AsapoProducerError: other errors
+        """
+        cdef Error err
+        cdef string b_stream = _bytes(stream)
+        with nogil:
+            err = self.c_consumer.get().SetStreamPersistent(b_stream)
+        if err:
+            throw_exception(err)
+
     def get_unacknowledged_messages(self, group_id, uint64_t from_id = 0, uint64_t to_id = 0, stream = "default"):
         cdef Error err
         cdef string b_group_id = _bytes(group_id)
diff --git a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
index 7ae6db9d0..fdb6b9d2f 100644
--- a/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
+++ b/deploy/asapo_helm_chart/asapo/configs/asapo-receiver.json
@@ -25,6 +25,7 @@
   "Tag": "receiver",
   "ReceiveToDiskThresholdMB": {{ .Values.ownServices.receiver.receiveToDiskThresholdMB }},
   "LogLevel": "info",
+  "MaxNumPersistedStreams": 100,
   "Kafka": {
     "Enabled": true,
     "KafkaClient": {
diff --git a/deploy/asapo_services/scripts/receiver.json.tpl b/deploy/asapo_services/scripts/receiver.json.tpl
index 5ca82a26d..5997fe677 100644
--- a/deploy/asapo_services/scripts/receiver.json.tpl
+++ b/deploy/asapo_services/scripts/receiver.json.tpl
@@ -26,6 +26,7 @@
   "Tag": "{{ env "attr.unique.hostname" }}",
   "ReceiveToDiskThresholdMB": {{ env "NOMAD_META_receiver_receive_to_disk_threshold" }},
   "LogLevel": "{{ keyOrDefault "receiver_log_level" "info" }}",
+  "MaxNumPersistedStreams": 100,
   "Kafka": {
     "Enabled": {{ env "NOMAD_META_receiver_kafka_enabled" }},
     "KafkaClient": {
diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h
index ec90bdcb7..02f62eb1b 100644
--- a/producer/api/c/include/asapo/producer_c.h
+++ b/producer/api/c/include/asapo/producer_c.h
@@ -29,6 +29,7 @@ enum AsapoOpcode {
     kOpcodeGetBufferData,
     kOpcodeAuthorize,
     kOpcodeTransferMetaData,
+    kOpcodePersistStream,
     kOpcodeDeleteStream,
     kOpcodeGetMeta,
     kOpcodeCount
diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h
index 0dfdf60e4..e3290b4a1 100644
--- a/producer/api/cpp/include/asapo/producer/producer.h
+++ b/producer/api/cpp/include/asapo/producer/producer.h
@@ -111,7 +111,7 @@ class Producer {
 
     //! Marks stream finished
     /*!
-      \param stream - Name of the stream to makr finished
+      \param stream - Name of the stream to mark finished
       \param last_id - ID of the last message in stream
       \param next_stream - Name of the next stream (empty if not set)
       \return Error - Will be nullptr on success
@@ -119,6 +119,13 @@ class Producer {
     virtual Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream,
                                          RequestCallback callback) = 0 ;
 
+    //! Marks stream persistent
+    /*!
+      \param stream - Name of the stream to be marked persistent
+      \param timeout_ms - operation timeout in milliseconds
+      \return Error - Will be nullptr on success
+    */
+    virtual Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) = 0 ;
 
     //! Sends beamtime metadata to the receiver
     /*!
diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp
index 40920cb4a..ba65c9226 100644
--- a/producer/api/cpp/src/producer_c_glue.cpp
+++ b/producer/api/cpp/src/producer_c_glue.cpp
@@ -24,6 +24,7 @@ extern "C" {
                   kOpcodeGetBufferData == asapo::Opcode::kOpcodeGetBufferData&&
                   kOpcodeAuthorize == asapo::Opcode::kOpcodeAuthorize&&
                   kOpcodeTransferMetaData == asapo::Opcode::kOpcodeTransferMetaData&&
+                  kOpcodePersistStream == asapo::Opcode::kOpcodePersistStream&&
                   kOpcodeDeleteStream == asapo::Opcode::kOpcodeDeleteStream&&
                   kOpcodeGetMeta == asapo::Opcode::kOpcodeGetMeta&&
                   kOpcodeCount == asapo::Opcode::kOpcodeCount,
@@ -109,6 +110,13 @@ extern "C" {
         auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err);
         return handle_or_null_t(result, error, std::move(err));
     }
+    int asapo_producer_set_stream_persistent(AsapoProducerHandle producer,
+                                             const char* stream,
+                                             uint64_t timeout_ms,
+                                             AsapoErrorHandle* error) {
+        auto err = producer->handle->SetStreamPersistent(stream, timeout_ms);
+        return process_error(error, std::move(err));
+    }
     int asapo_producer_delete_stream(AsapoProducerHandle producer,
                                      const char* stream,
                                      uint64_t timeout_ms,
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index abe31f8fa..1c0a13a8e 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -240,6 +240,14 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id,
     return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true);
 }
 
+Error ProducerImpl::SetStreamPersistent(std::string stream, uint64_t timeout_ms) {
+    GenericRequestHeader request_header{kOpcodePersistStream, 0, 0, 0,
+                                        stream + ".persist", stream};
+    Error err;
+    BlockingRequest(std::move(request_header), timeout_ms, &err);
+    return err;
+}
+
 void ProducerImpl::SetLogLevel(LogLevel level) {
     log__->SetLogLevel(level);
 }
diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h
index 3e20d453b..b9182a983 100644
--- a/producer/api/cpp/src/producer_impl.h
+++ b/producer/api/cpp/src/producer_impl.h
@@ -57,6 +57,7 @@ class ProducerImpl : public Producer {
                    RequestCallback callback) override;
     Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream,
                                  RequestCallback callback) override;
+    Error SetStreamPersistent(std::string stream, uint64_t timeout_ms) override;
 
     Error DeleteStream(std::string stream, uint64_t timeout_ms, DeleteStreamOptions options) const override;
 
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index 00898e561..47b30ff30 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -120,6 +120,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
         void SetRequestsQueueLimits(uint64_t size, uint64_t volume)
         Error WaitRequestsFinished(uint64_t timeout_ms)
         Error SendStreamFinishedFlag(string stream, uint64_t last_id, string next_stream, RequestCallback callback)
+        Error SetStreamPersistent(string stream, uint64_t timeout_ms)
         StreamInfo GetStreamInfo(string stream, uint64_t timeout_ms, Error* err)
         StreamInfo GetLastStream(uint64_t timeout_ms, Error* err)
         Error GetVersionInfo(string* client_info,string* server_info, bool* supported)
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index 34670d1df..ef9942af1 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -283,6 +283,24 @@ cdef class PyProducer:
         if callback != None:
             Py_XINCREF(<PyObject*>callback)
 
+    def set_stream_persistent(self, stream = 'default', uint64_t timeout_ms = 1000):
+        """
+         :param stream: stream name
+         :type stream: string
+         :param timeout_ms: timeout in milliseconds
+         :type timeout_ms: int
+         :raises:
+            AsapoWrongInputError: wrong input (authorization, ...)
+            AsapoTimeoutError: request not finished for a given timeout
+            AsapoProducerError: other errors
+        """
+        cdef Error err
+        cdef string b_stream = _bytes(stream)
+        with nogil:
+            err = self.c_producer.get().SetStreamPersistent(b_stream,timeout_ms)
+        if err:
+            throw_exception(err)
+
     def delete_stream(self, stream = 'default', uint64_t timeout_ms = 1000,bool error_on_not_exist = True):
         """
          :param stream: stream name
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index f64ef8866..ce93366bd 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -23,6 +23,7 @@ set(RECEIVER_CORE_FILES
         src/request_handler/request_handler_secondary_authorization.cpp
         src/request_handler/authorization_client.cpp
         src/request_handler/request_handler_db_meta_write.cpp
+        src/request_handler/request_handler_db_persist_stream.cpp
         src/request_handler/request_handler_db_stream_info.cpp
         src/request_handler/request_handler_db_last_stream.cpp
         src/request_handler/request_handler_receive_metadata.cpp
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index 6466d39a0..39e40d925 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -41,6 +41,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
     (err = parser.Embedded("Metrics").GetBool("Expose", &config.metrics.expose)) ||
     (err = parser.Embedded("Metrics").GetUInt64("ListenPort", &config.metrics.listen_port)) ||
     (err = parser.GetString("LogLevel", &log_level)) ||
+    (err = parser.GetUInt64("MaxNumPersistedStreams", &config.max_num_persisted_streams)) ||
     (err = parser.Embedded("Kafka").GetBool("Enabled", &config.kafka_config.enabled));
 
     if (err) {
diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h
index 728f6d81c..654133b48 100644
--- a/receiver/src/receiver_config.h
+++ b/receiver/src/receiver_config.h
@@ -30,6 +30,7 @@ struct ReceiverConfig {
     ReceiverMetricsConfig metrics;
     std::string discovery_server;
     KafkaClientConfig kafka_config;
+    uint64_t max_num_persisted_streams;
 };
 
 class ReceiverConfigManager {
diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp
index b40207a5b..2b32c2992 100644
--- a/receiver/src/request_handler/request_factory.cpp
+++ b/receiver/src/request_handler/request_factory.cpp
@@ -96,6 +96,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request,
         request->AddHandler(&request_handler_delete_stream_);
         break;
     }
+    case Opcode::kOpcodePersistStream: {
+        request->AddHandler(&request_handler_persist_stream_);
+        break;
+    }
     case Opcode::kOpcodeLastStream: {
         request->AddHandler(&request_handler_db_last_stream_);
         break;
diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h
index 75c39aa91..ca0164442 100644
--- a/receiver/src/request_handler/request_factory.h
+++ b/receiver/src/request_handler/request_factory.h
@@ -16,6 +16,7 @@
 #include "request_handler_initial_authorization.h"
 #include "request_handler_secondary_authorization.h"
 #include "request_handler_db_meta_write.h"
+#include "request_handler_db_persist_stream.h"
 #include "request_handler_receive_data.h"
 #include "request_handler_receive_metadata.h"
 #include "request_handler_db_check_request.h"
@@ -48,6 +49,7 @@ class RequestFactory {
     RequestHandlerMonitoring request_handler_monitoring_;
     RequestHandlerDbStreamInfo request_handler_db_stream_info_{kDBDataCollectionNamePrefix};
     RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix};
+    RequestHandlerDbPersistStream request_handler_persist_stream_{kDBMetaCollectionName};
     RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix};
     RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName};
     RequestHandlerDbGetMeta request_handler_db_get_meta_{kDBMetaCollectionName};
diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.cpp b/receiver/src/request_handler/request_handler_db_persist_stream.cpp
new file mode 100644
index 000000000..4aafc2f8b
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_db_persist_stream.cpp
@@ -0,0 +1,42 @@
+#include "request_handler_db_persist_stream.h"
+#include <asapo/json_parser/json_parser.h>
+#include <asapo/database/db_error.h>
+
+#include "../receiver_logger.h"
+#include "../receiver_config.h"
+#include <sstream>
+
+namespace asapo {
+
+    RequestHandlerDbPersistStream::RequestHandlerDbPersistStream(std::string collection_name_prefix) : RequestHandlerDb(
+            std::move(collection_name_prefix)) {
+        MaxNumPersistedStreams = (int)GetReceiverConfig()->max_num_persisted_streams;
+    }
+
+    Error RequestHandlerDbPersistStream::ProcessRequest(Request* request) const {
+        if (auto err = RequestHandlerDb::ProcessRequest(request) ) {
+            return err;
+        }
+
+        int stream_num;
+        auto err = db_client__->GetPersistedStreamsNumber(&stream_num);
+
+        if (err) {
+            return DBErrorToReceiverError(std::move(err));
+        }
+
+        if (stream_num >= MaxNumPersistedStreams) {
+            return ReceiverErrorTemplates::kBadRequest.Generate("too many persisted streams");
+        }
+
+        err = db_client__->PersistStream(request->GetStream());
+
+        if (err) {
+            return DBErrorToReceiverError(std::move(err));
+        }
+
+        return nullptr;
+    }
+
+
+}
\ No newline at end of file
diff --git a/receiver/src/request_handler/request_handler_db_persist_stream.h b/receiver/src/request_handler/request_handler_db_persist_stream.h
new file mode 100644
index 000000000..c0f094e15
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_db_persist_stream.h
@@ -0,0 +1,19 @@
+#ifndef ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H
+#define ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H
+
+#include "request_handler_db.h"
+#include "../request.h"
+
+namespace asapo {
+
+class RequestHandlerDbPersistStream: public RequestHandlerDb {
+public:
+    RequestHandlerDbPersistStream(std::string collection_name_prefix);
+    Error ProcessRequest(Request* request) const override;
+private:
+    int MaxNumPersistedStreams;
+};
+
+}
+
+#endif //ASAPO_OLD_REQUEST_HANDLER_DB_PERSIST_STREAM_H
diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp
index 026eeb668..64b531bac 100644
--- a/receiver/unittests/mock_receiver_config.cpp
+++ b/receiver/unittests/mock_receiver_config.cpp
@@ -82,6 +82,7 @@ Error SetReceiverConfigWithError (const ReceiverConfig& config, std::string erro
     config_string += "," +  Key("AuthorizationServer", error_field) + "\"" + config.authorization_server + "\"";
     config_string += "," +  Key("LogLevel", error_field) + "\"" + log_level + "\"";
     config_string += "," +  Key("Tag", error_field) + "\"" + config.tag + "\"";
+    config_string += "," +  Key("MaxNumPersistedStreams", error_field) + "\"" + std::to_string(config.max_num_persisted_streams) + "\"";
     config_string += "," +  Key("Kafka", error_field) + "{";
     config_string += Key("Enabled", error_field) + (config.kafka_config.enabled ? "true" : "false") ;
     config_string += "}";
diff --git a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
index ff23cda05..e2c63ebc2 100644
--- a/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_fabric.json.tpl.lin.in
@@ -25,6 +25,7 @@
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
+  "MaxNumPersistedStreams":100,
   "LogLevel" : "debug",
   "Kafka" : {
     "Enabled" : false
diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
index d7bd04f56..1a1765265 100644
--- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
@@ -25,6 +25,7 @@
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
+  "MaxNumPersistedStreams":100,
   "LogLevel" : "debug",
   "Kafka" : {
     "Enabled" : true,
diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
index 386a9db3c..d8c372ba3 100644
--- a/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_tcp.json.tpl.lin.in
@@ -25,6 +25,7 @@
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
+  "MaxNumPersistedStreams":100,
   "LogLevel" : "debug",
   "Kafka" : {
     "Enabled" : false
diff --git a/tests/automatic/settings/receiver_tcp.json.tpl.win.in b/tests/automatic/settings/receiver_tcp.json.tpl.win.in
index de39191ca..fa9150fc0 100644
--- a/tests/automatic/settings/receiver_tcp.json.tpl.win.in
+++ b/tests/automatic/settings/receiver_tcp.json.tpl.win.in
@@ -25,6 +25,7 @@
   },
   "Tag": "{{ env "NOMAD_ADDR_recv" }}",
   "ReceiveToDiskThresholdMB":50,
+  "MaxNumPersistedStreams":100,
   "LogLevel" : "debug",
   "Kafka" : {
     "Enabled" : false
-- 
GitLab