Skip to content
Snippets Groups Projects
Commit 9fbf3fee authored by Juergen Hannappel's avatar Juergen Hannappel
Browse files

add producer functions with wrapper

parent 6ce2f328
No related branches found
No related tags found
No related merge requests found
......@@ -154,6 +154,8 @@ enum class MetaIngestOp : uint64_t {
struct MetaIngestMode {
MetaIngestOp op;
bool upsert;
MetaIngestMode() = default;
MetaIngestMode(MetaIngestOp aOp, bool aUpsert): op(aOp), upsert(aUpsert) {};
uint64_t Encode() {
return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert);
}
......
......@@ -17,6 +17,13 @@ enum AsapoRequestHandlerType {
kFilesystem
};
//! c version of asapo::MetaIngestOp
enum AsapoMetaIngestOp {
kInsert = 1,
kReplace = 2,
kUpdate = 3
};
AsapoProducerHandle asapo_create_producer(const char* endpoint,
uint8_t n_processing_threads,
AsapoRequestHandlerType type,
......@@ -64,10 +71,23 @@ int asapo_producer_send_file(AsapoProducerHandle producer,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
uint64_t last_id,
const char* stream,
uint64_t last_id,
const char* next_stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer,
const char* metadata,
AsapoMetaIngestOp mode,
AsapoBool upsert,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_stream_metadata(AsapoProducerHandle producer,
const char* metadata,
AsapoMetaIngestOp mode,
AsapoBool upsert,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
#endif
......@@ -64,6 +64,14 @@ extern "C" {
uint64_t timeout_ms,
AsapoErrorHandle* error);
#define BUILD_WRAPPER asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { \
auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); \
auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); \
callback(payLoadHandle, errorHandle); \
delete errorHandle; \
delete payLoadHandle; \
}
int asapo_producer_send(AsapoProducerHandle producer,
const AsapoMessageHeaderHandle message_header,
AsapoMessageDataHandle data,
......@@ -71,13 +79,7 @@ extern "C" {
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error) {
asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void {
auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload);
auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>;
callback(payLoadHandle, errorHandle);
err = std::move(errorHandle->handle);
delete payLoadHandle;
};
BUILD_WRAPPER;
auto err = producer->handle->Send(*message_header->handle,
std::move(data->handle),
ingest_mode,
......@@ -91,13 +93,58 @@ extern "C" {
uint64_t ingest_mode,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
BUILD_WRAPPER;
auto err = producer->handle->SendFile(*message_header->handle,
file_name,
ingest_mode,
stream,
wrapper);
return process_error(error, std::move(err));
}
int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
uint64_t last_id,
const char* stream,
uint64_t last_id,
const char* next_stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
BUILD_WRAPPER;
auto err = producer->handle->SendStreamFinishedFlag(stream,
last_id,
next_stream,
wrapper);
return process_error(error, std::move(err));
}
int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer,
const char* metadata,
AsapoMetaIngestOp mode,
AsapoBool upsert,
AsapoRequestCallback callback,
AsapoErrorHandle* error) {
BUILD_WRAPPER;
asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0);
auto err = producer->handle->SendBeamtimeMetadata(metadata,
im,
wrapper);
return process_error(error, std::move(err));
}
int asapo_producer_send_stream_metadata(AsapoProducerHandle producer,
const char* metadata,
AsapoMetaIngestOp mode,
AsapoBool upsert,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error) {
BUILD_WRAPPER;
asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0);
auto err = producer->handle->SendStreamMetadata(metadata,
im,
stream,
wrapper);
return process_error(error, std::move(err));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment