Skip to content
Snippets Groups Projects
Commit 6ce2f328 authored by Juergen Hannappel's avatar Juergen Hannappel
Browse files

partially compiling producer glue

parent 7314865f
Branches
Tags
No related merge requests found
#ifndef __COMMON_C_H__
#define __COMMON_C_H__
typedef int AsapoBool;
#ifndef __COMMON_C_INTERFACE_IMPLEMENTATION__
typedef void* AsapoConsumerHandle;
typedef void* AsapoSourceCredentialsHandle;
typedef void* AsapoErrorHandle;
typedef void* AsapoStringHandle;
typedef void* AsapoStreamInfoHandle;
typedef void* AsapoStreamInfosHandle;
typedef void* AsapoMessageDataHandle;
#endif
void asapo_free_handle(void** handle);
void* asapo_new_handle();
void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size);
AsapoBool asapo_is_error(AsapoErrorHandle err);
const char* asapo_string_c_str(const AsapoStringHandle str);
size_t asapo_string_size(const AsapoStringHandle str);
AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos,
size_t index);
size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos);
uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info);
const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info);
AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info);
const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info);
void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info,
struct timespec* stamp);
void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info,
struct timespec* stamp);
const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data);
#endif
#ifndef __COMMON_C_GLUE_H__
#define __COMMON_C_GLUE_H__
#include <memory>
#include "asapo/common/data_structs.h"
#include "asapo/common/error.h"
#define __COMMON_C_INTERFACE_IMPLEMENTATION__
class AsapoHandle {
public:
......@@ -22,4 +25,63 @@ class AsapoHandlerHolder final : public AsapoHandle {
bool manage_memory_{true};
};
//! handle for credentials to access a source from a producer
/// created by asapo_create_source_credentials()
/// free after use with asapo_free_handle()
/// \sa asapo::SourceCredentials
typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle;
//! handle for an asapo error
/// needs to be cleared after use with asapo_free_handle()
/// text version of an error: asapo_error_explain()
/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType
typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle;
//! handle for string return types
/// return type of several functions
/// free after use with asapo_free_handle()
/// a const pointer to the content can be obtained with asapo_string_c_str()
typedef AsapoHandlerHolder<std::string>* AsapoStringHandle;
//! handle for info about a stream,
/// may be set via asapo_stream_infos_get_info()
/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry()
typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle;
//! handle for a set of stream infos
/// touch only with proper functions and use asapo_free_handle() to delete,
/// created by asapo_consumer_get_stream_list()
/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size()
typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle;
//! handle for data recieved by the consumer
/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last()
/// free after use with asapo_free_handle()
/// access to the data is granted via asapo_message_data_get_as_chars()
typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle;
int process_error(AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr);
AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr);
template <typename T> AsapoHandlerHolder<T>* handle_or_null_t(T* object,
AsapoErrorHandle* error,
asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) {
if (process_error(error, std::move(err), p_exclude_err_template) < 0) {
return nullptr;
} else {
return new AsapoHandlerHolder<T>(object);
}
}
#endif
#define __COMMON_C_INTERFACE_IMPLEMENTATION__
#include "asapo/asapo_common_c_glue.h
//! handle for credentials to access a source from a producer
/// created by asapo_create_source_credentials()
/// free after use with asapo_free_handle()
/// \sa asapo::SourceCredentials
typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle;
int process_error(AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) {
int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1;
if (error == nullptr) {
return retval;
}
if (*error == nullptr) {
*error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()};
} else {
(*error)->handle = std::move(err);
}
return retval;
}
AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) {
if (process_error(error, std::move(err), p_exclude_err_template) < 0) {
if (handle != nullptr) {
delete handle;
}
return nullptr;
} else {
return handle;
}
}
extern "C" {
......@@ -26,4 +49,114 @@ extern "C" {
return NULL;
}
/// \copydoc asapo::ErrorInterface::Explain()
/// \param[out] buf will be filled with the explanation
/// \param[in] maxSize max size of buf in bytes
void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) {
if (error->handle) {
strncpy(buf, error->handle->Explain().c_str(), maxSize - 1);
buf[maxSize] = '\0';
} else {
static std::string msg("no error");
std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf);
buf[std::max(maxSize - 1, msg.size())] = '\0';
}
}
//! give a pointer to the content of the asapoString
/// \param[in] str the handle of the asapoString in question
/// \return const char pointer to the content
const char* asapo_string_c_str(const AsapoStringHandle str) {
return str->handle->c_str();
}
//! give the size of an asapoString
/// \param[in] str the handle of the asapoString in question
/// \return the number of bytes in the string , not counting the final nul byte.
size_t asapo_string_size(const AsapoStringHandle str) {
return str->handle->size();
}
//! get one stream info from a stream infos handle
/// \param[in] infos handle for stream infos
/// \param[in] index index od info to get, starts at 0
/// \return handle to stream info
AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos,
size_t index) {
return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false};
}
//! get size (number of elements) of a stream infos handle
/// \param[in] infos handle for stream infos
/// \return number of elements in the handle
size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) {
return infos->handle->size();
}
//! get last id from the stream info object
/// \param[in] info handle of the stream info object
/// \return last id
/// \sa asapo::StreamInfo
uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) {
return info->handle->last_id;
}
//! get stream name from the stream info object
/// \param[in] info handle of the stream info object
/// \return pointer to the name string, valid until asapoStreamInfos object is deleted
/// \sa asapo::StreamInfo
const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) {
return info->handle->name.c_str();
}
//! get finished state from the stream info object
/// \param[in] info handle of the stream info object
/// \return finised state, 0 = false
/// \sa asapo::StreamInfo
AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) {
return info->handle->finished;
}
//! get next stream name? from the stream info object
/// \param[in] info handle of the stream info object
/// \return pointer to the name string, valid until asapoStreamInfos object is deleted
/// \sa asapo::StreamInfo
const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) {
return info->handle->next_stream.c_str();
}
//! get creation time from the stream info object
/// \param[in] info handle of the stream info object
/// \param[out] stamp creation timestamp as timespec
/// \sa asapo::StreamInfo
void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info,
struct timespec* stamp) {
time_point_to_time_spec(info->handle->timestamp_created, stamp);
}
//! get time of last entry from the stream info object
/// \param[in] info handle of the stream info object
/// \param[out] stamp last entry timestamp as timespec
/// \sa asapo::StreamInfo
void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info,
struct timespec* stamp) {
time_point_to_time_spec(info->handle->timestamp_lastentry, stamp);
}
//! give acess to data
/// \param[in] data the handle of the data
/// \return const char pointer to the data blob, valid until deletion or reuse of data
const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) {
return reinterpret_cast<const char*>(data->handle.get());
}
//! wraps asapo::SourceCredentials::SourceCredentials()
/// \copydoc asapo::SourceCredentials::SourceCredentials()
AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type,
const char* beamtime,
const char* beamline,
const char* data_source,
const char* token) {
auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type),
beamtime, beamline,
data_source, token);
return new AsapoHandlerHolder<asapo::SourceCredentials> {retval};
}
}
......@@ -2,16 +2,10 @@
#define __CONSUMER_C_H__
#ifndef __CONSUMER_C_INTERFACE_IMPLEMENTATION__
typedef int AsapoBool;
#include <asapo/common_c.h>
typedef void* AsapoSourceCredentialsHandle;
typedef void* AsapoErrorHandle;
typedef void* AsapoConsumerHandle;
typedef void* AsapoMessageMetaHandle;
typedef void* AsapoMessageMetasHandle;
typedef void* AsapoMessageDataHandle;
typedef void* AsapoStringHandle;
typedef void* AsapoStreamInfoHandle;
typedef void* AsapoStreamInfosHandle;
typedef void* AsapoIdListHandle;
typedef void* AsapoDataSetHandle;
typedef void* AsapoPartialErrorDataHandle;
......@@ -52,20 +46,18 @@ enum AsapoNetworkConnectionType {
kAsapoTcp,
kFabric
};
void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size);
enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error);
AsapoConsumerHandle asapo_create_consumer(const char* server_name,
const char* source_path,
AsapoBool has_filesysytem,
AsapoSourceCredentialsHandle source,
AsapoErrorHandle* error);
AsapoBool asapo_is_error(AsapoErrorHandle err);
AsapoStringHandle asapo_consumer_generate_new_group_id(AsapoConsumerHandle consumer, AsapoErrorHandle* err);
const char* asapo_string_c_str(const AsapoStringHandle str);
size_t asapo_string_size(const AsapoStringHandle str);
void asapo_consumer_set_timeout(AsapoConsumerHandle consumer, uint64_t timeout_ms);
int asapo_consumer_reset_last_read_marker(AsapoConsumerHandle consumer,
......@@ -107,9 +99,6 @@ AsapoStreamInfosHandle asapo_consumer_get_stream_list(AsapoConsumerHandle consum
const char* from,
enum AsapoStreamFilter filter,
AsapoErrorHandle* error);
AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos,
size_t index);
size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos);
int asapo_consumer_delete_stream(AsapoConsumerHandle consumer,
const char* stream,
......@@ -182,14 +171,6 @@ const char* asapo_message_meta_get_metadata(const AsapoMessageMetaHandle md);
uint64_t asapo_message_meta_get_buf_id(const AsapoMessageMetaHandle md);
uint64_t asapo_message_meta_get_dataset_substream(const AsapoMessageMetaHandle md);
uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info);
const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info);
AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info);
const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info);
void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info,
struct timespec* stamp);
void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info,
struct timespec* stamp);
uint64_t asapo_dataset_get_id(const AsapoDataSetHandle set);
uint64_t asapo_dataset_get_expected_size(const AsapoDataSetHandle set);
......@@ -213,7 +194,4 @@ uint64_t asapo_consumer_error_get_id_max(const AsapoConsumerErrorDataHandle erro
const char* asapo_consumer_error_get_next_stream(const AsapoConsumerErrorDataHandle error_payload);
void asapo_free_handle(void** handle);
void* asapo_new_handle();
#endif
......@@ -13,18 +13,6 @@ typedef int AsapoBool;
/// \sa asapo::Consumer
typedef AsapoHandlerHolder<asapo::Consumer>* AsapoConsumerHandle;
//! handle for credentials to access a source from a consumer
/// created by asapo_create_source_credentials()
/// free after use with asapo_free_handle()
/// \sa asapo::SourceCredentials
typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle;
//! handle for an asapo error
/// needs to be cleared after use with asapo_free_handle()
/// text version of an error: asapo_error_explain()
/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType
typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle;
//! handle for metadata of a message
/// create with asapo_new_handle()
/// free after use with asapo_free_handle()
......@@ -38,28 +26,7 @@ typedef AsapoHandlerHolder<asapo::MessageMeta>* AsapoMessageMetaHandle;
/// \sa asapo::MessageMetas
typedef AsapoHandlerHolder<asapo::MessageMetas>* AsapoMessageMetasHandle;
//! handle for data recieved by the consumer
/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last()
/// free after use with asapo_free_handle()
/// access to the data is granted via asapo_message_data_get_as_chars()
typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle;
//! handle for string return types
/// return type of several functions
/// free after use with asapo_free_handle()
/// a const pointer to the content can be obtained with asapo_string_c_str()
typedef AsapoHandlerHolder<std::string>* AsapoStringHandle;
//! handle for info about a stream,
/// may be set via asapo_stream_infos_get_info()
/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry()
typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle;
//! handle for a set of stream infos
/// touch only with proper functions and use asapo_free_handle() to delete,
/// created by asapo_consumer_get_stream_list()
/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size()
typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle;
//! handle for message id lists
/// touch only with proper functions and use asapo_free_handle() to delete,
......@@ -86,37 +53,10 @@ typedef AsapoHandlerHolder<asapo::ConsumerErrorData>* AsapoConsumerErrorDataHand
#include <algorithm>
template<typename t>
constexpr bool operator==(unsigned lhs, t rhs) {
return lhs == static_cast<typename std::underlying_type<t>::type>(rhs);
}
int process_error(AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) {
int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1;
if (error == nullptr) {
return retval;
}
if (*error == nullptr) {
*error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()};
} else {
(*error)->handle = std::move(err);
}
return retval;
}
AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err,
const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) {
if (process_error(error, std::move(err), p_exclude_err_template) < 0) {
if (handle != nullptr) {
delete handle;
}
return nullptr;
} else {
return handle;
}
constexpr bool operator==(const unsigned int& lhs, const t& rhs) {
return lhs == static_cast<typename std::make_unsigned<typename std::underlying_type<t>::type>::type>(rhs);
}
#define dataGetterStart \
asapo::MessageData d; \
asapo::MessageMeta* fi = info ? new asapo::MessageMeta : nullptr; \
......@@ -169,19 +109,6 @@ extern "C" {
return err != nullptr && err->handle != nullptr;
}
/// \copydoc asapo::ErrorInterface::Explain()
/// \param[out] buf will be filled with the explanation
/// \param[in] maxSize max size of buf in bytes
void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) {
if (error->handle) {
strncpy(buf, error->handle->Explain().c_str(), maxSize - 1);
buf[maxSize] = '\0';
} else {
static std::string msg("no error");
std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf);
buf[std::max(maxSize - 1, msg.size())] = '\0';
}
}
enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error) {
auto consumer_err =
......@@ -194,6 +121,7 @@ extern "C" {
}
}
//! creata a consumer
/// \copydoc asapo::ConsumerFactory::CreateConsumer
/// \param[out] error NULL or pointer to error handle to be set
......@@ -227,18 +155,6 @@ extern "C" {
return static_cast<AsapoStringHandle>(handle_or_null(retval, error, std::move(err)));
}
//! give a pointer to the content of the asapoString
/// \param[in] str the handle of the asapoString in question
/// \return const char pointer to the content
const char* asapo_string_c_str(const AsapoStringHandle str) {
return str->handle->c_str();
}
//! give the size of an asapoString
/// \param[in] str the handle of the asapoString in question
/// \return the number of bytes in the string , not counting the final nul byte.
size_t asapo_string_size(const AsapoStringHandle str) {
return str->handle->size();
}
//! wraps asapo::Consumer::SetTimeout()
/// \copydoc asapo::Consumer::SetTimeout()
......@@ -340,7 +256,9 @@ extern "C" {
//! wraps asapo::Consumer::CurrentConnectionType()
/// \copydoc asapo::Consumer::CurrentConnectionType()
/// \param[in] consumer the handle of the consumer concerned
enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer);
enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer) {
return static_cast<AsapoNetworkConnectionType>(consumer->handle->CurrentConnectionType());
}
//! get list of streams, wraps asapo::Consumer::GetStreamList()
......@@ -364,21 +282,6 @@ extern "C" {
return static_cast<AsapoStreamInfosHandle>(handle_or_null(retval, error, std::move(err)));
}
//! get one stream info from a stream infos handle
/// \param[in] infos handle for stream infos
/// \param[in] index index od info to get, starts at 0
/// \return handle to stream info
AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos,
size_t index) {
return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false};
}
//! get size (number of elements) of a stream infos handle
/// \param[in] infos handle for stream infos
/// \return number of elements in the handle
size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) {
return infos->handle->size();
}
//! wraps asapo::Consumer::DeleteStream()
/// \copydoc asapo::Consumer::DeleteStream()
......@@ -592,25 +495,6 @@ extern "C" {
consumer->handle->SetResendNacs(resend, delay_ms, resend_attempts);
}
//! give acess to data
/// \param[in] data the handle of the data
/// \return const char pointer to the data blob, valid until deletion or reuse of data
const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) {
return reinterpret_cast<const char*>(data->handle.get());
}
//! wraps asapo::SourceCredentials::SourceCredentials()
/// \copydoc asapo::SourceCredentials::SourceCredentials()
AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type,
const char* beamtime,
const char* beamline,
const char* data_source,
const char* token) {
auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type),
beamtime, beamline,
data_source, token);
return new AsapoHandlerHolder<asapo::SourceCredentials> {retval};
}
//! get name from the metadata object
/// \param[in] md handle of the metadata object
......@@ -671,50 +555,6 @@ extern "C" {
return md->handle->dataset_substream;
}
//! get last id from the stream info object
/// \param[in] info handle of the stream info object
/// \return last id
/// \sa asapo::StreamInfo
uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) {
return info->handle->last_id;
}
//! get stream name from the stream info object
/// \param[in] info handle of the stream info object
/// \return pointer to the name string, valid until asapoStreamInfos object is deleted
/// \sa asapo::StreamInfo
const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) {
return info->handle->name.c_str();
}
//! get finished state from the stream info object
/// \param[in] info handle of the stream info object
/// \return finised state, 0 = false
/// \sa asapo::StreamInfo
AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) {
return info->handle->finished;
}
//! get next stream name? from the stream info object
/// \param[in] info handle of the stream info object
/// \return pointer to the name string, valid until asapoStreamInfos object is deleted
/// \sa asapo::StreamInfo
const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) {
return info->handle->next_stream.c_str();
}
//! get creation time from the stream info object
/// \param[in] info handle of the stream info object
/// \param[out] stamp creation timestamp as timespec
/// \sa asapo::StreamInfo
void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info,
struct timespec* stamp) {
time_point_to_time_spec(info->handle->timestamp_created, stamp);
}
//! get time of last entry from the stream info object
/// \param[in] info handle of the stream info object
/// \param[out] stamp last entry timestamp as timespec
/// \sa asapo::StreamInfo
void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info,
struct timespec* stamp) {
time_point_to_time_spec(info->handle->timestamp_lastentry, stamp);
}
//! get id from data set object
/// \param[in] set handle of the data set object
......
#ifndef __PRODUCER_C_H__
#define __PRODUCER_C_H__
#include "asapo/common_c.h"
#include "asapo/common/common_c.h"
#ifndef __PRODUCER_C_INTERFACE_IMPLEMENTATION__
typedef void* AsapoProducerHandle;
typedef void* AsapoRequestCallbackPayloadHandle;
typedef void* AsapoMessageHeaderHandle;
#endif
typedef void(*AsapoRequestCallback)(AsapoRequestCallbackPayloadHandle, AsapoErrorHandle);
//! c version of asapo::RequestHandlerType
enum AsapoRequestHandlerType {
kTcp,
kFilesystem
};
AsapoProducerHandle asapo_create_producer(const char* endpoint,
uint8_t n_processing_threads,
AsapoRequestHandlerType type,
AsapoSourceCredentialsHandle source_cred,
uint64_t timeout_ms,
AsapoErrorHandle* error);
int asapo_producer_get_version_info(AsapoProducerHandle producer,
AsapoStringHandle client_info,
AsapoStringHandle server_info,
AsapoBool* supported,
AsapoErrorHandle* error);
AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
int asapo_producer_delete_stream(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoBool delete_meta,
AsapoBool error_on_not_exist,
AsapoErrorHandle* error);
AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
int asapo_producer_send(AsapoProducerHandle producer,
const AsapoMessageHeaderHandle message_header,
AsapoMessageDataHandle data,
uint64_t ingest_mode,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_file(AsapoProducerHandle producer,
const AsapoMessageHeaderHandle message_header,
const char* file_name,
uint64_t ingest_mode,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
uint64_t last_id,
const char* stream,
const char* next_stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
#endif
......@@ -16,7 +16,7 @@ set(SOURCE_FILES
################################
add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser>
$<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version>)
target_include_directories(${TARGET_NAME} PUBLIC include ${ASAPO_CXX_COMMON_INCLUDE_DIR})
target_include_directories(${TARGET_NAME} PUBLIC ../c/include include ${ASAPO_CXX_COMMON_INCLUDE_DIR})
target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
################################
......
#define __PRODUCER_C_INTERFACE_IMPLEMENTATION__
#include "asapo/common/internal/asapo_common_c_glue.h"
#include "asapo/asapo_producer.h"
......@@ -10,8 +9,97 @@
/// all operations are done with asapo_producer_xxx() functions
/// \sa asapo::Producer
typedef AsapoHandlerHolder<asapo::Producer>* AsapoProducerHandle;
typedef AsapoHandlerHolder<asapo::RequestCallbackPayload>* AsapoRequestCallbackPayloadHandle;
typedef AsapoHandlerHolder<asapo::MessageHeader>* AsapoMessageHeaderHandle;
extern "C" {
#include "asapo/producer_c.h"
AsapoProducerHandle asapo_create_producer(const char* endpoint,
uint8_t n_processing_threads,
AsapoRequestHandlerType type,
AsapoSourceCredentialsHandle source_cred,
uint64_t timeout_ms,
AsapoErrorHandle* error) {
asapo::Error err;
auto c = asapo::Producer::Create(endpoint,
n_processing_threads,
static_cast<asapo::RequestHandlerType>(type),
*(source_cred->handle),
timeout_ms,
&err);
return handle_or_null_t(c.release(), error, std::move(err));
}
int asapo_producer_get_version_info(AsapoProducerHandle producer,
AsapoStringHandle client_info,
AsapoStringHandle server_info,
AsapoBool* supported,
AsapoErrorHandle* error) {
bool supp;
auto err = producer->handle->GetVersionInfo(client_info->handle.get(),
server_info->handle.get(),
&supp);
*supported = supp;
return process_error(error, std::move(err));
}
AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
int asapo_producer_delete_stream(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoBool delete_meta,
AsapoBool error_on_not_exist,
AsapoErrorHandle* error);
AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
int asapo_producer_send(AsapoProducerHandle producer,
const AsapoMessageHeaderHandle message_header,
AsapoMessageDataHandle data,
uint64_t ingest_mode,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error) {
asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void {
auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload);
auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>;
callback(payLoadHandle, errorHandle);
err = std::move(errorHandle->handle);
delete payLoadHandle;
};
auto err = producer->handle->Send(*message_header->handle,
std::move(data->handle),
ingest_mode,
stream,
wrapper);
return process_error(error, std::move(err));
}
int asapo_producer_send_file(AsapoProducerHandle producer,
const AsapoMessageHeaderHandle message_header,
const char* file_name,
uint64_t ingest_mode,
const char* stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer,
uint64_t last_id,
const char* stream,
const char* next_stream,
AsapoRequestCallback callback,
AsapoErrorHandle* error);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment