From 5cab728b4380f97f36d98568da75b718e1a057be Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 21 Aug 2019 16:37:44 +0200
Subject: [PATCH] producer python api for send_file ready

---
 common/cpp/include/common/networking.h        |  9 +++
 .../api/cpp/include/producer/producer_error.h |  6 ++
 producer/api/cpp/src/producer_impl.cpp        |  8 +++
 producer/api/cpp/src/producer_request.cpp     |  9 ++-
 producer/api/cpp/src/producer_request.h       |  1 +
 producer/api/cpp/src/request_handler_tcp.cpp  | 15 ++---
 producer/api/cpp/unittests/test_producer.cpp  |  4 +-
 .../api/cpp/unittests/test_producer_impl.cpp  | 38 ++++++++++-
 .../test_request_handler_filesystem.cpp       |  4 ++
 .../unittests/test_request_handler_tcp.cpp    | 47 +++++++++++---
 producer/api/python/CMakeLists_Linux.cmake    |  4 +-
 producer/api/python/asapo_producer.pxd        | 27 ++++++--
 producer/api/python/asapo_producer.pyx.in     | 62 +++++++++++++-----
 producer/api/python/asapo_wrappers.h          | 23 +++----
 .../python_tests/producer/authorizer.json.tpl | 10 +++
 .../python_tests/producer/authorizer.nmd      | 55 ++++++++++++++++
 .../manual/python_tests/producer/clean_db.sh  |  6 ++
 .../python_tests/producer/discovery.json.tpl  | 10 +++
 .../python_tests/producer/discovery.nmd       | 49 +++++++++++++++
 tests/manual/python_tests/producer/file1      |  1 +
 .../python_tests/producer/nginx.conf.tpl      | 42 +++++++++++++
 tests/manual/python_tests/producer/nginx.nmd  | 63 +++++++++++++++++++
 .../python_tests/producer/receiver.json.tpl   | 22 +++++++
 .../manual/python_tests/producer/receiver.nmd | 47 ++++++++++++++
 tests/manual/python_tests/producer/run.sh     |  2 +
 .../python_tests/producer/start_services.sh   |  6 ++
 .../python_tests/producer/stop_services.sh    |  6 ++
 tests/manual/python_tests/producer/test.py    | 43 +++++++++----
 28 files changed, 551 insertions(+), 68 deletions(-)
 create mode 100644 tests/manual/python_tests/producer/authorizer.json.tpl
 create mode 100644 tests/manual/python_tests/producer/authorizer.nmd
 create mode 100755 tests/manual/python_tests/producer/clean_db.sh
 create mode 100644 tests/manual/python_tests/producer/discovery.json.tpl
 create mode 100644 tests/manual/python_tests/producer/discovery.nmd
 create mode 100644 tests/manual/python_tests/producer/file1
 create mode 100644 tests/manual/python_tests/producer/nginx.conf.tpl
 create mode 100644 tests/manual/python_tests/producer/nginx.nmd
 create mode 100644 tests/manual/python_tests/producer/receiver.json.tpl
 create mode 100644 tests/manual/python_tests/producer/receiver.nmd
 create mode 100755 tests/manual/python_tests/producer/start_services.sh
 create mode 100755 tests/manual/python_tests/producer/stop_services.sh

diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h
index d5a5edb59..9ce7f962e 100644
--- a/common/cpp/include/common/networking.h
+++ b/common/cpp/include/common/networking.h
@@ -61,8 +61,17 @@ struct GenericRequestHeader {
     uint64_t    meta_size;
     CustomRequestData    custom_data;
     char        message[kMaxMessageSize];
+    std::string Json() {
+        std::string s = "{\"id\":" + std::to_string(data_id) + ","
+                        "\"buffer\":\"" + std::string(message)+"\""
+            + "}";
+        return s;
+    };
+
 };
 
