From 688ba445348fe98fb6c0ce7ef1062837aa56e6b0 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 13 Aug 2019 16:29:04 +0200
Subject: [PATCH] start working at ingest modes

---
 common/cpp/include/common/data_structs.h      | 11 +++++++++
 .../dummy_data_producer.cpp                   |  7 +++---
 producer/api/include/producer/common.h        | 15 ++++++++----
 producer/api/include/producer/producer.h      |  8 +++----
 producer/api/src/producer_impl.cpp            | 21 ++++++++--------
 producer/api/src/producer_impl.h              |  9 +++----
 producer/api/unittests/test_producer.cpp      |  2 +-
 producer/api/unittests/test_producer_impl.cpp | 24 +++++++++----------
 .../src/main_eventmon.cpp                     |  2 +-
 9 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h
index 56020baac..1d32acf48 100644
--- a/common/cpp/include/common/data_structs.h
+++ b/common/cpp/include/common/data_structs.h
@@ -56,5 +56,16 @@ struct SourceCredentials {
     };
 };
 
+enum IngestModeFlags : uint64_t {
+    kTransferData = 1 << 0,
+    kTransferMetaDataOnly = 1 << 1,
+    kStoreInCache = 1 << 2,
+    kStoreInFilesystem = 1 << 3,
+};
+
+const uint64_t kDefaultIngestMode = kTransferData | kStoreInCache | kStoreInFilesystem;
+
+
+
 }
 #endif //ASAPO_FILE_INFO_H
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 1f026bc18..a7b6c2dab 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -146,8 +146,9 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
         if (!stream.empty()) {
             event_header.file_name = stream + "/" + event_header.file_name;
         }
+        event_header.user_metadata = std::move(meta);
         if (images_in_set == 1) {
-            auto err = producer->SendData(event_header, std::move(buffer), std::move(meta), &ProcessAfterSend);
+            auto err = producer->SendData(event_header, std::move(buffer), asapo::kDefaultIngestMode, &ProcessAfterSend);
             if (err) {
                 std::cerr << "Cannot send file: " << err << std::endl;
                 return false;
@@ -162,8 +163,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
                 if (!stream.empty()) {
                     event_header.file_name = stream + "/" + event_header.file_name;
                 }
-
-                auto err = producer->SendData(event_header, std::move(buffer), meta, &ProcessAfterSend);
+                event_header.user_metadata = meta;
+                auto err = producer->SendData(event_header, std::move(buffer), asapo::kDefaultIngestMode, &ProcessAfterSend);
                 if (err) {
                     std::cerr << "Cannot send file: " << err << std::endl;
                     return false;
diff --git a/producer/api/include/producer/common.h b/producer/api/include/producer/common.h
index 1e3ba0028..ba32d6925 100644
--- a/producer/api/include/producer/common.h
+++ b/producer/api/include/producer/common.h
@@ -21,14 +21,19 @@ enum class RequestHandlerType {
 
 struct EventHeader {
     EventHeader() {};
-    EventHeader(uint64_t file_id_i, uint64_t file_size_i, std::string file_name_i, uint64_t expected_subset_id_i = 0,
-                uint64_t expected_subset_size_i = 0 ):
-        file_id{file_id_i}, file_size{file_size_i}, file_name{std::move(file_name_i)},
-        subset_id{expected_subset_id_i},
-        subset_size{expected_subset_size_i} {};
+    EventHeader(uint64_t file_id_i, uint64_t file_size_i, std::string file_name_i,
+                std::string user_metadata_i = "",
+                uint64_t subset_id_i = 0,
+                uint64_t subset_size_i = 0 ):
+        file_id{file_id_i}, file_size{file_size_i},
+        file_name{std::move(file_name_i)},
+        user_metadata{std::move(user_metadata_i)},
+        subset_id{subset_id_i},
+        subset_size{subset_size_i} {};
     uint64_t file_id = 0;
     uint64_t file_size = 0;
     std::string file_name;
+    std::string user_metadata;
     uint64_t subset_id = 0;
     uint64_t subset_size = 0;
 };
diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h
index 4ed90b2dd..d60ad09ab 100644
--- a/producer/api/include/producer/producer.h
+++ b/producer/api/include/producer/producer.h
@@ -26,12 +26,11 @@ class Producer {
 
     //! Sends data to the receiver
     /*!
-      \param event_header - A stucture with the meta information (file name, size).
+      \param event_header - A stucture with the meta information (file name, size, a string with user metadata (JSON format)).
       \param data - A pointer to the data to send
-      \param metadata - A string with metadata (JSON format)
       \return Error - Will be nullptr on success
     */
-    virtual Error SendData(const EventHeader& event_header, FileData data, std::string metadata,
+    virtual Error SendData(const EventHeader& event_header, FileData data, uint64_t injest_mode,
                            RequestCallback callback) = 0;
     //! Sends files to the receiver
     /*!
@@ -39,7 +38,8 @@ class Producer {
       \param file name - A full path of the file to send
       \return Error - Will be nullptr on success
     */
-    virtual Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) = 0;
+    virtual Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode,
+                           RequestCallback callback) = 0;
 
     //! Sends metadata for the current beamtime to the receiver
     /*!
diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp
index af08b782f..17b434c9d 100644
--- a/producer/api/src/producer_impl.cpp
+++ b/producer/api/src/producer_impl.cpp
@@ -29,9 +29,9 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a
     request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(), log__});
 }
 
-GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header, uint64_t meta_size) {
+GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const EventHeader& event_header) {
     GenericRequestHeader request{kOpcodeTransferData, event_header.file_id, event_header.file_size,
-                                 meta_size, std::move(event_header.file_name)};
+                                 event_header.user_metadata.size(), std::move(event_header.file_name)};
     if (event_header.subset_id != 0) {
         request.op_code = kOpcodeTransferSubsetData;
         request.custom_data[0] = event_header.subset_id;
@@ -58,8 +58,8 @@ Error CheckProducerRequest(const EventHeader& event_header) {
 
 Error ProducerImpl::Send(const EventHeader& event_header,
                          FileData data,
-                         std::string metadata,
                          std::string full_path,
+                         uint64_t injest_mode,
                          RequestCallback callback) {
     auto err = CheckProducerRequest(event_header);
     if (err) {
@@ -67,23 +67,24 @@ Error ProducerImpl::Send(const EventHeader& event_header,
         return err;
     }
 
-    auto request_header = GenerateNextSendRequest(event_header, metadata.size());
+    auto request_header = GenerateNextSendRequest(event_header);
 
     return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{source_cred_string_, std::move(request_header),
-                std::move(data), std::move(metadata), std::move(full_path), callback}
+                std::move(data), std::move(event_header.user_metadata), std::move(full_path), callback}
     });
 
 }
 
 
-Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, std::string metadata,
-                             RequestCallback callback) {
-    return Send(event_header, std::move(data), std::move(metadata), "", callback);
+Error ProducerImpl::SendData(const EventHeader& event_header, FileData data,
+                             uint64_t injest_mode, RequestCallback callback) {
+    return Send(std::move(event_header), std::move(data), "", injest_mode, callback);
 }
 
 
-Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) {
-    return Send(event_header, nullptr, "", std::move(full_path), callback);
+Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode,
+                             RequestCallback callback) {
+    return Send(event_header, nullptr, std::move(full_path), injest_mode, callback);
 }
 
 
diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h
index fc3d10fc8..487943b3d 100644
--- a/producer/api/src/producer_impl.h
+++ b/producer/api/src/producer_impl.h
@@ -28,8 +28,9 @@ class ProducerImpl : public Producer {
     void SetLogLevel(LogLevel level) override;
     void EnableLocalLog(bool enable) override;
     void EnableRemoteLog(bool enable) override;
-    Error SendData(const EventHeader& event_header, FileData data, std::string metadata, RequestCallback callback) override;
-    Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) override;
+    Error SendData(const EventHeader& event_header, FileData data, uint64_t injest_mode, RequestCallback callback) override;
+    Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t injest_mode,
+                   RequestCallback callback) override;
     AbstractLogger* log__;
     std::unique_ptr<RequestPool> request_pool__;
 
@@ -38,9 +39,9 @@ class ProducerImpl : public Producer {
     Error SendMetaData(const std::string& metadata, RequestCallback callback) override;
 
   private:
-    Error Send(const EventHeader& event_header, FileData data, std::string metadata, std::string full_path,
+    Error Send(const EventHeader& event_header, FileData data, std::string full_path, uint64_t injest_mode,
                RequestCallback callback);
-    GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header, uint64_t meta_size);
+    GenericRequestHeader GenerateNextSendRequest(const EventHeader& event_header);
     std::string source_cred_string_;
 };
 
diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp
index bdafe317b..134691656 100644
--- a/producer/api/unittests/test_producer.cpp
+++ b/producer/api/unittests/test_producer.cpp
@@ -56,7 +56,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) {
                                                 &err);
 
     asapo::EventHeader event_header{1, 1, ""};
-    auto err_send = producer->SendData(event_header, nullptr, "", nullptr);
+    auto err_send = producer->SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
 
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
     ASSERT_THAT(producer, Ne(nullptr));
diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp
index 832761096..aa0df2348 100644
--- a/producer/api/unittests/test_producer_impl.cpp
+++ b/producer/api/unittests/test_producer_impl.cpp
@@ -91,14 +91,14 @@ TEST_F(ProducerImplTests, SendReturnsError) {
     EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return(
             asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release()));
     asapo::EventHeader event_header{1, 1, ""};
-    auto err = producer.SendData(event_header, nullptr, "", nullptr);
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull));
 }
 
 TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) {
     std::string long_string(asapo::kMaxMessageSize + 100, 'a');
     asapo::EventHeader event_header{1, 1, long_string};
-    auto err = producer.SendData(event_header, nullptr, "", nullptr);
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong));
 }
 
@@ -106,14 +106,14 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) {
 TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) {
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking")));
     asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""};
-    auto err = producer.SendData(event_header, nullptr, "", nullptr);
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge));
 }
 
 TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) {
     EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset size")));
-    asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "", 1};
-    auto err = producer.SendData(event_header, nullptr, "", nullptr);
+    asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize, "", "", 1};
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kErrorSubsetSize));
 }
 
@@ -126,8 +126,8 @@ TEST_F(ProducerImplTests, UsesDefaultStream) {
                                         expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return(
                                                     nullptr));
 
-    asapo::EventHeader event_header{expected_id, expected_size, expected_name};
-    auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr);
+    asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode , nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -141,8 +141,8 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) {
                                         expected_id, expected_size, expected_name, 0, 0))).WillOnce(Return(
                                                     nullptr));
 
-    asapo::EventHeader event_header{expected_id, expected_size, expected_name};
-    auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr);
+    asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata};
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -155,8 +155,8 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
                                         expected_subset_id, expected_subset_size))).WillOnce(Return(
                                                     nullptr));
 
-    asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_subset_id, expected_subset_size};
-    auto err = producer.SendData(event_header, nullptr, expected_metadata, nullptr);
+    asapo::EventHeader event_header{expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size};
+    auto err = producer.SendData(event_header, nullptr, asapo::kDefaultIngestMode , nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -187,7 +187,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
                                                     nullptr));
 
     asapo::EventHeader event_header{expected_id, 0, expected_name};
-    auto err = producer.SendFile(event_header, expected_fullpath, nullptr);
+    auto err = producer.SendFile(event_header, expected_fullpath, asapo::kDefaultIngestMode, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp
index db22a15f7..e23fb03e6 100644
--- a/producer/event_monitor_producer/src/main_eventmon.cpp
+++ b/producer/event_monitor_producer/src/main_eventmon.cpp
@@ -135,7 +135,7 @@ int main (int argc, char* argv[]) {
         event_header.file_id = ++i;
         HandleSubsets(&event_header);
         producer->SendFile(event_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
-                           event_header.file_name, ProcessAfterSend);
+                           event_header.file_name, asapo::kDefaultIngestMode, ProcessAfterSend);
     }
 
     logger->Info("Producer exit. Processed " + std::to_string(i) + " files");
-- 
GitLab