From 85e2141bac72bd83ac8e3b2c0ce7b71f1d9453ed Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 16 Jun 2021 13:43:29 +0200
Subject: [PATCH] add python functions, add tests

---
 common/cpp/include/asapo/common/networking.h  |   1 +
 common/cpp/src/database/mongodb_client.cpp    |  34 ++---
 producer/api/cpp/src/producer_impl.cpp        |   8 +-
 .../api/cpp/unittests/test_producer_impl.cpp  |  37 ++++-
 producer/api/python/asapo_producer.pxd        |   3 +-
 producer/api/python/asapo_producer.pyx.in     |  35 +++++
 receiver/CMakeLists.txt                       |   2 +
 .../src/request_handler/request_factory.cpp   |   4 +
 .../src/request_handler/request_factory.h     |   2 +
 .../request_handler_db_get_meta.cpp           |  38 ++++++
 .../request_handler_db_get_meta.h             |  18 +++
 .../request_handler/test_request_factory.cpp  |  12 ++
 .../test_request_handler_db_get_meta.cpp      | 128 ++++++++++++++++++
 .../consumer/consumer_api/CMakeLists.txt      |   3 +-
 .../automatic/mongo_db/meta/meta_mongodb.cpp  |   5 +-
 .../automatic/producer/cpp_api/CMakeLists.txt |   3 +-
 .../automatic/producer/cpp_api/check_linux.sh |   2 +-
 .../producer/cpp_api/producer_api.cpp         |  25 +++-
 .../producer/python_api/check_linux.sh        |  13 +-
 .../producer/python_api/check_windows.bat     |   2 +-
 .../producer/python_api/producer_api.py       |  13 ++
 21 files changed, 346 insertions(+), 42 deletions(-)
 create mode 100644 receiver/src/request_handler/request_handler_db_get_meta.cpp
 create mode 100644 receiver/src/request_handler/request_handler_db_get_meta.h
 create mode 100644 receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp

diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h
index d49d83c32..ca2c2f8e8 100644
--- a/common/cpp/include/asapo/common/networking.h
+++ b/common/cpp/include/asapo/common/networking.h
@@ -30,6 +30,7 @@ enum Opcode : uint8_t {
     kOpcodeAuthorize,
     kOpcodeTransferMetaData,
     kOpcodeDeleteStream,
+    kOpcodeGetMeta,
     kOpcodeCount,
 };
 
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index f264d7db8..d4e1de03f 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -203,6 +203,17 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl
     return nullptr;
 }
 
+bool documentWasChanged(bson_t* reply) {
+    bson_iter_t iter;
+    bson_iter_init_find(&iter, reply, "upsertedCount");
+    auto n_upsert = bson_iter_int32(&iter);
+    bson_iter_init_find(&iter, reply, "modifiedCount");
+    auto n_mod = bson_iter_int32(&iter);
+    bson_iter_init_find(&iter, reply, "matchedCount");
+    auto n_matched = bson_iter_int32(&iter);
+    return n_mod + n_upsert + n_matched > 0;
+}
+
 Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& document, bool upsert) const {
     bson_error_t mongo_err;
 
@@ -210,20 +221,13 @@ Error MongoDBClient::ReplaceBsonDocument(const std::string& id, const bson_p& do
     bson_t* selector = BCON_NEW ("_id", BCON_UTF8(id.c_str()));
     bson_t reply;
     Error err = nullptr;
-    bson_iter_t iter;
 
     if (!mongoc_collection_replace_one(current_collection_, selector, document.get(), opts, &reply, &mongo_err)) {
         err = DBErrorTemplates::kInsertError.Generate(mongo_err.message);
     }
 
-    if (err == nullptr) {
-        bson_iter_init_find(&iter, &reply, "upsertedCount");
-        auto n_upsert = bson_iter_int32(&iter);
-        bson_iter_init_find(&iter, &reply, "modifiedCount");
-        auto n_mod = bson_iter_int32(&iter);
-        if (n_mod + n_upsert != 1) {
-            err = DBErrorTemplates::kInsertError.Generate("metadata does not exist");
-        }
+    if (err == nullptr && !documentWasChanged(&reply)) {
+        err = DBErrorTemplates::kWrongInput.Generate("cannot replace: metadata does not exist");
     }
 
     bson_free(opts);
@@ -242,20 +246,12 @@ Error MongoDBClient::UpdateBsonDocument(const std::string& id, const bson_p& doc
 
     bson_t reply;
     Error err = nullptr;
-    bson_iter_t iter;
-
     if (!mongoc_collection_update_one(current_collection_, selector, update, opts, &reply, &mongo_err)) {
         err = DBErrorTemplates::kInsertError.Generate(mongo_err.message);
     }
 
-    if (err == nullptr) {
-        bson_iter_init_find(&iter, &reply, "upsertedCount");
-        auto n_upsert = bson_iter_int32(&iter);
-        bson_iter_init_find(&iter, &reply, "modifiedCount");
-        auto n_mod = bson_iter_int32(&iter);
-        if (n_mod + n_upsert != 1) {
-            err = DBErrorTemplates::kInsertError.Generate("metadata does not exist");
-        }
+    if (err == nullptr && !documentWasChanged(&reply)) {
+        err = DBErrorTemplates::kWrongInput.Generate("cannot update: metadata does not exist");
     }
 
     bson_free(opts);
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 3409b3d27..a0f2c7477 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -507,7 +507,13 @@ std::string ProducerImpl::GetBeamtimeMeta(uint64_t timeout_ms, Error* err) const
 }
 
 std::string ProducerImpl::GetMeta(const std::string& stream, uint64_t timeout_ms, Error* err) const {
-    return std::string();
+    auto header =  GenericRequestHeader{kOpcodeGetMeta, 0, 0, 0, "", stream};
+    auto response = BlockingRequest(std::move(header), timeout_ms, err);
+    if (*err) {
+        return "";
+    }
+    *err = nullptr;
+    return response;
 }
 
 }
\ No newline at end of file
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index 1fc2dea86..216ac1a73 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -501,7 +501,7 @@ TEST_F(ProducerImplTests, WaitRequestsFinished) {
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
 }
 
-MATCHER_P3(M_CheckGetStreamInfoRequest, op_code, source_credentials, stream,
+MATCHER_P3(M_CheckGetRequest, op_code, source_credentials, stream,
            "Checks if a valid GenericRequestHeader was Send") {
     auto request = static_cast<ProducerRequest*>(arg);
     return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code
@@ -509,9 +509,9 @@ MATCHER_P3(M_CheckGetStreamInfoRequest, op_code, source_credentials, stream,
            && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0;
 }
 
-TEST_F(ProducerImplTests, GetStreamInfoMakesCorerctRequest) {
+TEST_F(ProducerImplTests, GetStreamInfoMakesCorrectRequest) {
     producer.SetCredentials(expected_credentials);
-    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetStreamInfoRequest(asapo::kOpcodeStreamInfo,
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeStreamInfo,
                                         expected_credentials_str,
                                         expected_stream), true)).WillOnce(
                                             Return(nullptr));
@@ -537,9 +537,9 @@ TEST(GetStreamInfoTest, GetStreamInfoTimeout) {
     ASSERT_THAT(err->Explain(), HasSubstr("opcode: 4"));
 }
 
-TEST_F(ProducerImplTests, GetLastStreamMakesCorerctRequest) {
+TEST_F(ProducerImplTests, GetLastStreamMakesCorrectRequest) {
     producer.SetCredentials(expected_credentials);
-    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetStreamInfoRequest(asapo::kOpcodeLastStream,
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeLastStream,
                                         expected_credentials_str,
                                         ""), true)).WillOnce(
                                             Return(nullptr));
@@ -605,7 +605,7 @@ MATCHER_P4(M_CheckDeleteStreamRequest, op_code, source_credentials, stream, flag
            && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0;
 }
 
-TEST_F(ProducerImplTests, DeleteStreamMakesCorerctRequest) {
+TEST_F(ProducerImplTests, DeleteStreamMakesCorrectRequest) {
     producer.SetCredentials(expected_credentials);
     asapo::DeleteStreamOptions expected_options{};
     expected_options.delete_meta = true;
@@ -622,4 +622,29 @@ TEST_F(ProducerImplTests, DeleteStreamMakesCorerctRequest) {
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
 }
 
+TEST_F(ProducerImplTests, GetStreamMetaMakesCorrectRequest) {
+    producer.SetCredentials(expected_credentials);
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta,
+                                        expected_credentials_str,
+                                        expected_stream), true)).WillOnce(
+                                            Return(nullptr));
+
+    asapo::Error err;
+    producer.GetStreamMeta(expected_stream, 1000, &err);
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
+}
+
+
+TEST_F(ProducerImplTests, GetBeamtimeMetaMakesCorrectRequest) {
+    producer.SetCredentials(expected_credentials);
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckGetRequest(asapo::kOpcodeGetMeta,
+                                        expected_credentials_str,
+                                        ""), true)).WillOnce(
+                                            Return(nullptr));
+
+    asapo::Error err;
+    producer.GetBeamtimeMeta(1000, &err);
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kTimeout));
+}
+
 }
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index 421e41f20..e50f762bc 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -121,7 +121,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
         Error DeleteStream(string stream, uint64_t timeout_ms, DeleteStreamOptions options)
         Error SendBeamtimeMetadata(string metadata, MetaIngestMode mode, RequestCallback callback)
         Error SendStreamMetadata(string metadata, MetaIngestMode mode, string stream, RequestCallback callback)