+
+
 struct GenericNetworkResponse {
     Opcode              op_code;
     NetworkErrorCode    error_code;
diff --git a/producer/api/cpp/include/producer/producer_error.h b/producer/api/cpp/include/producer/producer_error.h
index 856af4d5e..3ceded777 100644
--- a/producer/api/cpp/include/producer/producer_error.h
+++ b/producer/api/cpp/include/producer/producer_error.h
@@ -10,6 +10,7 @@ enum class ProducerErrorType {
     kConnectionNotReady,
     kFileTooLarge,
     kFileNameTooLong,
+    kEmptyFileName,
     kBeamtimeIdTooLong,
     kBeamtimeAlreadySet,
     kFileIdAlreadyInUse,
@@ -51,6 +52,11 @@ auto const kFileNameTooLong = ProducerErrorTemplate {
     "filename too long", ProducerErrorType::kFileNameTooLong
 };
 
+auto const kEmptyFileName = ProducerErrorTemplate {
+    "empty filename", ProducerErrorType::kEmptyFileName
+};
+
+
 auto const kCredentialsTooLong = ProducerErrorTemplate {
     "beamtime id too long", ProducerErrorType::kBeamtimeIdTooLong
 };
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index ba82eb7b1..78d5c3fb3 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -66,6 +66,10 @@ Error CheckProducerRequest(const EventHeader& event_header, uint64_t injest_mode
         return ProducerErrorTemplates::kFileNameTooLong.Generate();
     }
 
+    if (event_header.file_name.empty() ) {
+        return ProducerErrorTemplates::kEmptyFileName.Generate();
+    }
+
     if (event_header.subset_id > 0 && event_header.subset_size == 0) {
         return ProducerErrorTemplates::kErrorSubsetSize.Generate();
     }
@@ -101,6 +105,10 @@ Error ProducerImpl::SendData(const EventHeader& event_header, FileData data,
 
 Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode,
                              RequestCallback callback) {
+    if (full_path.empty()) {
+        return ProducerErrorTemplates::kEmptyFileName.Generate();
+    }
+
     return Send(event_header, nullptr, std::move(full_path), injest_mode, callback);
 }
 
diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp
index 734e3fbc6..6a7857f33 100644
--- a/producer/api/cpp/src/producer_request.cpp
+++ b/producer/api/cpp/src/producer_request.cpp
@@ -3,7 +3,7 @@
 namespace asapo {
 
 Error ProducerRequest::ReadDataFromFileIfNeeded(const IO* io) {
-    if (data != nullptr || original_filepath.empty()) {
+    if (data != nullptr || original_filepath.empty() || !NeedSendData()) {
         return nullptr;
     }
     Error err;
@@ -24,4 +24,11 @@ ProducerRequest::ProducerRequest(std::string source_credentials,
     callback{callback} {
 }
 
+bool ProducerRequest::NeedSendData() const {
+    if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferSubsetData) {
+        return header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData;
+    }
+    return true;
+}
+
 }
\ No newline at end of file
diff --git a/producer/api/cpp/src/producer_request.h b/producer/api/cpp/src/producer_request.h
index 28beafab7..dba048b34 100644
--- a/producer/api/cpp/src/producer_request.h
+++ b/producer/api/cpp/src/producer_request.h
@@ -21,6 +21,7 @@ class ProducerRequest : public GenericRequest {
     std::string original_filepath;
     RequestCallback callback;
     Error ReadDataFromFileIfNeeded(const IO* io);
+    bool NeedSendData() const;
 };
 
 }
diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp
index 5ad76ffe9..21c26ef9a 100644
--- a/producer/api/cpp/src/request_handler_tcp.cpp
+++ b/producer/api/cpp/src/request_handler_tcp.cpp
@@ -48,14 +48,6 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const
     return nullptr;
 }
 
-bool NeedSendData(const ProducerRequest* request) {
-    if (request->header.op_code == kOpcodeTransferData || request->header.op_code == kOpcodeTransferSubsetData) {
-        return request->header.custom_data[kPosInjestMode] & IngestModeFlags::kTransferData;
-    }
-
-    return true;
-}
-
 Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) {
     Error io_error;
     io__->Send(sd_, &(request->header), sizeof(request->header), &io_error);
@@ -63,7 +55,7 @@ Error RequestHandlerTcp::SendRequestContent(const ProducerRequest* request) {
         return io_error;
     }
 
-    if (NeedSendData(request)) {
+    if (request->NeedSendData()) {
         io__->Send(sd_, (void*) request->data.get(), (size_t)request->header.data_size, &io_error);
     }
 
@@ -209,7 +201,10 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) {
     auto producer_request = static_cast<ProducerRequest*>(request);
     auto err = producer_request->ReadDataFromFileIfNeeded(io__.get());
     if (err) {
-        return err;
+        if (producer_request->callback) {
+            producer_request->callback(producer_request->header, std::move(err));
+        }
+        return nullptr;
     }
 
     if (NeedRebalance()) {
diff --git a/producer/api/cpp/unittests/test_producer.cpp b/producer/api/cpp/unittests/test_producer.cpp
index 11359b693..8e2e83602 100644
--- a/producer/api/cpp/unittests/test_producer.cpp
+++ b/producer/api/cpp/unittests/test_producer.cpp
@@ -53,7 +53,7 @@ TEST(CreateProducer, TooManyThreads) {
 TEST(CreateProducer, ZeroThreads) {
     asapo::Error err;
     std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", 0,
-                                                                        asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
+                                                asapo::RequestHandlerType::kTcp, SourceCredentials{"bt", "", ""}, &err);
     ASSERT_THAT(producer, Eq(nullptr));
     ASSERT_THAT(err, Ne(nullptr));
 }
@@ -65,7 +65,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) {
                                                 SourceCredentials{"bt", "", ""},
                                                 &err);
 
-    asapo::EventHeader event_header{1, 1, ""};
+    asapo::EventHeader event_header{1, 1, "test"};
     auto err_send = producer->SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
 
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index e3f071ff6..90a736513 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -90,7 +90,7 @@ class ProducerImplTests : public testing::Test {
 TEST_F(ProducerImplTests, SendReturnsError) {
     EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return(
             asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release()));
-    asapo::EventHeader event_header{1, 1, ""};
+    asapo::EventHeader event_header{1, 1, "test"};
     auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull));
 }
@@ -102,6 +102,14 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) {
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong));
 }
 
