diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 1c771e9a96cee57d3ce00ccfb41b5dda3a3ffa4b..b864e310da8a0180d8bb5eab9543a7941a8703f6 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -154,6 +154,8 @@ enum class MetaIngestOp : uint64_t { struct MetaIngestMode { MetaIngestOp op; bool upsert; + MetaIngestMode() = default; + MetaIngestMode(MetaIngestOp aOp, bool aUpsert): op(aOp), upsert(aUpsert) {}; uint64_t Encode() { return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert); } diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h index 157b6361b251c95a733769438d6465e7ffbed233..2561a7086a0ebe75c50a88cdb30f2379a4514b98 100644 --- a/producer/api/c/include/asapo/producer_c.h +++ b/producer/api/c/include/asapo/producer_c.h @@ -17,6 +17,13 @@ enum AsapoRequestHandlerType { kFilesystem }; +//! c version of asapo::MetaIngestOp +enum AsapoMetaIngestOp { + kInsert = 1, + kReplace = 2, + kUpdate = 3 +}; + AsapoProducerHandle asapo_create_producer(const char* endpoint, uint8_t n_processing_threads, AsapoRequestHandlerType type, @@ -64,10 +71,23 @@ int asapo_producer_send_file(AsapoProducerHandle producer, AsapoRequestCallback callback, AsapoErrorHandle* error); int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, - uint64_t last_id, const char* stream, + uint64_t last_id, const char* next_stream, AsapoRequestCallback callback, AsapoErrorHandle* error); +int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_stream_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); #endif diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index 66692eff5f27e120311d5d3a6d481a51ec4e0c09..472f44b9e1863e46c7b6161485e5abc5d10056d3 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -64,6 +64,14 @@ extern "C" { uint64_t timeout_ms, AsapoErrorHandle* error); +#define BUILD_WRAPPER asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { \ + auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); \ + auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); \ + callback(payLoadHandle, errorHandle); \ + delete errorHandle; \ + delete payLoadHandle; \ + } + int asapo_producer_send(AsapoProducerHandle producer, const AsapoMessageHeaderHandle message_header, AsapoMessageDataHandle data, @@ -71,13 +79,7 @@ extern "C" { const char* stream, AsapoRequestCallback callback, AsapoErrorHandle* error) { - asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { - auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); - auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>; - callback(payLoadHandle, errorHandle); - err = std::move(errorHandle->handle); - delete payLoadHandle; - }; + BUILD_WRAPPER; auto err = producer->handle->Send(*message_header->handle, std::move(data->handle), ingest_mode, @@ -91,13 +93,58 @@ extern "C" { uint64_t ingest_mode, const char* stream, AsapoRequestCallback callback, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + BUILD_WRAPPER; + auto err = producer->handle->SendFile(*message_header->handle, + file_name, + ingest_mode, + stream, + wrapper); + return process_error(error, std::move(err)); + } int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, - uint64_t last_id, const char* stream, + uint64_t last_id, const char* next_stream, AsapoRequestCallback callback, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + BUILD_WRAPPER; + auto err = producer->handle->SendStreamFinishedFlag(stream, + last_id, + next_stream, + wrapper); + return process_error(error, std::move(err)); + } + int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + BUILD_WRAPPER; + asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0); + auto err = producer->handle->SendBeamtimeMetadata(metadata, + im, + wrapper); + return process_error(error, std::move(err)); + + } + int asapo_producer_send_stream_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + BUILD_WRAPPER; + asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0); + auto err = producer->handle->SendStreamMetadata(metadata, + im, + stream, + wrapper); + return process_error(error, std::move(err)); + + }