From ec1bb032e6f392f446c7bf0a1378f175806b0eaa Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 27 Nov 2019 14:32:55 +0100
Subject: [PATCH] receiver updated for substreams

---
 broker/src/asapo_broker/database/mongodb.go   |  2 +-
 common/cpp/include/database/database.h        | 12 ++--
 common/cpp/include/unittests/MockDatabase.h   | 25 ++++----
 common/cpp/src/database/mongodb_client.cpp    | 61 ++++++++++++-------
 common/cpp/src/database/mongodb_client.h      | 18 +++---
 .../folder_to_db/src/folder_db_importer.cpp   |  4 +-
 .../unittests/test_folder_to_db.cpp           | 32 +++++-----
 .../consumer/getnext_broker/check_linux.sh    |  2 +-
 .../consumer/getnext_broker/check_windows.bat |  2 +-
 .../getnext_broker_python/check_linux.sh      |  2 +-
 .../getnext_broker_python/check_windows.bat   |  2 +-
 examples/pipeline/in_to_out/check_linux.sh    |  6 +-
 examples/pipeline/in_to_out/check_windows.bat |  6 +-
 .../pipeline/in_to_out_python/check_linux.sh  |  4 +-
 .../in_to_out_python/check_windows.bat        |  4 +-
 receiver/src/request.cpp                      |  4 ++
 receiver/src/request.h                        |  1 +
 receiver/src/request_factory.h                |  2 +-
 receiver/src/request_handler_db.cpp           |  6 +-
 receiver/src/request_handler_db.h             |  4 +-
 .../src/request_handler_db_meta_write.cpp     |  4 +-
 receiver/src/request_handler_db_write.cpp     | 13 ++--
 receiver/src/request_handler_db_write.h       |  2 +-
 receiver/unittests/receiver_mocking.h         |  1 +
 receiver/unittests/test_request.cpp           | 15 +++++
 .../unittests/test_request_handler_db.cpp     | 12 ++--
 .../test_request_handler_db_meta_writer.cpp   |  5 +-
 .../test_request_handler_db_writer.cpp        | 17 ++++--
 .../automatic/broker/get_last/check_linux.sh  |  8 +--
 .../broker/get_last/check_windows.bat         |  8 +--
 .../automatic/broker/get_next/check_linux.sh  |  4 +-
 .../broker/get_next/check_windows.bat         |  4 +-
 .../consumer_python_memleak/check_linux.sh    |  2 +-
 .../consumer/consumer_api/check_linux.sh      |  4 +-
 .../consumer/consumer_api/check_windows.bat   |  4 +-
 .../consumer_api_python/check_windows.bat     |  4 +-
 .../consumer/folder_to_db/check_linux.sh      |  4 +-
 .../consumer/folder_to_db/check_windows.bat   |  4 +-
 .../next_multithread_broker/check_linux.sh    |  2 +-
 .../next_multithread_broker/check_windows.bat |  2 +-
 .../receiver_mongo_restart/check_linux.sh     |  6 +-
 .../mongo_db/connect/connect_mongodb.cpp      |  4 +-
 .../mongo_db/insert/insert_mongodb.cpp        |  6 +-
 .../insert_dataset/insert_dataset_mongodb.cpp |  8 +--
 .../mongo_db/upsert/upsert_mongodb.cpp        |  6 +-
 .../transfer_datasets/check_linux.sh          |  2 +-
 .../transfer_datasets/check_windows.bat       |  6 +-
 .../check_linux.sh                            |  2 +-
 .../check_windows.bat                         |  2 +-
 49 files changed, 202 insertions(+), 158 deletions(-)

diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 41ad42053..5cc0ad09f 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -26,7 +26,7 @@ type LocationPointer struct {
 	Value   int    `bson:"current_pointer"`
 }
 
-const data_collection_name = "data"
+const data_collection_name = "data_default"
 const meta_collection_name = "meta"
 const pointer_collection_name = "current_location"
 const pointer_field_name = "current_pointer"
diff --git a/common/cpp/include/database/database.h b/common/cpp/include/database/database.h
index 550e06d37..8d0bca097 100644
--- a/common/cpp/include/database/database.h
+++ b/common/cpp/include/database/database.h
@@ -8,17 +8,17 @@
 
 namespace asapo {
 
-constexpr char kDBDataCollectionName[] = "data";
+constexpr char kDBDataCollectionNamePrefix[] = "data";
 constexpr char kDBMetaCollectionName[] = "meta";
 
 
 class Database {
   public:
-    virtual Error Connect(const std::string& address, const std::string& database,
-                          const std::string& collection ) = 0;
-    virtual Error Insert(const FileInfo& file, bool ignore_duplicates) const = 0;
-    virtual Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const = 0;
-    virtual Error InsertAsSubset(const FileInfo& file, uint64_t subset_id, uint64_t subset_size,
+    virtual Error Connect(const std::string& address, const std::string& database) = 0;
+    virtual Error Insert(const std::string& collection, const FileInfo& file, bool ignore_duplicates) const = 0;
+    virtual Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const = 0;
+    virtual Error InsertAsSubset(const std::string& collection, const FileInfo& file, uint64_t subset_id,
+                                 uint64_t subset_size,
                                  bool ignore_duplicates) const = 0;
 
     virtual ~Database() = default;
diff --git a/common/cpp/include/unittests/MockDatabase.h b/common/cpp/include/unittests/MockDatabase.h
index 94c611d5f..85ff272d8 100644
--- a/common/cpp/include/unittests/MockDatabase.h
+++ b/common/cpp/include/unittests/MockDatabase.h
@@ -11,33 +11,32 @@ namespace asapo {
 
 class MockDatabase : public Database {
   public:
-    Error Connect(const std::string& address, const std::string& database,
-                  const std::string& collection ) override {
-        return Error{Connect_t(address, database, collection)};
+    Error Connect(const std::string& address, const std::string& database) override {
+        return Error{Connect_t(address, database)};
 
     }
-    Error Insert(const FileInfo& file, bool ignore_duplicates) const override {
-        return Error{Insert_t(file, ignore_duplicates)};
+    Error Insert(const std::string& collection, const FileInfo& file, bool ignore_duplicates) const override {
+        return Error{Insert_t(collection, file, ignore_duplicates)};
     }
 
-    Error InsertAsSubset(const FileInfo& file, uint64_t subset_id,
+    Error InsertAsSubset(const std::string& collection, const FileInfo& file, uint64_t subset_id,
                          uint64_t subset_size, bool ignore_duplicates) const override {
-        return Error{InsertAsSubset_t(file, subset_id, subset_size, ignore_duplicates)};
+        return Error{InsertAsSubset_t(collection, file, subset_id, subset_size, ignore_duplicates)};
     }
 
 
-    MOCK_METHOD3(Connect_t, ErrorInterface * (const std::string&, const std::string&, const std::string&));
-    MOCK_CONST_METHOD2(Insert_t, ErrorInterface * (const FileInfo&, bool));
+    MOCK_METHOD2(Connect_t, ErrorInterface * (const std::string&, const std::string&));
+    MOCK_CONST_METHOD3(Insert_t, ErrorInterface * (const std::string&, const FileInfo&, bool));
 
 
-    MOCK_CONST_METHOD4(InsertAsSubset_t, ErrorInterface * (const FileInfo&, uint64_t, uint64_t, bool));
+    MOCK_CONST_METHOD5(InsertAsSubset_t, ErrorInterface * (const std::string&, const FileInfo&, uint64_t, uint64_t, bool));
 
 
-    Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const override {
-        return Error{Upsert_t(id, data, size)};
+    Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override {
+        return Error{Upsert_t(collection, id, data, size)};
 
     }
-    MOCK_CONST_METHOD3(Upsert_t, ErrorInterface * (uint64_t id, const uint8_t* data, uint64_t size));
+    MOCK_CONST_METHOD4(Upsert_t, ErrorInterface * (const std::string&, uint64_t id, const uint8_t* data, uint64_t size));
 
 
 
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index 66c8b02cf..9380bdcd0 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -40,19 +40,27 @@ Error MongoDBClient::InitializeClient(const string& address) {
     if (client_ == nullptr) {
         return DBErrorTemplates::kBadAddress.Generate();
     }
+
+    write_concern_ = mongoc_write_concern_new ();
+    mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT);
+    mongoc_write_concern_set_journal (write_concern_, true);
+
     return nullptr;
 
 }
 
-void MongoDBClient::InitializeCollection(const string& database_name,
-                                         const string& collection_name) {
-    collection_ = mongoc_client_get_collection (client_, database_name.c_str(),
-                                                collection_name.c_str());
+void MongoDBClient::UpdateCurrentCollectionIfNeeded(const string& collection_name) const {
+    if (collection_name == current_collection_name_) {
+        return;
+    }
+    if (current_collection_ != nullptr) {
+        mongoc_collection_destroy (current_collection_);
+    }
 
-    write_concern_ = mongoc_write_concern_new ();
-    mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT);
-    mongoc_write_concern_set_journal (write_concern_, true);
-    mongoc_collection_set_write_concern (collection_, write_concern_);
+    current_collection_ = mongoc_client_get_collection (client_, database_name_.c_str(),
+                          collection_name.c_str());
+    current_collection_name_ = collection_name;
+    mongoc_collection_set_write_concern (current_collection_, write_concern_);
 }
 
 Error MongoDBClient::TryConnectDatabase() {
@@ -63,8 +71,7 @@ Error MongoDBClient::TryConnectDatabase() {
     return err;
 }
 
-Error MongoDBClient::Connect(const string& address, const string& database_name,
-                             const string& collection_name) {
+Error MongoDBClient::Connect(const string& address, const string& database_name) {
     if (connected_) {
         return DBErrorTemplates::kAlreadyConnected.Generate();
     }
@@ -74,7 +81,7 @@ Error MongoDBClient::Connect(const string& address, const string& database_name,
         return err;
     }
 
-    InitializeCollection(database_name, collection_name);
+    database_name_ = std::move(database_name);
 
     err = TryConnectDatabase();
     if (err) {
@@ -88,9 +95,15 @@ string MongoDBClient::DBAddress(const string& address) const {
 }
 
 void MongoDBClient::CleanUp() {
-    mongoc_write_concern_destroy(write_concern_);
-    mongoc_collection_destroy (collection_);
-    mongoc_client_destroy (client_);
+    if (write_concern_) {
+        mongoc_write_concern_destroy(write_concern_);
+    }
+    if (current_collection_) {
+        mongoc_collection_destroy (current_collection_);
+    }
+    if (client_) {
+        mongoc_client_destroy (client_);
+    }
 }
 
 bson_p PrepareBsonDocument(const FileInfo& file, Error* err) {
@@ -128,7 +141,7 @@ bson_p PrepareBsonDocument(const uint8_t* json, ssize_t len, Error* err) {
 
 Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const {
     bson_error_t mongo_err;
-    if (!mongoc_collection_insert_one(collection_, document.get(), NULL, NULL, &mongo_err)) {
+    if (!mongoc_collection_insert_one(current_collection_, document.get(), NULL, NULL, &mongo_err)) {
         if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
             return ignore_duplicates ? nullptr : DBErrorTemplates::kDuplicateID.Generate();
         }
@@ -147,7 +160,7 @@ Error MongoDBClient::UpdateBsonDocument(uint64_t id, const bson_p& document, boo
 
     Error err = nullptr;
 
-    if (!mongoc_collection_replace_one(collection_, selector, document.get(), opts, NULL, &mongo_err)) {
+    if (!mongoc_collection_replace_one(current_collection_, selector, document.get(), opts, NULL, &mongo_err)) {
         err = DBErrorTemplates::kInsertError.Generate(mongo_err.message);
     }
 
@@ -158,11 +171,13 @@ Error MongoDBClient::UpdateBsonDocument(uint64_t id, const bson_p& document, boo
 }
 
 
-Error MongoDBClient::Insert(const FileInfo& file, bool ignore_duplicates) const {
+Error MongoDBClient::Insert(const std::string& collection, const FileInfo& file, bool ignore_duplicates) const {
     if (!connected_) {
         return DBErrorTemplates::kNotConnected.Generate();
     }
 
+    UpdateCurrentCollectionIfNeeded(collection);
+
     Error err;
     auto document = PrepareBsonDocument(file, &err);
     if (err) {
@@ -180,11 +195,13 @@ MongoDBClient::~MongoDBClient() {
     CleanUp();
 }
 
-Error MongoDBClient::Upsert(uint64_t id, const uint8_t* data, uint64_t size) const {
+Error MongoDBClient::Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const {
     if (!connected_) {
         return DBErrorTemplates::kNotConnected.Generate();
     }
 
+    UpdateCurrentCollectionIfNeeded(collection);
+
     Error err;
     auto document = PrepareBsonDocument(data, (ssize_t) size, &err);
     if (err) {
@@ -205,9 +222,9 @@ Error MongoDBClient::AddBsonDocumentToArray(bson_t* query, bson_t* update, bool
     bson_error_t mongo_err;
 // first update may fail due to multiple threads try to create document at once, the second one should succeed
 // https://jira.mongodb.org/browse/SERVER-14322
-    if (!mongoc_collection_update (collection_, MONGOC_UPDATE_UPSERT, query, update, NULL, &mongo_err)) {
+    if (!mongoc_collection_update (current_collection_, MONGOC_UPDATE_UPSERT, query, update, NULL, &mongo_err)) {
         if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
-            if (!mongoc_collection_update (collection_, MONGOC_UPDATE_UPSERT, query, update, NULL, &mongo_err)) {
+            if (!mongoc_collection_update (current_collection_, MONGOC_UPDATE_UPSERT, query, update, NULL, &mongo_err)) {
                 if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
                     err =  ignore_duplicates ? nullptr : DBErrorTemplates::kDuplicateID.Generate();
                 } else {
@@ -223,7 +240,7 @@ Error MongoDBClient::AddBsonDocumentToArray(bson_t* query, bson_t* update, bool
 
 
 
-Error MongoDBClient::InsertAsSubset(const FileInfo& file,
+Error MongoDBClient::InsertAsSubset(const std::string& collection, const FileInfo& file,
                                     uint64_t subset_id,
                                     uint64_t subset_size,
                                     bool ignore_duplicates) const {
@@ -231,6 +248,8 @@ Error MongoDBClient::InsertAsSubset(const FileInfo& file,
         return DBErrorTemplates::kNotConnected.Generate();
     }
 
+    UpdateCurrentCollectionIfNeeded(collection);
+
     Error err;
     auto document = PrepareBsonDocument(file, &err);
     if (err) {
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index 6537151c6..0544d7912 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -36,23 +36,23 @@ using bson_p = std::unique_ptr<_bson_t, BsonDestroyFunctor>;
 class MongoDBClient final : public Database {
   public:
     MongoDBClient();
-    Error Connect(const std::string& address, const std::string& database,
-                  const std::string& collection) override;
-    Error Insert(const FileInfo& file, bool ignore_duplicates) const override;
-    Error InsertAsSubset(const FileInfo& file, uint64_t subset_id, uint64_t subset_size,
+    Error Connect(const std::string& address, const std::string& database) override;
+    Error Insert(const std::string& collection, const FileInfo& file, bool ignore_duplicates) const override;
+    Error InsertAsSubset(const std::string& collection, const FileInfo& file, uint64_t subset_id, uint64_t subset_size,
                          bool ignore_duplicates) const override;
-    Error Upsert(uint64_t id, const uint8_t* data, uint64_t size) const override;
+    Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override;
     ~MongoDBClient() override;
   private:
     mongoc_client_t* client_{nullptr};
-    mongoc_collection_t* collection_{nullptr};
-    mongoc_write_concern_t* write_concern_;
+    mutable mongoc_collection_t* current_collection_{nullptr};
+    mutable std::string current_collection_name_;
+    std::string database_name_;
+    mongoc_write_concern_t* write_concern_{nullptr};
     bool connected_{false};
     void CleanUp();
     std::string DBAddress(const std::string& address) const;
     Error InitializeClient(const std::string& address);
-    void InitializeCollection(const std::string& database_name,
-                              const std::string& collection_name);
+    void UpdateCurrentCollectionIfNeeded(const std::string& collection_name) const ;
     Error Ping();
     Error TryConnectDatabase();
     Error InsertBsonDocument(const bson_p& document, bool ignore_duplicates) const;
diff --git a/consumer/tools/folder_to_db/src/folder_db_importer.cpp b/consumer/tools/folder_to_db/src/folder_db_importer.cpp
index 8d764c3c7..30be63bcf 100644
--- a/consumer/tools/folder_to_db/src/folder_db_importer.cpp
+++ b/consumer/tools/folder_to_db/src/folder_db_importer.cpp
@@ -16,12 +16,12 @@ FolderToDbImporter::FolderToDbImporter() :
 }
 
 Error FolderToDbImporter::ConnectToDb(const std::unique_ptr<asapo::Database>& db) const {
-    return db->Connect(db_uri_, db_name_, kDBDataCollectionName);
+    return db->Connect(db_uri_, db_name_);
 }
 
 Error FolderToDbImporter::ImportSingleFile(const std::unique_ptr<asapo::Database>& db,
                                            const FileInfo& file) const {
-    return db->Insert(file, ignore_duplicates_);
+    return db->Insert(std::string(kDBDataCollectionNamePrefix) + "_default", file, ignore_duplicates_);
 }
 
 Error FolderToDbImporter::ImportFilelistChunk(const std::unique_ptr<asapo::Database>& db,
diff --git a/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp b/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp
index 7562b0c58..0c733b24f 100644
--- a/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp
+++ b/consumer/tools/folder_to_db/unittests/test_folder_to_db.cpp
@@ -73,7 +73,7 @@ class MockDatabaseFactory : public DatabaseFactory {
         for (int i = 0; i < n; i++) {
             auto val = new NiceMock<MockDatabase>;
             db.push_back(val);
-            ON_CALL(*val, Connect_t(_, _, _))
+            ON_CALL(*val, Connect_t(_, _))
             .WillByDefault(Return(nullptr));
         }
     }
@@ -112,7 +112,7 @@ class FolderDBConverterTests : public Test {
   public:
     FolderToDbImporter converter{};
     NiceMock<MockIO> mock_io;
-
+    std::string expected_collection_name = std::string(kDBDataCollectionNamePrefix) + "_default";
     MockDatabaseFactory* mock_dbf;
     FileInfos file_infos;
     std::string folder, uri, db_name;
@@ -135,7 +135,7 @@ class FolderDBConverterTests : public Test {
 };
 
 TEST_F(FolderDBConverterTests, ErrorWhenCannotConnect) {
-    EXPECT_CALL(*(mock_dbf->db[0]), Connect_t(uri, db_name, kDBDataCollectionName)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Connect_t(uri, db_name)).
     WillOnce(testing::Return(asapo::DBErrorTemplates::kConnectionError.Generate().release()));
 
     auto error = converter.Convert(uri, folder, db_name);
@@ -144,11 +144,11 @@ TEST_F(FolderDBConverterTests, ErrorWhenCannotConnect) {
 
 TEST_F(FolderDBConverterTests, ErrorWhenCannotCreateDbParallel) {
     int nparallel = 3;
-    EXPECT_CALL(*(mock_dbf->db[0]), Connect_t(uri, _, _)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Connect_t(uri, _)).
     WillOnce(testing::Return(asapo::DBErrorTemplates::kConnectionError.Generate().release()));
-    EXPECT_CALL(*(mock_dbf->db[1]), Connect_t(uri, _, _)).
+    EXPECT_CALL(*(mock_dbf->db[1]), Connect_t(uri, _)).
     WillOnce(testing::Return(asapo::DBErrorTemplates::kConnectionError.Generate().release()));
-    EXPECT_CALL(*(mock_dbf->db[2]), Connect_t(uri, _, _)).
+    EXPECT_CALL(*(mock_dbf->db[2]), Connect_t(uri, _)).
     WillOnce(testing::Return(asapo::DBErrorTemplates::kConnectionError.Generate().release()));
 
     converter.SetNParallelTasks(nparallel);
@@ -177,7 +177,7 @@ TEST_F(FolderDBConverterTests, ErrorWhenCannotGetFileList) {
 
 TEST_F(FolderDBConverterTests, PassesIgnoreDuplicates) {
 
-    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(_, true)).Times(3);
+    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, _, true)).Times(3);
 
     converter.IgnoreDuplicates(true);
     converter.Convert(uri, folder, db_name);
@@ -186,7 +186,7 @@ TEST_F(FolderDBConverterTests, PassesIgnoreDuplicates) {
 
 TEST_F(FolderDBConverterTests, ErrorWhenCannotImportFileListToDb) {
 
-    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(_, _)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(_, _, _)).
     WillOnce(testing::Return(asapo::DBErrorTemplates::kInsertError.Generate().release()));
 
     auto error = converter.Convert(uri, folder, db_name);
@@ -205,7 +205,7 @@ MATCHER_P(CompareFileInfo, file, "") {
 TEST_F(FolderDBConverterTests, PassesFileListToInsert) {
 
     for (auto& file : file_infos) {
-        EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(CompareFileInfo(file), _)).
+        EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareFileInfo(file), _)).
         WillOnce(testing::Return(nullptr));
     }
 
@@ -216,11 +216,11 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsert) {
 
 TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by3) {
 
-    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(CompareFileInfo(file_infos[0]), _)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[0]), _)).
     WillOnce(testing::Return(nullptr));
-    EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(CompareFileInfo(file_infos[1]), _)).
+    EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[1]), _)).
     WillOnce(testing::Return(nullptr));
-    EXPECT_CALL(*(mock_dbf->db[2]), Insert_t(CompareFileInfo(file_infos[2]), _)).
+    EXPECT_CALL(*(mock_dbf->db[2]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[2]), _)).
     WillOnce(testing::Return(nullptr));
 
     converter.SetNParallelTasks(3, false);
@@ -230,11 +230,11 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by3) {
 
 TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by2) {
 
-    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(CompareFileInfo(file_infos[0]), _)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[0]), _)).
     WillOnce(testing::Return(nullptr));
-    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(CompareFileInfo(file_infos[1]), _)).
+    EXPECT_CALL(*(mock_dbf->db[0]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[1]), _)).
     WillOnce(testing::Return(nullptr));
-    EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(CompareFileInfo(file_infos[2]), _)).
+    EXPECT_CALL(*(mock_dbf->db[1]), Insert_t(expected_collection_name, CompareFileInfo(file_infos[2]), _)).
     WillOnce(testing::Return(nullptr));
 
     converter.SetNParallelTasks(2, false);
@@ -244,7 +244,7 @@ TEST_F(FolderDBConverterTests, PassesFileListToInsertInParallel3by2) {
 
 TEST_F(FolderDBConverterTests, ComputesStatistics) {
 
-    EXPECT_CALL(*mock_dbf->db[0], Insert_t(_, false)).
+    EXPECT_CALL(*mock_dbf->db[0], Insert_t(_, _, false)).
     Times(file_infos.size()).
     WillRepeatedly(testing::Return(nullptr));
 
diff --git a/examples/consumer/getnext_broker/check_linux.sh b/examples/consumer/getnext_broker/check_linux.sh
index 10c158e9f..01b701c9b 100644
--- a/examples/consumer/getnext_broker/check_linux.sh
+++ b/examples/consumer/getnext_broker/check_linux.sh
@@ -25,7 +25,7 @@ nomad run broker.nmd
 
 for i in `seq 1 3`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 sleep 1
diff --git a/examples/consumer/getnext_broker/check_windows.bat b/examples/consumer/getnext_broker/check_windows.bat
index f75fbc5f5..76f25ae90 100644
--- a/examples/consumer/getnext_broker/check_windows.bat
+++ b/examples/consumer/getnext_broker/check_windows.bat
@@ -13,7 +13,7 @@ c:\opt\consul\nomad run nginx.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 3) do echo db.data_default.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 
 "%1" 127.0.0.1:8400 %source_path% %beamtime_id% 1 %token_test_run% 12000 1 | findstr /c:"Processed 3 file" || goto :error
diff --git a/examples/consumer/getnext_broker_python/check_linux.sh b/examples/consumer/getnext_broker_python/check_linux.sh
index 86fa43d26..2217862e5 100644
--- a/examples/consumer/getnext_broker_python/check_linux.sh
+++ b/examples/consumer/getnext_broker_python/check_linux.sh
@@ -27,7 +27,7 @@ nomad run broker.nmd
 
 for i in `seq 1 3`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 echo 'db.meta.insert({"_id":0,"meta_test":"test"})' | mongo ${database_name}
diff --git a/examples/consumer/getnext_broker_python/check_windows.bat b/examples/consumer/getnext_broker_python/check_windows.bat
index b27b91152..610c6a4c3 100644
--- a/examples/consumer/getnext_broker_python/check_windows.bat
+++ b/examples/consumer/getnext_broker_python/check_windows.bat
@@ -13,7 +13,7 @@ c:\opt\consul\nomad run nginx.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 3) do echo db.data_default.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 
 echo db.meta.insert({"_id":0,"meta_test":"test"}) | %mongo_exe% %database_name%  || goto :error
diff --git a/examples/pipeline/in_to_out/check_linux.sh b/examples/pipeline/in_to_out/check_linux.sh
index 9f0b6abca..8d5d293f9 100644
--- a/examples/pipeline/in_to_out/check_linux.sh
+++ b/examples/pipeline/in_to_out/check_linux.sh
@@ -54,7 +54,7 @@ echo hello3 > file3
 
 for i in `seq 1 3`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":6,"name":"'file$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'file$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done
 
 sleep 1
@@ -64,7 +64,7 @@ cat out
 cat out | grep "Processed 3 file(s)"
 cat out | grep "Sent 3 file(s)"
 
-echo "db.data.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${stream_out}
+echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep file1_${stream_out}
 
 cat ${receiver_folder}/file1_${stream_out} | grep hello1
 cat ${receiver_folder}/file2_${stream_out} | grep hello2
@@ -73,4 +73,4 @@ cat ${receiver_folder}/file3_${stream_out} | grep hello3
 $1 127.0.0.1:8400 $source_path $beamtime_id $stream_in $stream_out2 $token 2 1000 25000 0  > out2
 cat out2
 test ! -f ${receiver_folder}/file1_${stream_out2}
-echo "db.data.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep ./file1
+echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name2} | tee /dev/stderr | grep ./file1
diff --git a/examples/pipeline/in_to_out/check_windows.bat b/examples/pipeline/in_to_out/check_windows.bat
index 09cab7028..673e62683 100644
--- a/examples/pipeline/in_to_out/check_windows.bat
+++ b/examples/pipeline/in_to_out/check_windows.bat
@@ -27,7 +27,7 @@ c:\opt\consul\nomad run authorizer.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":6,"name":"file%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %indatabase_name%  || goto :error
+for /l %%x in (1, 1, 3) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"file%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %indatabase_name%  || goto :error
 
 mkdir %receiver_folder%
 
@@ -41,7 +41,7 @@ type out
 findstr /I /L /C:"Processed 3 file(s)" out || goto :error
 findstr /I /L /C:"Sent 3 file(s)" out || goto :error
 
-echo db.data.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr  /c:"file1_%stream_out%"  || goto :error
+echo db.data_default.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr  /c:"file1_%stream_out%"  || goto :error
 
 findstr /I /L /C:"hello1" %receiver_folder%\file1_%stream_out% || goto :error
 findstr /I /L /C:"hello2" %receiver_folder%\file2_%stream_out% || goto :error
@@ -54,7 +54,7 @@ findstr /I /L /C:"Processed 3 file(s)" out2 || goto :error
 findstr /I /L /C:"Sent 3 file(s)" out2 || goto :error
 
 
-echo db.data.find({"_id":1}) | %mongo_exe% %outdatabase_name2% | findstr /c:".\\\\file1" || goto :error
+echo db.data_default.find({"_id":1}) | %mongo_exe% %outdatabase_name2% | findstr /c:".\\\\file1" || goto :error
 
 
 goto :clean
diff --git a/examples/pipeline/in_to_out_python/check_linux.sh b/examples/pipeline/in_to_out_python/check_linux.sh
index cc546b0b3..b1780ca9b 100644
--- a/examples/pipeline/in_to_out_python/check_linux.sh
+++ b/examples/pipeline/in_to_out_python/check_linux.sh
@@ -56,7 +56,7 @@ echo hello3 > file3
 
 for i in `seq 1 3`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":6,"name":"'file$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${indatabase_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'file$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${indatabase_name}
 done
 
 sleep 1
@@ -69,7 +69,7 @@ cat out
 cat out | grep "Processed 3 file(s)"
 cat out | grep "Sent 3 file(s)"
 
-echo "db.data.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${stream_out}"
+echo "db.data_default.find({"_id":1})" | mongo ${outdatabase_name} | tee /dev/stderr | grep "file1_${stream_out}"
 
 cat ${receiver_folder}/file1_${stream_out} | grep hello1
 cat ${receiver_folder}/file2_${stream_out} | grep hello2
diff --git a/examples/pipeline/in_to_out_python/check_windows.bat b/examples/pipeline/in_to_out_python/check_windows.bat
index ae1c143e5..fe83804aa 100644
--- a/examples/pipeline/in_to_out_python/check_windows.bat
+++ b/examples/pipeline/in_to_out_python/check_windows.bat
@@ -28,7 +28,7 @@ c:\opt\consul\nomad run authorizer.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 3) do echo db.data.insert({"_id":%%x,"size":6,"name":"file%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %indatabase_name%  || goto :error
+for /l %%x in (1, 1, 3) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"file%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %indatabase_name%  || goto :error
 
 mkdir %receiver_folder%
 
@@ -44,7 +44,7 @@ type out
 findstr /I /L /C:"Processed 3 file(s)" out || goto :error
 findstr /I /L /C:"Sent 3 file(s)" out || goto :error
 
-echo db.data.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr  /c:"file1_%stream_out%"  || goto :error
+echo db.data_default.find({"_id":1}) | %mongo_exe% %outdatabase_name% | findstr  /c:"file1_%stream_out%"  || goto :error
 
 findstr /I /L /C:"hello1" %receiver_folder%\file1_%stream_out% || goto :error
 findstr /I /L /C:"hello2" %receiver_folder%\file2_%stream_out% || goto :error
diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp
index 672fb0928..2a5cec65f 100644
--- a/receiver/src/request.cpp
+++ b/receiver/src/request.cpp
@@ -83,6 +83,10 @@ std::string Request::GetFileName() const {
     return orig_name;
 }
 
+std::string Request::GetSubstream() const {
+    return request_header_.substream;
+}
+
 const std::string& Request::GetOriginUri() const {
     return origin_uri_;
 }
diff --git a/receiver/src/request.h b/receiver/src/request.h
index d665b0503..7af37f61b 100644
--- a/receiver/src/request.h
+++ b/receiver/src/request.h
@@ -35,6 +35,7 @@ class Request {
     VIRTUAL uint64_t GetMetaDataSize() const;
     VIRTUAL uint64_t GetDataID() const;
     VIRTUAL std::string GetFileName() const;
+    VIRTUAL std::string GetSubstream() const;
     VIRTUAL void* GetData() const;
     VIRTUAL Opcode GetOpCode() const;
     VIRTUAL const char* GetMessage() const;
diff --git a/receiver/src/request_factory.h b/receiver/src/request_factory.h
index 350a44aac..90ce81c28 100644
--- a/receiver/src/request_factory.h
+++ b/receiver/src/request_factory.h
@@ -16,7 +16,7 @@ class RequestFactory {
     RequestHandlerFileWrite request_handler_filewrite_;
     RequestHandlerReceiveData request_handler_receivedata_;
     RequestHandlerReceiveMetaData request_handler_receive_metadata_;
-    RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionName};
+    RequestHandlerDbWrite request_handler_dbwrite_{kDBDataCollectionNamePrefix};
     RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName};
     RequestHandlerAuthorize request_handler_authorize_;
     RequestHandlerFileReceive request_handler_filereceive_;
diff --git a/receiver/src/request_handler_db.cpp b/receiver/src/request_handler_db.cpp
index b04f73224..2b64f85c0 100644
--- a/receiver/src/request_handler_db.cpp
+++ b/receiver/src/request_handler_db.cpp
@@ -16,9 +16,9 @@ Error RequestHandlerDb::ProcessRequest(Request* request) const {
     return ConnectToDbIfNeeded();
 }
 
-RequestHandlerDb::RequestHandlerDb(std::string collection_name): log__{GetDefaultReceiverLogger()},
+RequestHandlerDb::RequestHandlerDb(std::string collection_name_prefix): log__{GetDefaultReceiverLogger()},
     http_client__{DefaultHttpClient()},
-    collection_name_{std::move(collection_name)} {
+    collection_name_prefix_{std::move(collection_name_prefix)} {
     DatabaseFactory factory;
     Error err;
     db_client__ = factory.Create(&err);
@@ -64,7 +64,7 @@ Error RequestHandlerDb::ConnectToDbIfNeeded() const {
         if (err) {
             return err;
         }
-        err = db_client__->Connect(uri, db_name_, collection_name_);
+        err = db_client__->Connect(uri, db_name_);
         if (err) {
             return ReceiverErrorTemplates::kInternalServerError.Generate("error connecting to database " + err->Explain());
         }
diff --git a/receiver/src/request_handler_db.h b/receiver/src/request_handler_db.h
index d25b6d1fa..86b09a5c4 100644
--- a/receiver/src/request_handler_db.h
+++ b/receiver/src/request_handler_db.h
@@ -13,7 +13,7 @@ namespace asapo {
 class RequestHandlerDb : public ReceiverRequestHandler {
   public:
     RequestHandlerDb() = delete;
-    RequestHandlerDb(std::string collection_name);
+    RequestHandlerDb(std::string collection_name_prefix);
     StatisticEntity GetStatisticEntity() const override;
     Error ProcessRequest(Request* request) const override;
     std::unique_ptr<Database> db_client__;
@@ -23,7 +23,7 @@ class RequestHandlerDb : public ReceiverRequestHandler {
     Error ConnectToDbIfNeeded() const;
     mutable bool connected_to_db = false;
     mutable std::string db_name_;
-    std::string collection_name_;
+    std::string collection_name_prefix_;
   private:
     Error GetDatabaseServerUri(std::string* uri) const ;
 };
diff --git a/receiver/src/request_handler_db_meta_write.cpp b/receiver/src/request_handler_db_meta_write.cpp
index 25e0277ce..23d45e0f4 100644
--- a/receiver/src/request_handler_db_meta_write.cpp
+++ b/receiver/src/request_handler_db_meta_write.cpp
@@ -16,9 +16,9 @@ Error RequestHandlerDbMetaWrite::ProcessRequest(Request* request) const {
     auto meta = (uint8_t*)request->GetData();
     auto meta_id = request->GetDataID();
 
-    auto err =  db_client__->Upsert(meta_id, meta, size);
+    auto err =  db_client__->Upsert(collection_name_prefix_, meta_id, meta, size);
     if (!err) {
-        log__->Debug(std::string{"insert beamtime meta"} + " to " + collection_name_ + " in " +
+        log__->Debug(std::string{"insert beamtime meta"} + " to " + collection_name_prefix_ + " in " +
                      db_name_ +
                      " at " + GetReceiverConfig()->database_uri);
     }
diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp
index 91580bfae..e2d3c656c 100644
--- a/receiver/src/request_handler_db_write.cpp
+++ b/receiver/src/request_handler_db_write.cpp
@@ -28,21 +28,22 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const {
     auto file_info = PrepareFileInfo(request);
 
     auto op_code = request->GetOpCode();
+    auto col_name = collection_name_prefix_ + "_" + request->GetSubstream();
     Error err;
     if (op_code == Opcode::kOpcodeTransferData) {
-        err =  db_client__->Insert(file_info, true);
+        err =  db_client__->Insert(col_name, file_info, true);
         if (!err) {
-            log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + collection_name_ + " in " +
+            log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + col_name + " in " +
                          db_name_ +
                          " at " + GetReceiverConfig()->database_uri);
         }
     } else {
         auto subset_id = request->GetCustomData()[1];
         auto subset_size = request->GetCustomData()[2];
-        err =  db_client__->InsertAsSubset(file_info, subset_id, subset_size, true);
+        err =  db_client__->InsertAsSubset(col_name, file_info, subset_id, subset_size, true);
         if (!err) {
             log__->Debug(std::string{"insert record as subset id "} + std::to_string(subset_id) + ", id: " +
-                         std::to_string(file_info.id) + " to " + collection_name_ + " in " +
+                         std::to_string(file_info.id) + " to " + col_name + " in " +
                          db_name_ +
                          " at " + GetReceiverConfig()->database_uri);
         }
@@ -61,8 +62,8 @@ FileInfo RequestHandlerDbWrite::PrepareFileInfo(const Request* request) const {
     file_info.metadata = request->GetMetaData();
     return file_info;
 }
-RequestHandlerDbWrite::RequestHandlerDbWrite(std::string collection_name) : RequestHandlerDb(std::move(
-                collection_name)) {
+RequestHandlerDbWrite::RequestHandlerDbWrite(std::string collection_name_prefix) : RequestHandlerDb(std::move(
+                collection_name_prefix)) {
 
 }
 
diff --git a/receiver/src/request_handler_db_write.h b/receiver/src/request_handler_db_write.h
index b4804bf31..56a9c00a8 100644
--- a/receiver/src/request_handler_db_write.h
+++ b/receiver/src/request_handler_db_write.h
@@ -12,7 +12,7 @@ namespace asapo {
 class RequestHandlerDbWrite final: public RequestHandlerDb {
   public:
     Error ProcessRequest(Request* request) const override;
-    RequestHandlerDbWrite(std::string collection_name);
+    RequestHandlerDbWrite(std::string collection_name_prefix);
   private:
     FileInfo PrepareFileInfo(const Request* request) const;
     Error InsertRecordToDb(const Request* request) const;
diff --git a/receiver/unittests/receiver_mocking.h b/receiver/unittests/receiver_mocking.h
index a37d82746..a0ae63135 100644
--- a/receiver/unittests/receiver_mocking.h
+++ b/receiver/unittests/receiver_mocking.h
@@ -46,6 +46,7 @@ class MockRequest: public Request {
         Request(request_header, socket_fd, std::move(origin_uri), nullptr) {};
 
     MOCK_CONST_METHOD0(GetFileName, std::string());
+    MOCK_CONST_METHOD0(GetSubstream, std::string());
     MOCK_CONST_METHOD0(GetDataSize, uint64_t());
     MOCK_CONST_METHOD0(GetDataID, uint64_t());
     MOCK_CONST_METHOD0(GetSlotId, uint64_t());
diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp
index 45875b4e6..392f83a13 100644
--- a/receiver/unittests/test_request.cpp
+++ b/receiver/unittests/test_request.cpp
@@ -69,6 +69,7 @@ class RequestTests : public Test {
     uint64_t expected_slot_id{16};
     std::string expected_origin_uri = "origin_uri";
     std::string expected_metadata = "meta";
+    std::string expected_substream = "substream";
     uint64_t expected_metadata_size = expected_metadata.size();
     asapo::Opcode expected_op_code = asapo::kOpcodeTransferData;
     char expected_request_message[asapo::kMaxMessageSize] = "test_message";
@@ -174,6 +175,20 @@ void RequestTests::ExpectFileName(std::string sended, std::string received) {
 
 }
 
+
+TEST_F(RequestTests, GetSubstream) {
+    strcpy(generic_request_header.substream, expected_substream.c_str());
+
+    request->io__.release();
+    request.reset(new Request{generic_request_header, expected_socket_id, expected_origin_uri, nullptr});
+    request->io__ = std::unique_ptr<asapo::IO> {&mock_io};
+
+    auto substream = request->GetSubstream();
+
+    ASSERT_THAT(substream, Eq(expected_substream));
+}
+
+
 TEST_F(RequestTests, GetFileName) {
     ExpectFileName("filename.txt", "filename.txt");
 }
diff --git a/receiver/unittests/test_request_handler_db.cpp b/receiver/unittests/test_request_handler_db.cpp
index 92554351d..a2804fae7 100644
--- a/receiver/unittests/test_request_handler_db.cpp
+++ b/receiver/unittests/test_request_handler_db.cpp
@@ -156,8 +156,7 @@ TEST_F(DbHandlerTests, ProcessRequestDiscoversMongoDbAddress) {
     ;
 
 
-    EXPECT_CALL(mock_db, Connect_t(expected_database_server, expected_beamtime_id + "_" + expected_stream,
-                                   expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t(expected_database_server, expected_beamtime_id + "_" + expected_stream)).
     WillOnce(testing::Return(nullptr));
 
     auto err = handler.ProcessRequest(mock_request.get());
@@ -172,7 +171,7 @@ TEST_F(DbHandlerTests, ProcessRequestErrorDiscoversMongoDbAddress) {
 
     MockAuthRequest(true, HttpCode::BadRequest);
 
-    EXPECT_CALL(mock_db, Connect_t(_, _, _)).Times(0);
+    EXPECT_CALL(mock_db, Connect_t(_, _)).Times(0);
 
     auto err = handler.ProcessRequest(mock_request.get());
     ASSERT_THAT(err, Eq(asapo::ReceiverErrorTemplates::kInternalServerError));
@@ -195,8 +194,7 @@ TEST_F(DbHandlerTests, ProcessRequestCallsConnectDbWhenNotConnected) {
 
 
 
-    EXPECT_CALL(mock_db, Connect_t("127.0.0.1:27017", expected_beamtime_id + "_" + expected_stream,
-                                   expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t("127.0.0.1:27017", expected_beamtime_id + "_" + expected_stream)).
     WillOnce(testing::Return(nullptr));
 
     auto err = handler.ProcessRequest(mock_request.get());
@@ -205,7 +203,7 @@ TEST_F(DbHandlerTests, ProcessRequestCallsConnectDbWhenNotConnected) {
 
 TEST_F(DbHandlerTests, ProcessRequestReturnsErrorWhenCannotConnect) {
 
-    EXPECT_CALL(mock_db, Connect_t(_, _, expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t(_, _)).
     WillOnce(testing::Return(new asapo::SimpleError("")));
 
     auto err = handler.ProcessRequest(mock_request.get());
@@ -216,7 +214,7 @@ TEST_F(DbHandlerTests, ProcessRequestReturnsErrorWhenCannotConnect) {
 
 TEST_F(DbHandlerTests, ProcessRequestDoesNotCallConnectSecondTime) {
 
-    EXPECT_CALL(mock_db, Connect_t(_, _, expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t(_, _)).
     WillOnce(testing::Return(nullptr));
 
     handler.ProcessRequest(mock_request.get());
diff --git a/receiver/unittests/test_request_handler_db_meta_writer.cpp b/receiver/unittests/test_request_handler_db_meta_writer.cpp
index a81ff983d..822bbc42a 100644
--- a/receiver/unittests/test_request_handler_db_meta_writer.cpp
+++ b/receiver/unittests/test_request_handler_db_meta_writer.cpp
@@ -96,8 +96,7 @@ TEST_F(DbMetaWriterHandlerTests, CallsUpdate) {
     ;
 
 
-    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream,
-                                   expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_stream)).
     WillOnce(testing::Return(nullptr));
 
 
@@ -109,7 +108,7 @@ TEST_F(DbMetaWriterHandlerTests, CallsUpdate) {
     .WillOnce(Return((void*)expected_meta))
     ;
 
-    EXPECT_CALL(mock_db, Upsert_t(expected_meta_id, expected_meta, expected_meta_size)).
+    EXPECT_CALL(mock_db, Upsert_t(expected_collection_name, expected_meta_id, expected_meta, expected_meta_size)).
     WillOnce(testing::Return(nullptr));
 
     EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert beamtime meta"),
diff --git a/receiver/unittests/test_request_handler_db_writer.cpp b/receiver/unittests/test_request_handler_db_writer.cpp
index 19c1ec322..8c6454049 100644
--- a/receiver/unittests/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/test_request_handler_db_writer.cpp
@@ -62,8 +62,9 @@ TEST(DbWriterHandler, Constructor) {
 
 class DbWriterHandlerTests : public Test {
   public:
-    std::string expected_collection_name = asapo::kDBDataCollectionName;
-    RequestHandlerDbWrite handler{expected_collection_name};
+    std::string expected_substream = "substream";
+    std::string expected_collection_name = std::string(asapo::kDBDataCollectionNamePrefix) + "_" + expected_substream;
+    RequestHandlerDbWrite handler{asapo::kDBDataCollectionNamePrefix};
     std::unique_ptr<NiceMock<MockRequest>> mock_request;
     NiceMock<MockDatabase> mock_db;
     NiceMock<asapo::MockLogger> mock_logger;
@@ -134,7 +135,7 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     std::string db_name = expected_beamtime_id;
     db_name += "_" + stream;
 
-    EXPECT_CALL(mock_db, Connect_t(config.database_uri, db_name, expected_collection_name)).
+    EXPECT_CALL(mock_db, Connect_t(config.database_uri, db_name)).
     WillOnce(testing::Return(nullptr));
 
     EXPECT_CALL(*mock_request, GetDataSize())
@@ -145,6 +146,11 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     .WillOnce(Return(expected_file_name))
     ;
 
+    EXPECT_CALL(*mock_request, GetSubstream())
+    .WillOnce(Return(expected_substream))
+    ;
+
+
     EXPECT_CALL(*mock_request, GetMetaData())
     .WillOnce(ReturnRef(expected_metadata))
     ;
@@ -183,7 +189,7 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
     ExpectRequestParams(asapo::Opcode::kOpcodeTransferData, expected_stream);
     auto file_info = PrepareFileInfo();
 
-    EXPECT_CALL(mock_db, Insert_t(CompareFileInfo(file_info), _)).
+    EXPECT_CALL(mock_db, Insert_t(expected_collection_name, CompareFileInfo(file_info), _)).
     WillOnce(testing::Return(nullptr));
 
     EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"),
@@ -204,7 +210,8 @@ TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
     auto file_info = PrepareFileInfo();
 
 
-    EXPECT_CALL(mock_db, InsertAsSubset_t(CompareFileInfo(file_info), expected_subset_id, expected_subset_size, _)).
+    EXPECT_CALL(mock_db, InsertAsSubset_t(expected_collection_name, CompareFileInfo(file_info), expected_subset_id,
+                                          expected_subset_size, _)).
     WillOnce(testing::Return(nullptr));
 
     EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("insert record"),
diff --git a/tests/automatic/broker/get_last/check_linux.sh b/tests/automatic/broker/get_last/check_linux.sh
index 7dc7542ec..4417903f6 100644
--- a/tests/automatic/broker/get_last/check_linux.sh
+++ b/tests/automatic/broker/get_last/check_linux.sh
@@ -12,8 +12,8 @@ Cleanup() {
 	kill -9 $brokerid
 }
 
-echo "db.data.insert({"_id":2})" | mongo ${database_name}
-echo "db.data.insert({"_id":1})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":2})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":1})" | mongo ${database_name}
 
 token=`$2 token -secret auth_secret.key data`
 
@@ -30,11 +30,11 @@ curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$tok
 curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
 curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":2'
 
-echo "db.data.insert({"_id":3})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":3})" | mongo ${database_name}
 
 curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":3'
 
-echo "db.data.insert({"_id":4})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":4})" | mongo ${database_name}
 
 curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/next?token=$token --stderr - | grep '"_id":4'
 curl -v  --silent 127.0.0.1:5005/database/data/stream/${groupid}/last?token=$token --stderr - | grep '"_id":4'
diff --git a/tests/automatic/broker/get_last/check_windows.bat b/tests/automatic/broker/get_last/check_windows.bat
index bd9d99582..c0746966f 100644
--- a/tests/automatic/broker/get_last/check_windows.bat
+++ b/tests/automatic/broker/get_last/check_windows.bat
@@ -1,8 +1,8 @@
 SET database_name=data_stream
 SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
 
-echo db.data.insert({"_id":1}) | %mongo_exe% %database_name%  || goto :error
-echo db.data.insert({"_id":2}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":1}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":2}) | %mongo_exe% %database_name%  || goto :error
 
 set full_name="%1"
 set short_name="%~nx1"
@@ -20,10 +20,10 @@ set /P groupid=< groupid
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":2  || goto :error
 
-echo db.data.insert({"_id":3}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":3}) | %mongo_exe% %database_name%  || goto :error
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":3  || goto :error
 
-echo db.data.insert({"_id":4}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":4}) | %mongo_exe% %database_name%  || goto :error
 
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/next?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/stream/%groupid%/last?token=%token% --stderr - | findstr /c:\"_id\":4  || goto :error
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index 760c2402b..f65a59a48 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -12,8 +12,8 @@ Cleanup() {
 	kill -9 $brokerid
 }
 
-echo "db.data.insert({"_id":2})" | mongo ${database_name}
-echo "db.data.insert({"_id":1})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":2})" | mongo ${database_name}
+echo "db.data_default.insert({"_id":1})" | mongo ${database_name}
 
 token=`$2 token -secret auth_secret.key data`
 
diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat
index b1faffe4e..e108767a8 100644
--- a/tests/automatic/broker/get_next/check_windows.bat
+++ b/tests/automatic/broker/get_next/check_windows.bat
@@ -1,8 +1,8 @@
 SET database_name=data_stream
 SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
 
-echo db.data.insert({"_id":1}) | %mongo_exe% %database_name%  || goto :error
-echo db.data.insert({"_id":2}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":1}) | %mongo_exe% %database_name%  || goto :error
+echo db.data_default.insert({"_id":2}) | %mongo_exe% %database_name%  || goto :error
 
 set full_name="%1"
 set short_name="%~nx1"
diff --git a/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh b/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh
index a299cd765..c3dff6e8a 100644
--- a/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh
+++ b/tests/automatic/bug_fixes/consumer_python_memleak/check_linux.sh
@@ -28,7 +28,7 @@ nomad run broker.nmd
 
 sleep 1
 
-echo 'db.data.insert({"_id":'1',"size":'$size',"name":"'$fname'","lastchange":1,"source":"none","buf_id":0,"meta":{}})' | mongo ${beamtime_id}_stream
+echo 'db.data_default.insert({"_id":'1',"size":'$size',"name":"'$fname'","lastchange":1,"source":"none","buf_id":0,"meta":{}})' | mongo ${beamtime_id}_stream
 dd if=/dev/zero of=$fname bs=$size count=1
 
 export PYTHONPATH=$1:${PYTHONPATH}
diff --git a/tests/automatic/consumer/consumer_api/check_linux.sh b/tests/automatic/consumer/consumer_api/check_linux.sh
index 1634fb47a..39e266c9d 100644
--- a/tests/automatic/consumer/consumer_api/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api/check_linux.sh
@@ -28,7 +28,7 @@ sleep 1
 
 for i in `seq 1 10`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 echo hello1 > 1
@@ -49,7 +49,7 @@ do
 		images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
 	done
 	images=${images#?}
-	echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name}
 done
 
 echo hello1 > 1_1
diff --git a/tests/automatic/consumer/consumer_api/check_windows.bat b/tests/automatic/consumer/consumer_api/check_windows.bat
index 283468e99..e5116fa46 100644
--- a/tests/automatic/consumer/consumer_api/check_windows.bat
+++ b/tests/automatic/consumer/consumer_api/check_windows.bat
@@ -14,7 +14,7 @@ c:\opt\consul\nomad run nginx.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 echo hello1 > 1
 
@@ -23,7 +23,7 @@ echo hello1 > 1
 
 echo db.dropDatabase() | %mongo_exe% %database_name%
 
-for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name%  || goto :error
 
 echo hello1 > 1_1
 
diff --git a/tests/automatic/consumer/consumer_api_python/check_windows.bat b/tests/automatic/consumer/consumer_api_python/check_windows.bat
index 242059197..8daf30d95 100644
--- a/tests/automatic/consumer/consumer_api_python/check_windows.bat
+++ b/tests/automatic/consumer/consumer_api_python/check_windows.bat
@@ -13,7 +13,7 @@ c:\opt\consul\nomad run nginx.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 5) do echo db.data.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 5) do echo db.data_default.insert({"_id":%%x,"size":6,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 set PYTHONPATH=%1
 
@@ -25,7 +25,7 @@ python consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id%  %token_test_r
 
 echo db.dropDatabase() | %mongo_exe% %database_name%
 
-for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":3,"images":[{"_id":1, "size":6,"name":"%%x_1","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":2, "size":6,"name":"%%x_2","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}},{"_id":3, "size":6,"name":"%%x_3","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}]}) | %mongo_exe% %database_name%  || goto :error
 
 python consumer_api.py 127.0.0.1:8400 %source_path% %beamtime_id%  %token_test_run% datasets || goto :error
 
diff --git a/tests/automatic/consumer/folder_to_db/check_linux.sh b/tests/automatic/consumer/folder_to_db/check_linux.sh
index 3e24645bc..98a424876 100644
--- a/tests/automatic/consumer/folder_to_db/check_linux.sh
+++ b/tests/automatic/consumer/folder_to_db/check_linux.sh
@@ -20,8 +20,8 @@ touch test/file1
 $@ test test_run 127.0.0.1
 
 echo "show collections" | mongo ${database_name} | grep data
-echo "db.data.find({"_id":1})" | mongo ${database_name} | grep file2
-echo "db.data.find({"_id":2})" | mongo ${database_name} | grep file1
+echo "db.data_default.find({"_id":1})" | mongo ${database_name} | grep file2
+echo "db.data_default.find({"_id":2})" | mongo ${database_name} | grep file1
 
 # check if gives error on duplicates
 ! $@ test test_run 127.0.0.1
diff --git a/tests/automatic/consumer/folder_to_db/check_windows.bat b/tests/automatic/consumer/folder_to_db/check_windows.bat
index be88973e9..f24ad478f 100644
--- a/tests/automatic/consumer/folder_to_db/check_windows.bat
+++ b/tests/automatic/consumer/folder_to_db/check_windows.bat
@@ -9,8 +9,8 @@ echo "" > test/file1
 %* test test_run 127.0.0.1 || goto :error
 
 echo show collections | %mongo_exe% %database_name% | findstr data  || goto :error
-echo db.data.find({"_id":1}) | %mongo_exe% %database_name% | findstr file2  || goto :error
-echo db.data.find({"_id":2}) | %mongo_exe% %database_name% | findstr file1  || goto :error
+echo db.data_default.find({"_id":1}) | %mongo_exe% %database_name% | findstr file2  || goto :error
+echo db.data_default.find({"_id":2}) | %mongo_exe% %database_name% | findstr file1  || goto :error
 
 # check if gives error on duplicates
 %* test test_run 127.0.0.1  && goto :error
diff --git a/tests/automatic/consumer/next_multithread_broker/check_linux.sh b/tests/automatic/consumer/next_multithread_broker/check_linux.sh
index 933166f64..b4a6c53c9 100644
--- a/tests/automatic/consumer/next_multithread_broker/check_linux.sh
+++ b/tests/automatic/consumer/next_multithread_broker/check_linux.sh
@@ -25,7 +25,7 @@ sleep 1
 
 for i in `seq 1 10`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
+	echo 'db.data_default.insert({"_id":'$i',"size":100,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name}
 done
 
 $@ 127.0.0.1:8400 test_run 4 10 $token_test_run
diff --git a/tests/automatic/consumer/next_multithread_broker/check_windows.bat b/tests/automatic/consumer/next_multithread_broker/check_windows.bat
index 5a7c02e40..de24971c9 100644
--- a/tests/automatic/consumer/next_multithread_broker/check_windows.bat
+++ b/tests/automatic/consumer/next_multithread_broker/check_windows.bat
@@ -10,7 +10,7 @@ c:\opt\consul\nomad run nginx.nmd
 
 ping 1.0.0.0 -n 10 -w 100 > nul
 
-for /l %%x in (1, 1, 10) do echo db.data.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
+for /l %%x in (1, 1, 10) do echo db.data_default.insert({"_id":%%x,"size":100,"name":"%%x","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}}) | %mongo_exe% %database_name%  || goto :error
 
 
 %1 127.0.0.1:8400 test_run 4 10 %token_test_run% || goto :error
diff --git a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
index 9f6d303b2..45fbc528d 100644
--- a/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
+++ b/tests/automatic/high_avail/receiver_mongo_restart/check_linux.sh
@@ -83,12 +83,12 @@ start_mongo
 
 wait
 
-echo "db.data.validate(true)" | mongo --port 27016 ${beamtime_id}_detector
+echo "db.data_default.validate(true)" | mongo --port 27016 ${beamtime_id}_detector
 
 echo processed files:
-echo "db.data.count()" | mongo --port 27016 ${beamtime_id}_detector
+echo "db.data_default.count()" | mongo --port 27016 ${beamtime_id}_detector
 
 
-echo "db.data.count()" | mongo --port 27016 ${beamtime_id}_detector | grep $nfiles
+echo "db.data_default.count()" | mongo --port 27016 ${beamtime_id}_detector | grep $nfiles
 
 
diff --git a/tests/automatic/mongo_db/connect/connect_mongodb.cpp b/tests/automatic/mongo_db/connect/connect_mongodb.cpp
index eb928e05f..9c156098c 100644
--- a/tests/automatic/mongo_db/connect/connect_mongodb.cpp
+++ b/tests/automatic/mongo_db/connect/connect_mongodb.cpp
@@ -39,11 +39,11 @@ int main(int argc, char* argv[]) {
     auto args = GetArgs(argc, argv);
     asapo::MongoDBClient db;
 
-    auto err = db.Connect(args.address, args.database_name, args.collection_name);
+    auto err = db.Connect(args.address, args.database_name);
     Assert(err, args.keyword);
 
     if (err == nullptr) {
-        err = db.Connect(args.address, args.database_name, args.collection_name);
+        err = db.Connect(args.address, args.database_name);
         Assert(err, asapo::DBErrorTemplates::kAlreadyConnected.Generate()->Explain());
     }
     return 0;
diff --git a/tests/automatic/mongo_db/insert/insert_mongodb.cpp b/tests/automatic/mongo_db/insert/insert_mongodb.cpp
index 5922fb935..87a525270 100644
--- a/tests/automatic/mongo_db/insert/insert_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert/insert_mongodb.cpp
@@ -46,14 +46,14 @@ int main(int argc, char* argv[]) {
     fi.source = "host:1234";
 
     if (args.keyword != "Notconnected") {
-        db.Connect("127.0.0.1", "data", "test");
+        db.Connect("127.0.0.1", "data");
     }
 
-    auto err = db.Insert(fi, false);
+    auto err = db.Insert("test", fi, false);
 
     if (args.keyword == "DuplicateID") {
         Assert(err, "OK");
-        err = db.Insert(fi, false);
+        err = db.Insert("test", fi, false);
     }
 
     Assert(err, args.keyword);
diff --git a/tests/automatic/mongo_db/insert_dataset/insert_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_dataset/insert_dataset_mongodb.cpp
index c15b83406..eea1a1a84 100644
--- a/tests/automatic/mongo_db/insert_dataset/insert_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_dataset/insert_dataset_mongodb.cpp
@@ -49,18 +49,18 @@ int main(int argc, char* argv[]) {
     uint64_t subset_size = 2;
 
     if (args.keyword != "Notconnected") {
-        db.Connect("127.0.0.1", "data", "test");
+        db.Connect("127.0.0.1", "data");
     }
 
-    auto err =  db.InsertAsSubset(fi, subset_id, subset_size, true);
+    auto err =  db.InsertAsSubset("test", fi, subset_id, subset_size, true);
 
 
     if (args.keyword == "DuplicateID") {
         Assert(err, "OK");
         fi.id = 2;
-        err =  db.InsertAsSubset(fi, subset_id, subset_size, true);
+        err =  db.InsertAsSubset("test", fi, subset_id, subset_size, true);
 //        Assert(err, "OK");
-        err =  db.InsertAsSubset(fi, subset_id, subset_size, false);
+        err =  db.InsertAsSubset("test", fi, subset_id, subset_size, false);
     }
 
     Assert(err, args.keyword);
diff --git a/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp b/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp
index 6855c88f7..89ec2de53 100644
--- a/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp
+++ b/tests/automatic/mongo_db/upsert/upsert_mongodb.cpp
@@ -45,17 +45,17 @@ int main(int argc, char* argv[]) {
 
 
     if (args.keyword != "Notconnected") {
-        db.Connect("127.0.0.1", "test", "meta");
+        db.Connect("127.0.0.1", "test");
     }
 
-    auto err = db.Upsert(0, reinterpret_cast<const uint8_t*>(json.c_str()), json.size());
+    auto err = db.Upsert("meta", 0, reinterpret_cast<const uint8_t*>(json.c_str()), json.size());
     if (err) {
         std::cout << err->Explain() << std::endl;
     }
 
     Assert(err, args.keyword);
 
-    err = db.Upsert(0, reinterpret_cast<const uint8_t*>(json.c_str()), json.size());
+    err = db.Upsert("meta", 0, reinterpret_cast<const uint8_t*>(json.c_str()), json.size());
     if (err) {
         std::cout << err->Explain() << std::endl;
     }
diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
index d1545ecb5..27dc92b86 100644
--- a/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_datasets/check_linux.sh
@@ -44,4 +44,4 @@ ls -ln ${receiver_folder}/1_1 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/1_2 | awk '{ print $5 }'| grep 100000
 ls -ln ${receiver_folder}/1_3 | awk '{ print $5 }'| grep 100000
 
-echo 'db.data.find({"images._id":{$gt:0}},{"images.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
\ No newline at end of file
+echo 'db.data_default.find({"images._id":{$gt:0}},{"images.name":1})' | mongo asapo_test_detector | grep 1_1 | grep 1_2 | grep 1_3
\ No newline at end of file
diff --git a/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat b/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat
index 9a7ca0e1e..28016c93d 100644
--- a/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat
+++ b/tests/automatic/producer_receiver/transfer_datasets/check_windows.bat
@@ -31,9 +31,9 @@ FOR /F "usebackq" %%A IN ('%receiver_folder%\1_3') DO set size=%%~zA
 if %size% NEQ 100000 goto :error
 
 
-echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_1  || goto :error
-echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_2  || goto :error
-echo db.data.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_3  || goto :error
+echo db.data_default.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_1  || goto :error
+echo db.data_default.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_2  || goto :error
+echo db.data_default.find({"images._id":{$gt:0}},{"images.name":1}) | %mongo_exe% %beamtime_id%_detector | findstr 1_3  || goto :error
 
 goto :clean
 
diff --git a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh
index d31d7d1ef..59e9600aa 100644
--- a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_linux.sh
@@ -40,7 +40,7 @@ sleep 1
 
 $1 localhost:8400 ${beamtime_id} 60000 1 1  0 30
 
-echo "db.data.find({"_id":1})" | mongo ${beamtime_id}_detector  > out
+echo "db.data_default.find({"_id":1})" | mongo ${beamtime_id}_detector  > out
 cat out
 cat out | grep '"buf_id" : 0'
 cat out | grep user_meta
diff --git a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat
index 2a4aaf721..2a9fe358f 100644
--- a/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat
+++ b/tests/automatic/producer_receiver/transfer_single_file_bypass_buffer/check_windows.bat
@@ -24,7 +24,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul
 FOR /F "usebackq" %%A IN ('%receiver_folder%\1') DO set size=%%~zA
 if %size% NEQ 60000000 goto :error
 
-echo db.data.find({"_id":1}) |  %mongo_exe% %beamtime_id%_detector  > out
+echo db.data_default.find({"_id":1}) |  %mongo_exe% %beamtime_id%_detector  > out
 type out
 type out | findstr /c:"\"buf_id\" : 0" || goto :error
 type out | findstr /c:user_meta || goto :error
-- 
GitLab