+TEST_F(ProducerImplTests, ErrorIfFileEmpty) {
+    std::string long_string(asapo::kMaxMessageSize + 100, 'a');
+    asapo::EventHeader event_header{1, 1, ""};
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName));
+}
+
+
 TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) {
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking")));
     asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""};
@@ -111,7 +119,7 @@ TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) {
 
 TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) {
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size")));
-    asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "", "", 1};
+    asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "test", "", 1};
     auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize));
 }
@@ -196,6 +204,32 @@ TEST_F(ProducerImplTests, OKAddingSendMetaDataRequest) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
+
+TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) {
+    producer.SetCredentials(expected_credentials);
+
+    EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0);
+
+    asapo::EventHeader event_header{expected_id, 0, expected_name};
+    auto err = producer.SendFile(event_header, "", expected_injest_mode, nullptr);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName));
+
+}
+
+TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) {
+    producer.SetCredentials(expected_credentials);
+
+    EXPECT_CALL(mock_pull, AddRequest_t(_)).Times(0);
+
+    asapo::EventHeader event_header{expected_id, 0, ""};
+    auto err = producer.SendFile(event_header, expected_fullpath, expected_injest_mode, nullptr);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kEmptyFileName));
+
+}
+
+
 TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
     producer.SetCredentials(expected_credentials);
 
diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
index bce8b8a92..603bbf49e 100644
--- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
@@ -71,6 +71,10 @@ class RequestHandlerFilesystemTests : public testing::Test {
     asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id};
 
     void SetUp() override {
+        request.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode;
+        request_filesend.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode;
+        request_nocallback.header.custom_data[asapo::kPosInjestMode] = asapo::kDefaultIngestMode;
+
         request_handler.log__ = &mock_logger;
         request_handler.io__.reset(&mock_io);
     }
diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
index 70cf17069..0c9caf4bb 100644
--- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
@@ -15,6 +15,8 @@
 
 #include "mocking.h"
 
+#include <functional>
+
 namespace {
 
 using ::testing::Return;
@@ -62,16 +64,23 @@ class RequestHandlerTcpTests : public testing::Test {
 
     asapo::Error callback_err;
     asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_meta_size, expected_file_name};
-    bool called = false;
+    bool callback_called = false;
     asapo::GenericRequestHeader callback_header;
+
+
     asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) {
-        called = true;
+        callback_called = true;
         callback_err = std::move(err);
         callback_header = header;
     }};
 
     std::string expected_origin_fullpath = std::string("origin/") + expected_file_name;
-    asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata, expected_origin_fullpath, nullptr};
+    asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_metadata,
+    expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) {
+        callback_called = true;
+        callback_err = std::move(err);
+        callback_header = header;
+    }};
 
 
     asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata,  "", nullptr};
@@ -626,7 +635,7 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e
     request_handler.PrepareProcessingRequestLocked();
     auto err = request_handler.ProcessRequestUnlocked(&request);
     ASSERT_THAT(callback_err, Eq(err_template));
-    ASSERT_THAT(called, Eq(true));
+    ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(err, Eq(nullptr));
 }
 
@@ -650,7 +659,7 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) {
     auto err = request_handler.ProcessRequestUnlocked(&request_nocallback);
 
     ASSERT_THAT(err, Eq(nullptr));
-    ASSERT_THAT(called, Eq(false));
+    ASSERT_THAT(callback_called, Eq(false));
 }
 
 TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) {
@@ -665,7 +674,10 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) {
         ));
 
     auto err = request_handler.ProcessRequestUnlocked(&request_filesend);
-    ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError));
+    ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError));
+    ASSERT_THAT(callback_called, Eq(true));
+    ASSERT_THAT(err, Eq(nullptr));
+
 }
 
 
@@ -689,6 +701,8 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
+
+
 TEST_F(RequestHandlerTcpTests, SendOK) {
     ExpectOKConnect(true);
     ExpectOKAuthorize(true);
@@ -700,7 +714,7 @@ TEST_F(RequestHandlerTcpTests, SendOK) {
 
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(callback_err, Eq(nullptr));
-    ASSERT_THAT(called, Eq(true));
+    ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(callback_header.data_size, Eq(header.data_size));
     ASSERT_THAT(callback_header.op_code, Eq(header.op_code));
     ASSERT_THAT(callback_header.data_id, Eq(header.data_id));
@@ -743,5 +757,24 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) {
     ASSERT_THAT(callback_header.custom_data[asapo::kPosInjestMode], Eq(injest_mode));
 }
 
+TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) {
+    ExpectOKConnect(true);
+    ExpectOKAuthorize(true);
+    ExpectOKSendHeader(true);
+    ExpectOKSendMetaData(true);
+    ExpectOKReceive();
+
+    request_handler.PrepareProcessingRequestLocked();
+
+    EXPECT_CALL(mock_io, GetDataFromFile_t(_,_,_)).Times(0);
+
+    auto injest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly;
+
+    request_filesend.header.custom_data[asapo::kPosInjestMode] = injest_mode;
+    auto err = request_handler.ProcessRequestUnlocked(&request_filesend);
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+
 
 }
diff --git a/producer/api/python/CMakeLists_Linux.cmake b/producer/api/python/CMakeLists_Linux.cmake
index f01baa6ec..95f3fb3a0 100644
--- a/producer/api/python/CMakeLists_Linux.cmake
+++ b/producer/api/python/CMakeLists_Linux.cmake
@@ -16,10 +16,10 @@ set (ASAPO_PRODUCER_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/include)
 configure_files(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} @ONLY)
 
 ADD_CUSTOM_TARGET(python-lib2-producer ALL
-        COMMAND python setup.py build_ext --inplace)
+        COMMAND python setup.py build_ext --inplace --force )
 
 ADD_CUSTOM_TARGET(python-lib3-producer ALL
-        COMMAND python3 setup.py build_ext --inplace)
+        COMMAND python3 setup.py build_ext --inplace --force)
 
 ADD_DEPENDENCIES(python-lib2-producer asapo-producer)
 ADD_DEPENDENCIES(python-lib3-producer asapo-producer)
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index 3dcbdd19d..abc6b065c 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -26,6 +26,16 @@ cdef extern from "asapo_producer.h" namespace "asapo":
   RequestHandlerType RequestHandlerType_Tcp "asapo::RequestHandlerType::kTcp"
 
 
+cdef extern from "asapo_producer.h" namespace "asapo":
+  cppclass LogLevel:
+    pass
+  LogLevel LogLevel_None "asapo::LogLevel::None"
+  LogLevel LogLevel_Error "asapo::LogLevel::Error"
+  LogLevel LogLevel_Info "asapo::LogLevel::Info"
+  LogLevel LogLevel_Debug "asapo::LogLevel::Debug"
+  LogLevel LogLevel_Warning "asapo::LogLevel::Warning"
+
+
 cdef extern from "asapo_producer.h" namespace "asapo":
   struct  SourceCredentials:
     string beamtime_id
@@ -52,7 +62,7 @@ cdef extern from "asapo_producer.h" namespace "asapo":
 
 cdef extern from "asapo_producer.h" namespace "asapo":
   struct  GenericRequestHeader:
-    pass
+    string Json()
 
 cdef extern from "asapo_producer.h" namespace "asapo":
   cppclass RequestCallback:
@@ -60,10 +70,9 @@ cdef extern from "asapo_producer.h" namespace "asapo":
 
 
 cdef extern from "asapo_wrappers.h" namespace "asapo":
-    cdef cppclass function_wrapper:
-        ctypedef void (*cy_callback) (void*, GenericRequestHeader, Error)
-        @staticmethod
-        RequestCallback make_std_function(cy_callback, void*)
+    cppclass RequestCallbackCython:
+      pass
+    RequestCallback unwrap_callback(RequestCallbackCython, void*,void*)
 
 
 cdef extern from "asapo_producer.h" namespace "asapo":
