From 9fbf3feea895749588ffa2516811b8ea4e95fe7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrgen=20Hannappel?= <juergen.hannappel@desy.de>
Date: Wed, 7 Jul 2021 12:17:28 +0200
Subject: [PATCH] add producer functions with wrapper

---
 .../cpp/include/asapo/common/data_structs.h   |  2 +
 producer/api/c/include/asapo/producer_c.h     | 22 +++++-
 producer/api/cpp/src/producer_c_glue.cpp      | 67 ++++++++++++++++---
 3 files changed, 80 insertions(+), 11 deletions(-)

diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h
index 1c771e9a9..b864e310d 100644
--- a/common/cpp/include/asapo/common/data_structs.h
+++ b/common/cpp/include/asapo/common/data_structs.h
@@ -154,6 +154,8 @@ enum class MetaIngestOp : uint64_t {
 struct MetaIngestMode {
     MetaIngestOp op;
     bool upsert;
+    MetaIngestMode() = default;
+    MetaIngestMode(MetaIngestOp aOp, bool aUpsert): op(aOp), upsert(aUpsert) {};
     uint64_t Encode() {
         return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert);
     }
diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h
index 157b6361b..2561a7086 100644
--- a/producer/api/c/include/asapo/producer_c.h
+++ b/producer/api/c/include/asapo/producer_c.h
@@ -17,6 +17,13 @@ enum AsapoRequestHandlerType {
     kFilesystem
 };
 
+//! c version of asapo::MetaIngestOp
+enum AsapoMetaIngestOp {
+    kInsert = 1,
+    kReplace = 2,
+    kUpdate = 3
+};
+
 AsapoProducerHandle asapo_create_producer(const char* endpoint,
                                           uint8_t n_processing_threads,
                                           AsapoRequestHandlerType type,
@@ -64,10 +71,23 @@ int asapo_producer_send_file(AsapoProducerHandle producer,
                              AsapoRequestCallback callback,
                              AsapoErrorHandle* error);
 int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
-                                             uint64_t last_id,
                                              const char* stream,
+                                             uint64_t last_id,
                                              const char* next_stream,
                                              AsapoRequestCallback callback,
                                              AsapoErrorHandle* error);
+int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer,
+                                          const char* metadata,
+                                          AsapoMetaIngestOp mode,
+                                          AsapoBool upsert,
+                                          AsapoRequestCallback callback,
+                                          AsapoErrorHandle* error);
+int asapo_producer_send_stream_metadata(AsapoProducerHandle producer,
+                                        const char* metadata,
+                                        AsapoMetaIngestOp mode,
+                                        AsapoBool upsert,
+                                        const char* stream,
+                                        AsapoRequestCallback callback,
+                                        AsapoErrorHandle* error);
 
 #endif
diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp
index 66692eff5..472f44b9e 100644
--- a/producer/api/cpp/src/producer_c_glue.cpp
+++ b/producer/api/cpp/src/producer_c_glue.cpp
@@ -64,6 +64,14 @@ extern "C" {
             uint64_t timeout_ms,
             AsapoErrorHandle* error);
 
+#define BUILD_WRAPPER asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { \
+            auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); \
+            auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); \
+            callback(payLoadHandle, errorHandle); \
+            delete errorHandle; \
+            delete payLoadHandle; \
+        }
+
     int asapo_producer_send(AsapoProducerHandle producer,
                             const AsapoMessageHeaderHandle message_header,
                             AsapoMessageDataHandle data,
@@ -71,13 +79,7 @@ extern "C" {
                             const char* stream,
                             AsapoRequestCallback callback,
                             AsapoErrorHandle* error) {
-        asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void {
-            auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload);
-            auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>;
-            callback(payLoadHandle, errorHandle);
-            err = std::move(errorHandle->handle);
-            delete payLoadHandle;
-        };
+        BUILD_WRAPPER;
         auto err = producer->handle->Send(*message_header->handle,
                                           std::move(data->handle),
                                           ingest_mode,
@@ -91,13 +93,58 @@ extern "C" {
                                  uint64_t ingest_mode,
                                  const char* stream,
                                  AsapoRequestCallback callback,
-                                 AsapoErrorHandle* error);
+                                 AsapoErrorHandle* error) {
+        BUILD_WRAPPER;
+        auto err = producer->handle->SendFile(*message_header->handle,
+                                              file_name,
+                                              ingest_mode,
+                                              stream,
+                                              wrapper);
+        return process_error(error, std::move(err));
+    }
     int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
-                                                 uint64_t last_id,
                                                  const char* stream,
+                                                 uint64_t last_id,
                                                  const char* next_stream,
                                                  AsapoRequestCallback callback,
-                                                 AsapoErrorHandle* error);
+                                                 AsapoErrorHandle* error) {
+        BUILD_WRAPPER;
+        auto err = producer->handle->SendStreamFinishedFlag(stream,
+                   last_id,
+                   next_stream,
+                   wrapper);
+        return process_error(error, std::move(err));
+    }
+    int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer,
+                                              const char* metadata,
+                                              AsapoMetaIngestOp mode,
+                                              AsapoBool upsert,
+                                              AsapoRequestCallback callback,
+                                              AsapoErrorHandle* error) {
+        BUILD_WRAPPER;
+        asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0);
+        auto err = producer->handle->SendBeamtimeMetadata(metadata,
+                                                          im,
+                                                          wrapper);
+        return process_error(error, std::move(err));
+
+    }
+    int asapo_producer_send_stream_metadata(AsapoProducerHandle producer,
+                                            const char* metadata,
+                                            AsapoMetaIngestOp mode,
+                                            AsapoBool upsert,
+                                            const char* stream,
+                                            AsapoRequestCallback callback,
+                                            AsapoErrorHandle* error) {
+        BUILD_WRAPPER;
+        asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0);
+        auto err = producer->handle->SendStreamMetadata(metadata,
+                                                        im,
+                                                        stream,
+                                                        wrapper);
+        return process_error(error, std::move(err));
+
+    }
 
 
 
-- 
GitLab