-
+        string GetStreamMeta(string stream, uint64_t timeout_ms, Error* err)
+        string GetBeamtimeMeta(uint64_t timeout_ms, Error* err)
 
 cdef extern from "asapo/asapo_producer.h" namespace "asapo":
     uint64_t kDefaultIngestMode
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index 218358a9a..f681fd636 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -300,6 +300,41 @@ cdef class PyProducer:
             err = self.c_producer.get().DeleteStream(b_stream,timeout_ms,opts)
         if err:
             throw_exception(err)
+    def get_stream_meta(self,stream = 'default', uint64_t timeout_ms = 1000):
+        """
+         :param stream: stream name
+         :type stream: string
+         :param timeout_ms: timeout in milliseconds
+         :type timeout_ms: int
+         :raises:
+            AsapoWrongInputError: wrong input (authorization, ...)
+            AsapoTimeoutError: request not finished for a given timeout
+            AsapoProducerError: other errors
+        """
+        cdef Error err
+        cdef string res
+        cdef string b_stream = _bytes(stream)
+        with nogil:
+            res = self.c_producer.get().GetStreamMeta(b_stream,timeout_ms,&err)
+        if err:
+            throw_exception(err)
+        return json.loads(_str(res) or 'null')
+    def get_beamtime_meta(self, uint64_t timeout_ms = 1000):
+        """
+         :param timeout_ms: timeout in milliseconds
+         :type timeout_ms: int
+         :raises:
+            AsapoWrongInputError: wrong input (authorization, ...)
+            AsapoTimeoutError: request not finished for a given timeout
+            AsapoProducerError: other errors
+        """
+        cdef Error err
+        cdef string res
+        with nogil:
+            res = self.c_producer.get().GetBeamtimeMeta(timeout_ms,&err)
+        if err:
+            throw_exception(err)
+        return json.loads(_str(res) or 'null')
     def stream_info(self, stream = 'default', uint64_t timeout_ms = 1000):
         """
          :param stream: stream name
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index 4a18b1896..b743d6131 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -22,6 +22,7 @@ set(RECEIVER_CORE_FILES
         src/request_handler/request_handler_receive_metadata.cpp
         src/request_handler/request_handler_db_check_request.cpp
         src/request_handler/request_handler_db_delete_stream.cpp
+        src/request_handler/request_handler_db_get_meta.cpp
         src/request_handler/request_factory.cpp
         src/request_handler/request_handler_db.cpp
         src/file_processors/write_file_processor.cpp
@@ -99,6 +100,7 @@ set(TEST_SOURCE_FILES
         unittests/request_handler/test_request_handler_receive_data.cpp
         unittests/request_handler/test_request_handler_receive_metadata.cpp
         unittests/request_handler/test_request_handler_delete_stream.cpp
+        unittests/request_handler/test_request_handler_db_get_meta.cpp
         unittests/statistics/test_statistics_sender_influx_db.cpp
         unittests/statistics/test_statistics_sender_fluentd.cpp
         unittests/mock_receiver_config.cpp
diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp
index 4279f485a..39a273707 100644
--- a/receiver/src/request_handler/request_factory.cpp
+++ b/receiver/src/request_handler/request_factory.cpp
@@ -91,6 +91,10 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request>& request,
         request->AddHandler(&request_handler_db_last_stream_);
         break;
     }
+    case Opcode::kOpcodeGetMeta: {
+        request->AddHandler(&request_handler_db_get_meta_);
+        break;
+    }
     default:
         return ReceiverErrorTemplates::kInvalidOpCode.Generate();
     }
diff --git a/receiver/src/request_handler/request_factory.h b/receiver/src/request_handler/request_factory.h
index ee371d5ac..a44d7efaf 100644
--- a/receiver/src/request_handler/request_factory.h
+++ b/receiver/src/request_handler/request_factory.h
@@ -7,6 +7,7 @@
 #include "request_handler_db_stream_info.h"
 #include "request_handler_db_last_stream.h"
 #include "request_handler_db_delete_stream.h"
+#include "request_handler_db_get_meta.h"
 
 namespace asapo {
 
@@ -29,6 +30,7 @@ class RequestFactory {
     RequestHandlerDbDeleteStream request_handler_delete_stream_{kDBDataCollectionNamePrefix};
     RequestHandlerDbLastStream request_handler_db_last_stream_{kDBDataCollectionNamePrefix};
     RequestHandlerDbMetaWrite request_handler_db_meta_write_{kDBMetaCollectionName};
+    RequestHandlerDbGetMeta request_handler_db_get_meta_{kDBMetaCollectionName};
     RequestHandlerAuthorize request_handler_authorize_;
     RequestHandlerDbCheckRequest request_handler_db_check_{kDBDataCollectionNamePrefix};;
     SharedCache cache_;
diff --git a/receiver/src/request_handler/request_handler_db_get_meta.cpp b/receiver/src/request_handler/request_handler_db_get_meta.cpp
new file mode 100644
index 000000000..b046aa119
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_db_get_meta.cpp
@@ -0,0 +1,38 @@
+#include "request_handler_db_get_meta.h"
+#include "../receiver_config.h"
+#include <asapo/database/db_error.h>
+
+namespace asapo {
+
+RequestHandlerDbGetMeta::RequestHandlerDbGetMeta(std::string collection_name_prefix) : RequestHandlerDb(
+        std::move(collection_name_prefix)) {
+}
+
+Error RequestHandlerDbGetMeta::ProcessRequest(Request* request) const {
+    if (auto err = RequestHandlerDb::ProcessRequest(request) ) {
+        return err;
+    }
+
+    auto stream_name = request->GetStream();
+
+    std::string metaid = stream_name.empty() ? "bt" : "st_" + stream_name;
+    std::string meta;
+    auto err =  db_client__->GetMetaFromDb(kDBMetaCollectionName, metaid, &meta);
+
+    bool no_error = err == nullptr;
+    if (err == DBErrorTemplates::kNoRecord) {
+        no_error = true;
+    }
+
+    if (no_error) {
+        log__->Debug(std::string{"get meta for "} + (stream_name.empty() ? "beamtime" : stream_name) + " in " +
+                     db_name_ + " at " + GetReceiverConfig()->database_uri);
+        request->SetResponseMessage(meta, ResponseMessageType::kInfo);
+        return nullptr;
+    }
+
+    return DBErrorToReceiverError(err);
+}
+
+
+}
\ No newline at end of file
diff --git a/receiver/src/request_handler/request_handler_db_get_meta.h b/receiver/src/request_handler/request_handler_db_get_meta.h
new file mode 100644
index 000000000..faeddc5f6
--- /dev/null
+++ b/receiver/src/request_handler/request_handler_db_get_meta.h
@@ -0,0 +1,18 @@
+#include "request_handler_db.h"
+#include "../request.h"
+
+#ifndef ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_
+#define ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_
+
+namespace asapo {
+
+class RequestHandlerDbGetMeta final: public RequestHandlerDb {
+  public:
+    RequestHandlerDbGetMeta(std::string collection_name_prefix);
+    Error ProcessRequest(Request* request) const override;
+};
+
+}
+
+
+#endif //ASAPO_RECEIVER_SRC_REQUEST_HANDLER_REQUEST_HANDLER_DB_GET_META_H_
diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp
index 03d8bf449..64029d2ee 100644
--- a/receiver/unittests/request_handler/test_request_factory.cpp
+++ b/receiver/unittests/request_handler/test_request_factory.cpp
@@ -16,6 +16,7 @@
 #include "../../src/request_handler/request_handler_db_stream_info.h"
 #include "../../src/request_handler/request_handler_db_last_stream.h"
 #include "../../src/request_handler/request_handler_db_delete_stream.h"
+#include "../../src/request_handler/request_handler_db_get_meta.h"
 
 #include "../../src/request_handler/request_handler_receive_data.h"
 #include "../../src/request_handler/request_handler_receive_metadata.h"
@@ -225,5 +226,16 @@ TEST_F(FactoryTests, DeleteStreamRequest) {
     ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbDeleteStream*>(request->GetListHandlers()[1]), Ne(nullptr));
 }
 
+TEST_F(FactoryTests, GetMetamRequest) {
+    generic_request_header.op_code = asapo::Opcode::kOpcodeGetMeta;
+    auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err);
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(request->GetListHandlers().size(), Eq(2));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerAuthorize*>(request->GetListHandlers()[0]), Ne(nullptr));
+    ASSERT_THAT(dynamic_cast<const asapo::RequestHandlerDbGetMeta*>(request->GetListHandlers()[1]), Ne(nullptr));
+}
+
+
+
 
 }
diff --git a/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp b/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp
new file mode 100644
index 000000000..bfb4a89e5
--- /dev/null
+++ b/receiver/unittests/request_handler/test_request_handler_db_get_meta.cpp
@@ -0,0 +1,128 @@
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+#include <asapo/database/db_error.h>
+
+#include "asapo/unittests/MockIO.h"
+#include "asapo/unittests/MockDatabase.h"
+#include "asapo/unittests/MockLogger.h"
+
+#include "../../src/receiver_error.h"
+#include "../../src/request.h"
+#include "../../src/request_handler/request_factory.h"
+#include "../../src/request_handler/request_handler.h"
+#include "../../src/request_handler/request_handler_db_get_meta.h"
+#include "../../../common/cpp/src/database/mongodb_client.h"
+
+#include "../mock_receiver_config.h"
+#include "asapo/common/data_structs.h"
+#include "asapo/common/networking.h"
+#include "../receiver_mocking.h"
+
+using asapo::MockRequest;
+using asapo::MessageMeta;
+using ::testing::Test;
+using ::testing::Return;
+using ::testing::ReturnRef;
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::SetArgReferee;
+using ::testing::Gt;
+using ::testing::Eq;
+using ::testing::Ne;
+using ::testing::Mock;
+using ::testing::NiceMock;
+using ::testing::InSequence;
+using ::testing::SetArgPointee;
+using ::testing::AllOf;
+using ::testing::HasSubstr;
+
+
+using ::asapo::Error;
+using ::asapo::ErrorInterface;
+using ::asapo::FileDescriptor;
+using ::asapo::SocketDescriptor;
+using ::asapo::MockIO;
+using asapo::Request;
+using asapo::RequestHandlerDbGetMeta;
+using ::asapo::GenericRequestHeader;
+
+using asapo::MockDatabase;
+using asapo::RequestFactory;
+using asapo::SetReceiverConfig;
+using asapo::ReceiverConfig;
+
+
+namespace {
+
+class DbMetaGetMetaTests : public Test {
+  public:
+    RequestHandlerDbGetMeta handler{asapo::kDBDataCollectionNamePrefix};
+    std::unique_ptr<NiceMock<MockRequest>> mock_request;
+    NiceMock<MockDatabase> mock_db;
+    NiceMock<asapo::MockLogger> mock_logger;
+    ReceiverConfig config;
+    std::string expected_beamtime_id = "beamtime_id";
+    std::string expected_data_source = "source";
+    std::string expected_stream = "stream";
+    std::string expected_meta = "meta";
+    void SetUp() override {
+        GenericRequestHeader request_header;
+        handler.db_client__ = std::unique_ptr<asapo::Database> {&mock_db};
+        handler.log__ = &mock_logger;
+        mock_request.reset(new NiceMock<MockRequest> {request_header, 1, "", nullptr});
+        ON_CALL(*mock_request, GetBeamtimeId()).WillByDefault(ReturnRef(expected_beamtime_id));
+    }
+    void TearDown() override {
+        handler.db_client__.release();
+    }
+    void ExpectGet(bool stream, const asapo::DBErrorTemplate* errorTemplate) {
+        SetReceiverConfig(config, "none");
+        EXPECT_CALL(*mock_request, GetDataSource()).WillOnce(ReturnRef(expected_data_source));
+        if (stream) {
+            EXPECT_CALL(*mock_request, GetStream()).WillOnce(Return(expected_stream));
+        }
+
+        EXPECT_CALL(mock_db, Connect_t(config.database_uri, expected_beamtime_id + "_" + expected_data_source)).
+        WillOnce(testing::Return(nullptr));
+        EXPECT_CALL(mock_db, GetMetaFromDb_t("meta", stream ? "st_" + expected_stream : "bt", _)).
+        WillOnce(DoAll(
+                     SetArgPointee<2>(expected_meta),
+                     testing::Return(errorTemplate == nullptr ? nullptr : errorTemplate->Generate().release())
+                 ));
+        if (errorTemplate == nullptr) {
+            EXPECT_CALL(*mock_request, SetResponseMessage(expected_meta, asapo::ResponseMessageType::kInfo));
+            EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("meta"),
+                                                 HasSubstr(config.database_uri),
+                                                 HasSubstr(expected_data_source),
+                                                 HasSubstr(stream ? expected_stream : "beamtime"),
+                                                 HasSubstr(expected_beamtime_id)
+                                                )
+                                          )
+                       );
+        }
+
+    }
+};
+
+
+
+TEST_F(DbMetaGetMetaTests, GetBeamtimeMetaOk) {
+    ExpectGet(false, nullptr);
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+
+TEST_F(DbMetaGetMetaTests, GetStreamMetaOk) {
+    ExpectGet(true, nullptr);
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+TEST_F(DbMetaGetMetaTests, GetStreamMetaError) {
+    ExpectGet(true, &asapo::DBErrorTemplates::kDBError);
+    auto err = handler.ProcessRequest(mock_request.get());
+    ASSERT_THAT(err, Ne(nullptr));
+}
+
+}
diff --git a/tests/automatic/consumer/consumer_api/CMakeLists.txt b/tests/automatic/consumer/consumer_api/CMakeLists.txt
index d7f451aa2..59dcf868a 100644
--- a/tests/automatic/consumer/consumer_api/CMakeLists.txt
+++ b/tests/automatic/consumer/consumer_api/CMakeLists.txt
@@ -11,6 +11,5 @@ target_link_libraries(${TARGET_NAME} test_common asapo-consumer)
 ################################
 # Testing
 ################################
-add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>"
-        )
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>")
 
diff --git a/tests/automatic/mongo_db/meta/meta_mongodb.cpp b/tests/automatic/mongo_db/meta/meta_mongodb.cpp
index 1aaf184a3..6a9f07b0c 100644
--- a/tests/automatic/mongo_db/meta/meta_mongodb.cpp
+++ b/tests/automatic/mongo_db/meta/meta_mongodb.cpp
@@ -75,7 +75,7 @@ int main(int argc, char* argv[]) {
             db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
         M_AssertEq(nullptr, err);
         err = db.InsertMeta("meta", "notexist", reinterpret_cast<const uint8_t*>(meta.c_str()), meta.size(), mode);
-        M_AssertTrue(err == asapo::DBErrorTemplates::kInsertError);
+        M_AssertTrue(err == asapo::DBErrorTemplates::kWrongInput);
 
         std::string meta_res;
         err = db.GetMetaFromDb("meta", "0", &meta_res);
@@ -87,6 +87,9 @@ int main(int argc, char* argv[]) {
         mode.op = asapo::MetaIngestOp::kUpdate;
         err = db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(mod_meta.c_str()), mod_meta.size(), mode);
         M_AssertEq(nullptr, err);
+        err = db.InsertMeta("meta", stream_name, reinterpret_cast<const uint8_t*>(mod_meta.c_str()), mod_meta.size(), mode);
+        M_AssertEq(nullptr, err);
+
         err = db.GetMetaFromDb("meta", stream_name, &meta_res);
         M_AssertEq(nullptr, err);
         M_AssertEq(expected_meta, meta_res);
diff --git a/tests/automatic/producer/cpp_api/CMakeLists.txt b/tests/automatic/producer/cpp_api/CMakeLists.txt
index 0e19465b1..e4143f91e 100644
--- a/tests/automatic/producer/cpp_api/CMakeLists.txt
+++ b/tests/automatic/producer/cpp_api/CMakeLists.txt
@@ -11,6 +11,5 @@ target_link_libraries(${TARGET_NAME} test_common asapo-producer)
 ################################
 # Testing
 ################################
-add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>"
-        )
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>")
 
diff --git a/tests/automatic/producer/cpp_api/check_linux.sh b/tests/automatic/producer/cpp_api/check_linux.sh
index b3c4f6f84..7d6d7dbc2 100644
--- a/tests/automatic/producer/cpp_api/check_linux.sh
+++ b/tests/automatic/producer/cpp_api/check_linux.sh
@@ -23,4 +23,4 @@ mkdir -p ${receiver_folder}
 
 echo test > file1
 
-$1 "127.0.0.1:8400" $data_source $beamtime_id
+$@ "127.0.0.1:8400" $data_source $beamtime_id
diff --git a/tests/automatic/producer/cpp_api/producer_api.cpp b/tests/automatic/producer/cpp_api/producer_api.cpp
index 2b687bc4d..d5e5982ab 100644
--- a/tests/automatic/producer/cpp_api/producer_api.cpp
+++ b/tests/automatic/producer/cpp_api/producer_api.cpp
@@ -23,6 +23,21 @@ Args GetArgs(int argc, char* argv[]) {
     return Args{server, source, beamtime};
 }
 
+void TestMeta(const std::unique_ptr<asapo::Producer>& producer) {
+    asapo::Error err;
+    std::string meta = R"({"data":"test","embedded":{"edata":2}})";
+    producer->SendBeamtimeMetadata(meta, asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, true}, nullptr);
+    producer->WaitRequestsFinished(5000);
+    auto meta_received = producer->GetBeamtimeMeta(5000, &err);
+    M_AssertTrue(meta_received == meta);
+    std::string meta_update = R"({"embedded":{"edata":3}})";
+    std::string meta_updated = R"({"data":"test","embedded":{"edata":3}})";
+    producer->SendBeamtimeMetadata(meta_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, false}, nullptr);
+    producer->WaitRequestsFinished(5000);
+    meta_received = producer->GetBeamtimeMeta(5000, &err);
+    M_AssertTrue(meta_received == meta_updated);
+}
+
 
 void Test(const std::unique_ptr<asapo::Producer>& producer) {
     asapo::MessageMeta fi;
@@ -37,9 +52,11 @@ void Test(const std::unique_ptr<asapo::Producer>& producer) {
     M_AssertTrue(!server.empty(), "server version");
 
 
-    producer->GetStreamInfo("default",5000,&err);
+    TestMeta(producer);
+
+    producer->GetStreamInfo("default", 5000, &err);
     if (err) {
-        printf("%s\n",err->Explain().c_str());
+        printf("%s\n", err->Explain().c_str());
     }
     M_AssertTrue(err == nullptr, "stream info");
 
@@ -51,7 +68,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
     auto producer = asapo::Producer::Create(args.server, 2,
                                             asapo::RequestHandlerType::kTcp,
                                             asapo::SourceCredentials{asapo::SourceType::kProcessed, args.beamtime,
-                                                                     "", args.source, ""}, 60000, &err);
+                                                    "", args.source, ""}, 60000, &err);
     if (err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
         exit(EXIT_FAILURE);
@@ -66,7 +83,7 @@ std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
 void TestAll(const Args& args) {
     asapo::Error err;
     auto producer = CreateProducer(args);
-    if (producer==nullptr) {
+    if (producer == nullptr) {
         std::cout << "Error CreateProducer: " << err << std::endl;
         exit(EXIT_FAILURE);
     }
diff --git a/tests/automatic/producer/python_api/check_linux.sh b/tests/automatic/producer/python_api/check_linux.sh
index c86f6bd6a..51552b6eb 100644
--- a/tests/automatic/producer/python_api/check_linux.sh
+++ b/tests/automatic/producer/python_api/check_linux.sh
@@ -27,14 +27,19 @@ echo test > file1
 
 $1 $3 $data_source $beamtime_id  "127.0.0.1:8400" &> out || cat out
 cat out
-echo count successfully send, expect 15
-cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 15
+echo count successfully send, expect 17
+cat out | grep "successfuly sent" | wc -l | tee /dev/stderr | grep 17
+echo count wrong input, expect 11
+cat out | grep "wrong input" | wc -l | tee /dev/stderr | grep 11
+
+echo count wrong json, expect 2
+cat out | grep "JSON parse error" | wc -l | tee /dev/stderr | grep 2
 echo count same id, expect 4
 cat out | grep "already have record with same id" | wc -l | tee /dev/stderr | grep 4
 echo count duplicates, expect 6
 cat out | grep "duplicate" | wc -l | tee /dev/stderr | grep 6
-echo count data in callback, expect 3
-cat out | grep "'data':" | wc -l  | tee /dev/stderr | grep 3
+echo count data in callback, expect 6
+cat out | grep "'data':" | wc -l  | tee /dev/stderr | grep 6
 echo check found local io error
 cat out | grep "local i/o error"
 cat out | grep "Finished successfully"
diff --git a/tests/automatic/producer/python_api/check_windows.bat b/tests/automatic/producer/python_api/check_windows.bat
index 6dbbf441a..c854be878 100644
--- a/tests/automatic/producer/python_api/check_windows.bat
+++ b/tests/automatic/producer/python_api/check_windows.bat
@@ -18,7 +18,7 @@ set PYTHONPATH=%2
 type out
 set NUM=0
 for /F %%N in ('find /C "successfuly sent" ^< "out"') do set NUM=%%N
-echo %NUM% | findstr 15 || goto error
+echo %NUM% | findstr 17 || goto error
 
 for /F %%N in ('find /C "} wrong input: Bad request: already have record with same id" ^< "out"') do set NUM=%%N
 echo %NUM% | findstr 2 || goto error
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 64e30bc43..6f4bad158 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -143,6 +143,11 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
                    user_meta='{"test_key1":"test_val"}',
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_DATA | asapo_producer.INGEST_MODE_STORE_IN_FILESYSTEM,callback=callback)
 
+producer.send_beamtime_meta('{"data":"bt_meta"}', callback = callback)
+producer.send_stream_meta('{"data":"st_meta"}',stream = 'stream', callback = callback)
+producer.send_stream_meta('bla',stream = 'stream', callback = callback)
+
+
 producer.wait_requests_finished(50000)
 n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
@@ -174,6 +179,14 @@ producer.send_stream_finished_flag("stream/test $", 2, next_stream = "next_strea
 producer.wait_requests_finished(10000)
 
 
+#check meta
+bt_meta = producer.get_beamtime_meta()
+stream_meta = producer.get_stream_meta(stream = 'stream')
+assert_eq(bt_meta['data'], 'bt_meta', "beamtime meta")
+assert_eq(stream_meta['data'], 'st_meta', "stream meta")
+no_meta = producer.get_stream_meta(stream = 'notexist')
+assert_eq(no_meta, None, "no meta")
+
 #stream infos
 info = producer.stream_info()
 assert_eq(info['lastId'], 10, "stream_info last id")
-- 
GitLab