@@ -71,4 +80,12 @@ cdef extern from "asapo_producer.h" namespace "asapo":
         @staticmethod
         unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,Error* error)
         Error SendFile(const EventHeader& event_header, string full_path, uint64_t injest_mode,RequestCallback callback)
+        void SetLogLevel(LogLevel level)
+
+cdef extern from "asapo_producer.h" namespace "asapo":
+    uint64_t kDefaultIngestMode
+    enum IngestModeFlags:
+        kTransferData
+        kTransferMetaDataOnly
+        kStoreInFilesystem
 
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index 4ef65b113..d5d14af01 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -9,6 +9,12 @@ from libcpp.memory cimport unique_ptr
 
 np.import_array()
 
+DEFAULT_INJEST_MODE = kDefaultIngestMode
+INJEST_MODE_TRANSFER_DATA = kTransferData
+INJEST_MODE_TRANSFER_METADATA_ONLY = kTransferMetaDataOnly
+INJEST_MODE_STORE_IN_FILESYSTEM = kStoreInFilesystem
+
+
 cdef extern from "numpy/ndarraytypes.h":
     void PyArray_ENABLEFLAGS(np.ndarray arr, int flags)
 
@@ -27,26 +33,53 @@ cdef bytes _bytes(s):
     else:
         raise TypeError("Could not convert to unicode.")
 
-ctypedef void (*cb_type)(void*, GenericRequestHeader, Error)
-
-
 cdef class PyProducer:
     cdef unique_ptr[Producer] c_producer
-    cdef object py_callback
-    def send_file(self,string fname):
+    def set_log_level(self,level):
+         cdef LogLevel log_level
+         log_level = LogLevel_Info
+         if level == "debug" :
+            log_level = LogLevel_Debug
+         elif level == "info" :
+            log_level = LogLevel_Info
+         elif level == "none" :
+            log_level = LogLevel_None
+         elif level == "warn" :
+            log_level = LogLevel_Warning
+         else:
+            print("wrong loglevel mode: "+level)
+            return
+         self.c_producer.get().SetLogLevel(log_level)
+    def send_file(self,int id,string local_path,string exposed_path,user_meta=None,subset=None,injest_mode = DEFAULT_INJEST_MODE,callback=None):
         cdef EventHeader event_header
-        err = self.c_producer.get().SendFile(event_header, _bytes(fname), 1,
-        	function_wrapper.make_std_function(<cb_type>self.c_callback, <void*>self))
+        event_header.file_id = id
+        event_header.file_size = 0
+        event_header.file_name = exposed_path
+        event_header.user_metadata = user_meta if user_meta!=None else ""
+        if subset == None:
+            event_header.subset_id = 0
+            event_header.subset_size = 0
+        else:
+            event_header.subset_id = subset[0]
+            event_header.subset_size = subset[1]
+        err = self.c_producer.get().SendFile(event_header, _bytes(local_path), injest_mode,
+            unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
         cdef err_str = GetErrorString(&err)
         if err_str.strip():
             return err_str
         else:
-            return "ok"
-    cdef void c_callback(self,GenericRequestHeader header, Error err) with gil:
-        self.py_callback(1,2)
-        return
+            return None
+    cdef void c_callback(self,py_callback,GenericRequestHeader header, Error err) with gil:
+        info_str = _str(header.Json())
+        info = json.loads(info_str)
+        cdef err_str = GetErrorString(&err)
+        if err_str.strip():
+           py_err = err_str
+        else:
+           py_err = None
+        py_callback(info,py_err)
     @staticmethod
-    def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback):
+    def create_producer(endpoint,beamtime_id,stream,token,nthreads):
         pyProd = PyProducer()
         cdef Error err
         cdef SourceCredentials source
@@ -54,15 +87,14 @@ cdef class PyProducer:
         source.user_token = token
         source.stream = stream
         pyProd.c_producer = Producer.Create(endpoint,nthreads,RequestHandlerType_Tcp,source,&err)
-        pyProd.py_callback = py_callback
         cdef err_str = GetErrorString(&err)
         if err_str.strip():
             return None,err_str
         else:
             return pyProd,None
 
-def create_producer(endpoint,beamtime_id,stream,token,nthreads,py_callback):
-    return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads,py_callback)
+def create_producer(endpoint,beamtime_id,stream,token,nthreads):
+    return PyProducer.create_producer(_bytes(endpoint),_bytes(beamtime_id),_bytes(stream),_bytes(token),nthreads)
 
 
 __version__ = "@ASAPO_VERSION_PYTHON@"
diff --git a/producer/api/python/asapo_wrappers.h b/producer/api/python/asapo_wrappers.h
index 034e0fc87..ffaa4b184 100644
--- a/producer/api/python/asapo_wrappers.h
+++ b/producer/api/python/asapo_wrappers.h
@@ -14,20 +14,17 @@ inline std::string GetErrorString(asapo::Error* err) {
     return "";
 }
 
-using cy_callback = void (*)(void*, GenericRequestHeader header, Error err);
-
-class function_wrapper {
-  public:
-    static
-    RequestCallback make_std_function(cy_callback callback, void* c_self)
-    {
-        RequestCallback wrapper = [=](GenericRequestHeader header, Error err) -> void
-        {
-          callback(c_self, header, std::move(err));
-        };
-        return wrapper;
+using RequestCallbackCython = void (*)(void*, void*, GenericRequestHeader header, Error err);
+
+RequestCallback unwrap_callback(RequestCallbackCython callback, void* c_self, void* py_func) {
+    if (py_func == NULL) {
+        return nullptr;
     }
-};
+    RequestCallback wrapper = [ = ](GenericRequestHeader header, Error err) -> void {
+        callback(c_self, py_func, header, std::move(err));
+    };
+    return wrapper;
+}
 
 }
 
diff --git a/tests/manual/python_tests/producer/authorizer.json.tpl b/tests/manual/python_tests/producer/authorizer.json.tpl
new file mode 100644
index 000000000..7c3a796d2
--- /dev/null
+++ b/tests/manual/python_tests/producer/authorizer.json.tpl
@@ -0,0 +1,10 @@
+{
+  "Port": {{ env "NOMAD_PORT_authorizer" }},
+  "LogLevel":"debug",
+  "AlwaysAllowedBeamtimes":[{"BeamtimeId":"asapo_test","Beamline":"test"},
+  {"BeamtimeId":"asapo_test1","Beamline":"test1"},
+  {"BeamtimeId":"asapo_test2","Beamline":"test2"}],
+  "SecretFile":"auth_secret.key"
+}
+
+
diff --git a/tests/manual/python_tests/producer/authorizer.nmd b/tests/manual/python_tests/producer/authorizer.nmd
new file mode 100644
index 000000000..8b32105cf
--- /dev/null
+++ b/tests/manual/python_tests/producer/authorizer.nmd
@@ -0,0 +1,55 @@
+job "authorizer" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "authorizer" {
+      driver = "raw_exec"
+
+      config {
+        command = "/home/yakubov/projects/asapo/cmake-build-debug/authorizer/asapo-authorizer",
+        args =  ["-config","${NOMAD_TASK_DIR}/authorizer.json"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "authorizer" {
+            static = "5007"
+          }
+        }
+      }
+
+      service {
+        name = "authorizer"
+        port = "authorizer"
+        check {
+          name     = "alive"
+          type     = "http"
+          path     = "/health-check"
+          interval = "10s"
+          timeout  = "2s"
+          initial_status =   "passing"
+        }
+      }
+
+      template {
+         source        = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/authorizer.json.tpl"
+         destination   = "local/authorizer.json"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+
+      template {
+         source        = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/auth_secret.key"
+         destination   = "auth_secret.key"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+    }
+  }
+}
diff --git a/tests/manual/python_tests/producer/clean_db.sh b/tests/manual/python_tests/producer/clean_db.sh
new file mode 100755
index 000000000..326ee510d
--- /dev/null
+++ b/tests/manual/python_tests/producer/clean_db.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+beamtime_id=asapo_test1
+
+
+echo "db.dropDatabase()" | mongo ${beamtime_id}_detector
diff --git a/tests/manual/python_tests/producer/discovery.json.tpl b/tests/manual/python_tests/producer/discovery.json.tpl
new file mode 100644
index 000000000..25cba6782
--- /dev/null
+++ b/tests/manual/python_tests/producer/discovery.json.tpl
@@ -0,0 +1,10 @@
+{
+  "Mode": "consul",
+  "Receiver": {
+    "MaxConnections": 32
+  },
+  "Port": {{ env "NOMAD_PORT_discovery" }},
+  "LogLevel":"debug"
+}
+
+
diff --git a/tests/manual/python_tests/producer/discovery.nmd b/tests/manual/python_tests/producer/discovery.nmd
new file mode 100644
index 000000000..f95980bae
--- /dev/null
+++ b/tests/manual/python_tests/producer/discovery.nmd
@@ -0,0 +1,49 @@
+job "discovery" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "discovery" {
+      driver = "raw_exec"
+
+      config {
+        command = "/home/yakubov/projects/asapo/cmake-build-debug/discovery/asapo-discovery",
+        args =  ["-config","${NOMAD_TASK_DIR}/discovery.json"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "discovery" {
+            static = "5006"
+          }
+        }
+      }
+
+      service {
+        name = "discovery"
+        port = "discovery"
+        check {
+          name     = "alive"
+          type     = "http"
+          path     = "/receivers"
+          interval = "10s"
+          timeout  = "2s"
+          initial_status =   "passing"
+        }
+      }
+
+      template {
+         source        = "/home/yakubov/projects/asapo/tests/manual/python_tests/producer/discovery.json.tpl"
+         destination   = "local/discovery.json"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+
+    }
+  }
+}
diff --git a/tests/manual/python_tests/producer/file1 b/tests/manual/python_tests/producer/file1
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/manual/python_tests/producer/file1
@@ -0,0 +1 @@
+test
diff --git a/tests/manual/python_tests/producer/nginx.conf.tpl b/tests/manual/python_tests/producer/nginx.conf.tpl
new file mode 100644
index 000000000..a545307b3
--- /dev/null
+++ b/tests/manual/python_tests/producer/nginx.conf.tpl
@@ -0,0 +1,42 @@
+worker_processes  1;
+daemon off;
+
+events {
+    worker_connections  1024;
+}
+
+http {
+#    include       mime.types;
+#    default_type  application/octet-stream;
+
+#    sendfile        on;
+#    tcp_nopush     on;
+
+#    keepalive_timeout  0;
+#    keepalive_timeout  65;
+
+    resolver 127.0.0.1:8600 valid=1s;
+    server {
+        listen       {{ env "NOMAD_PORT_nginx" }};
+          set $discovery_endpoint discovery.service.asapo;
+          set $authorizer_endpoint authorizer.service.asapo;
+       #   set $fluentd_endpoint localhost;
+          location /discovery/ {
+            rewrite ^/discovery(/.*) $1 break;
+            proxy_pass http://$discovery_endpoint:5006$uri;
+          }
+          location /logs/ {
+             # rewrite ^/logs(/.*) $1 break;
+              proxy_pass http://localhost:9880/asapo;
+          }
+           location /authorizer/ {
+            rewrite ^/authorizer(/.*) $1 break;
+            proxy_pass http://$authorizer_endpoint:5007$uri;
+                    }
+
+
+	location /nginx-health {
+  	  return 200 "healthy\n";
+	}
+    }
+}
diff --git a/tests/manual/python_tests/producer/nginx.nmd b/tests/manual/python_tests/producer/nginx.nmd
new file mode 100644
index 000000000..b424e5387
--- /dev/null
+++ b/tests/manual/python_tests/producer/nginx.nmd
@@ -0,0 +1,63 @@
+job "nginx" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  update {
+    max_parallel = 1
+    min_healthy_time = "10s"
+    healthy_deadline = "3m"
+    auto_revert = false
+  }
+
+  group "nginx" {
+    count = 1
+
+    restart {
+      attempts = 2
+      interval = "30m"
+      delay = "15s"
+      mode = "fail"
+    }
+
+    task "nginx" {
+      driver = "raw_exec"
+
+      config {
+        command = "nginx",
+        args =  ["-c","${NOMAD_TASK_DIR}/nginx.conf"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          mbits = 10
+          port "nginx" {
+          static = 8400
+          }
+        }
+      }
+
+      service {
+        port = "nginx"
+        name = "nginx"
+        check {
+          name     = "alive"
+          type     = "http"
+	  path     = "/nginx-health"
+          timeout  = "2s"
+	  interval = "10s"
+        }
+      }
+
+      template {
+         source        = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/nginx.conf.tpl"
+         destination   = "local/nginx.conf"
+         change_mode   = "restart"
+      }
+
+
+   }
+  }
+}
diff --git a/tests/manual/python_tests/producer/receiver.json.tpl b/tests/manual/python_tests/producer/receiver.json.tpl
new file mode 100644
index 000000000..b361ba851
--- /dev/null
+++ b/tests/manual/python_tests/producer/receiver.json.tpl
@@ -0,0 +1,22 @@
+{
+  "MonitorDbAddress":"localhost:8086",
+  "MonitorDbName": "db_test",
+  "BrokerDbAddress":"localhost:27017",
+  "DataServer": {
+    "NThreads": 2,
+    "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}
+  },
+  "DataCache": {
+    "Use": true,
+    "SizeGB": 1,
+    "ReservedShare": 10
+  },
+  "AuthorizationServer": "localhost:8400/authorizer",
+  "AuthorizationInterval": 10000,
+  "ListenPort": {{ env "NOMAD_PORT_recv" }},
+  "Tag": "{{ env "NOMAD_ADDR_recv" }}",
+  "WriteToDisk": true,
+  "WriteToDb": true,
+  "LogLevel" : "debug",
+  "RootFolder" : "/tmp/asapo/receiver/files"
+}
diff --git a/tests/manual/python_tests/producer/receiver.nmd b/tests/manual/python_tests/producer/receiver.nmd
new file mode 100644
index 000000000..75fbca574
--- /dev/null
+++ b/tests/manual/python_tests/producer/receiver.nmd
@@ -0,0 +1,47 @@
+job "receiver" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "receiver" {
+      driver = "raw_exec"
+
+      config {
+        command = "/home/yakubov/projects/asapo/cmake-build-debug/receiver/receiver",
+        args =  ["${NOMAD_TASK_DIR}/receiver.json"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "recv" {}
+          port "recv_ds" {}
+        }
+      }
+
+      service {
+        name = "asapo-receiver"
+        port = "recv"
+        check {
+          name     = "alive"
+          type     = "tcp"
+          interval = "10000s"
+          timeout  = "2s"
+          initial_status =   "passing"
+        }
+      }
+
+      template {
+         source        = "/home/yakubov/projects/asapo/cmake-build-debug/tests/automatic/full_chain/simple_chain/receiver.json.tpl"
+         destination   = "local/receiver.json"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+
+    }
+  }
+}
diff --git a/tests/manual/python_tests/producer/run.sh b/tests/manual/python_tests/producer/run.sh
index 52c6aec88..62b5fec56 100755
--- a/tests/manual/python_tests/producer/run.sh
+++ b/tests/manual/python_tests/producer/run.sh
@@ -1,4 +1,6 @@
 #!/usr/bin/env bash
 export PYTHONPATH=/home/yakubov/projects/asapo/cmake-build-debug/producer/api/python:${PYTHONPATH}
 
+mkdir -p /tmp/asapo/receiver/files/test1/asapo_test1
+
 python test.py
diff --git a/tests/manual/python_tests/producer/start_services.sh b/tests/manual/python_tests/producer/start_services.sh
new file mode 100755
index 000000000..bd0128b53
--- /dev/null
+++ b/tests/manual/python_tests/producer/start_services.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+nomad run authorizer.nmd
+nomad run discovery.nmd
+nomad run nginx.nmd
+nomad run receiver.nmd
diff --git a/tests/manual/python_tests/producer/stop_services.sh b/tests/manual/python_tests/producer/stop_services.sh
new file mode 100755
index 000000000..13c664ca5
--- /dev/null
+++ b/tests/manual/python_tests/producer/stop_services.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+nomad stop authorizer
+nomad stop discovery
+nomad stop nginx
+nomad stop receiver
diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py
index 9e93ff976..0f9ed285e 100644
--- a/tests/manual/python_tests/producer/test.py
+++ b/tests/manual/python_tests/producer/test.py
@@ -2,28 +2,49 @@ from __future__ import print_function
 
 import asapo_producer
 import sys
-import json
 import time
 
-endpoint = "psana002:8400"
+#import threading
+#lock = threading.Lock()
+
+
+endpoint = "127.0.0.1:8400"
 beamtime = "asapo_test1"
-stream = "stream"
+stream = ""
 token = ""
-nthreads = 1
+nthreads = 8
 
-def callback(i,j):
-    print(i,j)
+def callback(header,err):
+#    lock.acquire() # just example, don't do this if not needed
+    if err is not None:
+        print("could not sent: ",header,err)
+    else:
+        print ("successfuly sent: ",header)
+#    lock.release()
 
+producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads)
+if err is not None:
+    print(err)
+    sys.exit(1)
 
-producer, err = asapo_producer.create_producer(endpoint,beamtime, stream, token, nthreads,callback)
+producer.set_log_level("info")
 
+#send single file
+err = producer.send_file(1, local_path = "./file1", exposed_path = "file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 if err is not None:
     print(err)
-    sys.exit(1)
-else:
-    print(producer)
 
 
-producer.send_file("")
+#send subsets
+#producer.send_file(1, local_path = "./file1", exposed_path = "file1"",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#producer.send_file(1, local_path = "./file1", exposed_path = "file1",subset=(1,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+
+#send meta only
+err = producer.send_file(2, local_path = "./file2",exposed_path = "./file2",
+                         injest_mode = asapo_producer.INJEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
+if err is not None:
+    print(err)
+
+time.sleep(1)
 
 
-- 
GitLab