From 6ce2f3287fa5f2640dcdeef0bbb81c41afc7368e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Hannappel?= <juergen.hannappel@desy.de> Date: Tue, 6 Jul 2021 18:18:59 +0200 Subject: [PATCH] partially compiling producer glue --- common/cpp/include/asapo/common/common_c.h | 37 +++- .../common/internal/asapo_common_c_glue.h | 62 +++++++ common/cpp/src/common/common_c_glue.cpp | 143 ++++++++++++++- consumer/api/c/include/asapo/consumer_c.h | 28 +-- consumer/api/cpp/src/consumer_c_glue.cpp | 172 +----------------- producer/api/c/include/asapo/producer_c.h | 65 ++++++- producer/api/cpp/CMakeLists.txt | 2 +- producer/api/cpp/src/producer_c_glue.cpp | 90 ++++++++- 8 files changed, 399 insertions(+), 200 deletions(-) diff --git a/common/cpp/include/asapo/common/common_c.h b/common/cpp/include/asapo/common/common_c.h index a08e9ac2a..54a6f6b4a 100644 --- a/common/cpp/include/asapo/common/common_c.h +++ b/common/cpp/include/asapo/common/common_c.h @@ -1,8 +1,43 @@ #ifndef __COMMON_C_H__ #define __COMMON_C_H__ + +typedef int AsapoBool; #ifndef __COMMON_C_INTERFACE_IMPLEMENTATION__ -typedef void* AsapoConsumerHandle; +typedef void* AsapoSourceCredentialsHandle; +typedef void* AsapoErrorHandle; +typedef void* AsapoStringHandle; +typedef void* AsapoStreamInfoHandle; +typedef void* AsapoStreamInfosHandle; +typedef void* AsapoMessageDataHandle; #endif + +void asapo_free_handle(void** handle); +void* asapo_new_handle(); + + +void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size); +AsapoBool asapo_is_error(AsapoErrorHandle err); + +const char* asapo_string_c_str(const AsapoStringHandle str); +size_t asapo_string_size(const AsapoStringHandle str); + +AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, + size_t index); +size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos); + +uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info); +const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info); +AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info); +const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info); +void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, + struct timespec* stamp); +void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, + struct timespec* stamp); + + + +const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data); + #endif diff --git a/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h b/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h index 889a32115..f561f84bd 100644 --- a/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h +++ b/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h @@ -1,6 +1,9 @@ #ifndef __COMMON_C_GLUE_H__ #define __COMMON_C_GLUE_H__ #include <memory> +#include "asapo/common/data_structs.h" +#include "asapo/common/error.h" +#define __COMMON_C_INTERFACE_IMPLEMENTATION__ class AsapoHandle { public: @@ -22,4 +25,63 @@ class AsapoHandlerHolder final : public AsapoHandle { bool manage_memory_{true}; }; + +//! handle for credentials to access a source from a producer +/// created by asapo_create_source_credentials() +/// free after use with asapo_free_handle() +/// \sa asapo::SourceCredentials +typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle; + + +//! handle for an asapo error +/// needs to be cleared after use with asapo_free_handle() +/// text version of an error: asapo_error_explain() +/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType +typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle; + + +//! handle for string return types +/// return type of several functions +/// free after use with asapo_free_handle() +/// a const pointer to the content can be obtained with asapo_string_c_str() +typedef AsapoHandlerHolder<std::string>* AsapoStringHandle; + +//! handle for info about a stream, +/// may be set via asapo_stream_infos_get_info() +/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry() +typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle; + +//! handle for a set of stream infos +/// touch only with proper functions and use asapo_free_handle() to delete, +/// created by asapo_consumer_get_stream_list() +/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size() +typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle; + +//! handle for data recieved by the consumer +/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last() +/// free after use with asapo_free_handle() +/// access to the data is granted via asapo_message_data_get_as_chars() +typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle; + + + +int process_error(AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr); + + +AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr); + +template <typename T> AsapoHandlerHolder<T>* handle_or_null_t(T* object, + AsapoErrorHandle* error, + asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { + if (process_error(error, std::move(err), p_exclude_err_template) < 0) { + return nullptr; + } else { + return new AsapoHandlerHolder<T>(object); + } +} + + #endif diff --git a/common/cpp/src/common/common_c_glue.cpp b/common/cpp/src/common/common_c_glue.cpp index 1b60c1dfb..626deb835 100644 --- a/common/cpp/src/common/common_c_glue.cpp +++ b/common/cpp/src/common/common_c_glue.cpp @@ -1,11 +1,34 @@ #define __COMMON_C_INTERFACE_IMPLEMENTATION__ #include "asapo/asapo_common_c_glue.h -//! handle for credentials to access a source from a producer -/// created by asapo_create_source_credentials() -/// free after use with asapo_free_handle() -/// \sa asapo::SourceCredentials -typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle; + +int process_error(AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { + int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1; + if (error == nullptr) { + return retval; + } + if (*error == nullptr) { + *error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()}; + } else { + (*error)->handle = std::move(err); + } + return retval; +} + +AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { + if (process_error(error, std::move(err), p_exclude_err_template) < 0) { + if (handle != nullptr) { + delete handle; + } + return nullptr; + } else { + return handle; + } +} + + extern "C" { @@ -26,4 +49,114 @@ extern "C" { return NULL; } +/// \copydoc asapo::ErrorInterface::Explain() +/// \param[out] buf will be filled with the explanation +/// \param[in] maxSize max size of buf in bytes + void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) { + if (error->handle) { + strncpy(buf, error->handle->Explain().c_str(), maxSize - 1); + buf[maxSize] = '\0'; + } else { + static std::string msg("no error"); + std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf); + buf[std::max(maxSize - 1, msg.size())] = '\0'; + } + } + + + //! give a pointer to the content of the asapoString +/// \param[in] str the handle of the asapoString in question +/// \return const char pointer to the content + const char* asapo_string_c_str(const AsapoStringHandle str) { + return str->handle->c_str(); + } +//! give the size of an asapoString +/// \param[in] str the handle of the asapoString in question +/// \return the number of bytes in the string , not counting the final nul byte. + size_t asapo_string_size(const AsapoStringHandle str) { + return str->handle->size(); + } + +//! get one stream info from a stream infos handle +/// \param[in] infos handle for stream infos +/// \param[in] index index od info to get, starts at 0 +/// \return handle to stream info + AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, + size_t index) { + return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false}; + } + +//! get size (number of elements) of a stream infos handle +/// \param[in] infos handle for stream infos +/// \return number of elements in the handle + size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) { + return infos->handle->size(); + } + +//! get last id from the stream info object +/// \param[in] info handle of the stream info object +/// \return last id +/// \sa asapo::StreamInfo + uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) { + return info->handle->last_id; + } +//! get stream name from the stream info object +/// \param[in] info handle of the stream info object +/// \return pointer to the name string, valid until asapoStreamInfos object is deleted +/// \sa asapo::StreamInfo + const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) { + return info->handle->name.c_str(); + } +//! get finished state from the stream info object +/// \param[in] info handle of the stream info object +/// \return finised state, 0 = false +/// \sa asapo::StreamInfo + AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) { + return info->handle->finished; + } +//! get next stream name? from the stream info object +/// \param[in] info handle of the stream info object +/// \return pointer to the name string, valid until asapoStreamInfos object is deleted +/// \sa asapo::StreamInfo + const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) { + return info->handle->next_stream.c_str(); + } +//! get creation time from the stream info object +/// \param[in] info handle of the stream info object +/// \param[out] stamp creation timestamp as timespec +/// \sa asapo::StreamInfo + void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, + struct timespec* stamp) { + time_point_to_time_spec(info->handle->timestamp_created, stamp); + } +//! get time of last entry from the stream info object +/// \param[in] info handle of the stream info object +/// \param[out] stamp last entry timestamp as timespec +/// \sa asapo::StreamInfo + void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, + struct timespec* stamp) { + time_point_to_time_spec(info->handle->timestamp_lastentry, stamp); + } + +//! give acess to data +/// \param[in] data the handle of the data +/// \return const char pointer to the data blob, valid until deletion or reuse of data + const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) { + return reinterpret_cast<const char*>(data->handle.get()); + } + +//! wraps asapo::SourceCredentials::SourceCredentials() +/// \copydoc asapo::SourceCredentials::SourceCredentials() + AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, + const char* beamtime, + const char* beamline, + const char* data_source, + const char* token) { + auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type), + beamtime, beamline, + data_source, token); + return new AsapoHandlerHolder<asapo::SourceCredentials> {retval}; + } + + } diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index f033896c6..d6ef254ce 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -2,16 +2,10 @@ #define __CONSUMER_C_H__ #ifndef __CONSUMER_C_INTERFACE_IMPLEMENTATION__ -typedef int AsapoBool; #include <asapo/common_c.h> -typedef void* AsapoSourceCredentialsHandle; -typedef void* AsapoErrorHandle; +typedef void* AsapoConsumerHandle; typedef void* AsapoMessageMetaHandle; typedef void* AsapoMessageMetasHandle; -typedef void* AsapoMessageDataHandle; -typedef void* AsapoStringHandle; -typedef void* AsapoStreamInfoHandle; -typedef void* AsapoStreamInfosHandle; typedef void* AsapoIdListHandle; typedef void* AsapoDataSetHandle; typedef void* AsapoPartialErrorDataHandle; @@ -52,20 +46,18 @@ enum AsapoNetworkConnectionType { kAsapoTcp, kFabric }; -void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size); + enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error); + AsapoConsumerHandle asapo_create_consumer(const char* server_name, const char* source_path, AsapoBool has_filesysytem, AsapoSourceCredentialsHandle source, AsapoErrorHandle* error); -AsapoBool asapo_is_error(AsapoErrorHandle err); AsapoStringHandle asapo_consumer_generate_new_group_id(AsapoConsumerHandle consumer, AsapoErrorHandle* err); -const char* asapo_string_c_str(const AsapoStringHandle str); -size_t asapo_string_size(const AsapoStringHandle str); void asapo_consumer_set_timeout(AsapoConsumerHandle consumer, uint64_t timeout_ms); int asapo_consumer_reset_last_read_marker(AsapoConsumerHandle consumer, @@ -107,9 +99,6 @@ AsapoStreamInfosHandle asapo_consumer_get_stream_list(AsapoConsumerHandle consum const char* from, enum AsapoStreamFilter filter, AsapoErrorHandle* error); -AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, - size_t index); -size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos); int asapo_consumer_delete_stream(AsapoConsumerHandle consumer, const char* stream, @@ -182,14 +171,6 @@ const char* asapo_message_meta_get_metadata(const AsapoMessageMetaHandle md); uint64_t asapo_message_meta_get_buf_id(const AsapoMessageMetaHandle md); uint64_t asapo_message_meta_get_dataset_substream(const AsapoMessageMetaHandle md); -uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info); -const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info); -AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info); -const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info); -void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, - struct timespec* stamp); -void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, - struct timespec* stamp); uint64_t asapo_dataset_get_id(const AsapoDataSetHandle set); uint64_t asapo_dataset_get_expected_size(const AsapoDataSetHandle set); @@ -213,7 +194,4 @@ uint64_t asapo_consumer_error_get_id_max(const AsapoConsumerErrorDataHandle erro const char* asapo_consumer_error_get_next_stream(const AsapoConsumerErrorDataHandle error_payload); -void asapo_free_handle(void** handle); -void* asapo_new_handle(); - #endif diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 3c8d11fe6..bcc75654c 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -13,18 +13,6 @@ typedef int AsapoBool; /// \sa asapo::Consumer typedef AsapoHandlerHolder<asapo::Consumer>* AsapoConsumerHandle; -//! handle for credentials to access a source from a consumer -/// created by asapo_create_source_credentials() -/// free after use with asapo_free_handle() -/// \sa asapo::SourceCredentials -typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle; - -//! handle for an asapo error -/// needs to be cleared after use with asapo_free_handle() -/// text version of an error: asapo_error_explain() -/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType -typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle; - //! handle for metadata of a message /// create with asapo_new_handle() /// free after use with asapo_free_handle() @@ -38,28 +26,7 @@ typedef AsapoHandlerHolder<asapo::MessageMeta>* AsapoMessageMetaHandle; /// \sa asapo::MessageMetas typedef AsapoHandlerHolder<asapo::MessageMetas>* AsapoMessageMetasHandle; -//! handle for data recieved by the consumer -/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last() -/// free after use with asapo_free_handle() -/// access to the data is granted via asapo_message_data_get_as_chars() -typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle; - -//! handle for string return types -/// return type of several functions -/// free after use with asapo_free_handle() -/// a const pointer to the content can be obtained with asapo_string_c_str() -typedef AsapoHandlerHolder<std::string>* AsapoStringHandle; - -//! handle for info about a stream, -/// may be set via asapo_stream_infos_get_info() -/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry() -typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle; -//! handle for a set of stream infos -/// touch only with proper functions and use asapo_free_handle() to delete, -/// created by asapo_consumer_get_stream_list() -/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size() -typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle; //! handle for message id lists /// touch only with proper functions and use asapo_free_handle() to delete, @@ -86,37 +53,10 @@ typedef AsapoHandlerHolder<asapo::ConsumerErrorData>* AsapoConsumerErrorDataHand #include <algorithm> template<typename t> -constexpr bool operator==(unsigned lhs, t rhs) { - return lhs == static_cast<typename std::underlying_type<t>::type>(rhs); -} - -int process_error(AsapoErrorHandle* error, asapo::Error err, - const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { - int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1; - if (error == nullptr) { - return retval; - } - if (*error == nullptr) { - *error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()}; - } else { - (*error)->handle = std::move(err); - } - return retval; -} - -AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err, - const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { - if (process_error(error, std::move(err), p_exclude_err_template) < 0) { - if (handle != nullptr) { - delete handle; - } - return nullptr; - } else { - return handle; - } +constexpr bool operator==(const unsigned int& lhs, const t& rhs) { + return lhs == static_cast<typename std::make_unsigned<typename std::underlying_type<t>::type>::type>(rhs); } - #define dataGetterStart \ asapo::MessageData d; \ asapo::MessageMeta* fi = info ? new asapo::MessageMeta : nullptr; \ @@ -169,19 +109,6 @@ extern "C" { return err != nullptr && err->handle != nullptr; } -/// \copydoc asapo::ErrorInterface::Explain() -/// \param[out] buf will be filled with the explanation -/// \param[in] maxSize max size of buf in bytes - void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) { - if (error->handle) { - strncpy(buf, error->handle->Explain().c_str(), maxSize - 1); - buf[maxSize] = '\0'; - } else { - static std::string msg("no error"); - std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf); - buf[std::max(maxSize - 1, msg.size())] = '\0'; - } - } enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error) { auto consumer_err = @@ -194,6 +121,7 @@ extern "C" { } } + //! creata a consumer /// \copydoc asapo::ConsumerFactory::CreateConsumer /// \param[out] error NULL or pointer to error handle to be set @@ -227,18 +155,6 @@ extern "C" { return static_cast<AsapoStringHandle>(handle_or_null(retval, error, std::move(err))); } -//! give a pointer to the content of the asapoString -/// \param[in] str the handle of the asapoString in question -/// \return const char pointer to the content - const char* asapo_string_c_str(const AsapoStringHandle str) { - return str->handle->c_str(); - } -//! give the size of an asapoString -/// \param[in] str the handle of the asapoString in question -/// \return the number of bytes in the string , not counting the final nul byte. - size_t asapo_string_size(const AsapoStringHandle str) { - return str->handle->size(); - } //! wraps asapo::Consumer::SetTimeout() /// \copydoc asapo::Consumer::SetTimeout() @@ -340,7 +256,9 @@ extern "C" { //! wraps asapo::Consumer::CurrentConnectionType() /// \copydoc asapo::Consumer::CurrentConnectionType() /// \param[in] consumer the handle of the consumer concerned - enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer); + enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer) { + return static_cast<AsapoNetworkConnectionType>(consumer->handle->CurrentConnectionType()); + } //! get list of streams, wraps asapo::Consumer::GetStreamList() @@ -364,21 +282,6 @@ extern "C" { return static_cast<AsapoStreamInfosHandle>(handle_or_null(retval, error, std::move(err))); } -//! get one stream info from a stream infos handle -/// \param[in] infos handle for stream infos -/// \param[in] index index od info to get, starts at 0 -/// \return handle to stream info - AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, - size_t index) { - return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false}; - } - -//! get size (number of elements) of a stream infos handle -/// \param[in] infos handle for stream infos -/// \return number of elements in the handle - size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) { - return infos->handle->size(); - } //! wraps asapo::Consumer::DeleteStream() /// \copydoc asapo::Consumer::DeleteStream() @@ -592,25 +495,6 @@ extern "C" { consumer->handle->SetResendNacs(resend, delay_ms, resend_attempts); } -//! give acess to data -/// \param[in] data the handle of the data -/// \return const char pointer to the data blob, valid until deletion or reuse of data - const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) { - return reinterpret_cast<const char*>(data->handle.get()); - } - -//! wraps asapo::SourceCredentials::SourceCredentials() -/// \copydoc asapo::SourceCredentials::SourceCredentials() - AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, - const char* beamtime, - const char* beamline, - const char* data_source, - const char* token) { - auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type), - beamtime, beamline, - data_source, token); - return new AsapoHandlerHolder<asapo::SourceCredentials> {retval}; - } //! get name from the metadata object /// \param[in] md handle of the metadata object @@ -671,50 +555,6 @@ extern "C" { return md->handle->dataset_substream; } -//! get last id from the stream info object -/// \param[in] info handle of the stream info object -/// \return last id -/// \sa asapo::StreamInfo - uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) { - return info->handle->last_id; - } -//! get stream name from the stream info object -/// \param[in] info handle of the stream info object -/// \return pointer to the name string, valid until asapoStreamInfos object is deleted -/// \sa asapo::StreamInfo - const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) { - return info->handle->name.c_str(); - } -//! get finished state from the stream info object -/// \param[in] info handle of the stream info object -/// \return finised state, 0 = false -/// \sa asapo::StreamInfo - AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) { - return info->handle->finished; - } -//! get next stream name? from the stream info object -/// \param[in] info handle of the stream info object -/// \return pointer to the name string, valid until asapoStreamInfos object is deleted -/// \sa asapo::StreamInfo - const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) { - return info->handle->next_stream.c_str(); - } -//! get creation time from the stream info object -/// \param[in] info handle of the stream info object -/// \param[out] stamp creation timestamp as timespec -/// \sa asapo::StreamInfo - void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, - struct timespec* stamp) { - time_point_to_time_spec(info->handle->timestamp_created, stamp); - } -//! get time of last entry from the stream info object -/// \param[in] info handle of the stream info object -/// \param[out] stamp last entry timestamp as timespec -/// \sa asapo::StreamInfo - void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, - struct timespec* stamp) { - time_point_to_time_spec(info->handle->timestamp_lastentry, stamp); - } //! get id from data set object /// \param[in] set handle of the data set object diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h index cc4d942e2..157b6361b 100644 --- a/producer/api/c/include/asapo/producer_c.h +++ b/producer/api/c/include/asapo/producer_c.h @@ -1,10 +1,73 @@ #ifndef __PRODUCER_C_H__ #define __PRODUCER_C_H__ -#include "asapo/common_c.h" +#include "asapo/common/common_c.h" #ifndef __PRODUCER_C_INTERFACE_IMPLEMENTATION__ typedef void* AsapoProducerHandle; +typedef void* AsapoRequestCallbackPayloadHandle; +typedef void* AsapoMessageHeaderHandle; #endif +typedef void(*AsapoRequestCallback)(AsapoRequestCallbackPayloadHandle, AsapoErrorHandle); + + +//! c version of asapo::RequestHandlerType +enum AsapoRequestHandlerType { + kTcp, + kFilesystem +}; + +AsapoProducerHandle asapo_create_producer(const char* endpoint, + uint8_t n_processing_threads, + AsapoRequestHandlerType type, + AsapoSourceCredentialsHandle source_cred, + uint64_t timeout_ms, + AsapoErrorHandle* error); +int asapo_producer_get_version_info(AsapoProducerHandle producer, + AsapoStringHandle client_info, + AsapoStringHandle server_info, + AsapoBool* supported, + AsapoErrorHandle* error); +AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); +AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); +AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); +int asapo_producer_delete_stream(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoBool delete_meta, + AsapoBool error_on_not_exist, + AsapoErrorHandle* error); +AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); + +int asapo_producer_send(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + AsapoMessageDataHandle data, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_file(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + const char* file_name, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, + uint64_t last_id, + const char* stream, + const char* next_stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); #endif diff --git a/producer/api/cpp/CMakeLists.txt b/producer/api/cpp/CMakeLists.txt index f7bbf1bb4..a607b6d31 100644 --- a/producer/api/cpp/CMakeLists.txt +++ b/producer/api/cpp/CMakeLists.txt @@ -16,7 +16,7 @@ set(SOURCE_FILES ################################ add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version>) -target_include_directories(${TARGET_NAME} PUBLIC include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) +target_include_directories(${TARGET_NAME} PUBLIC ../c/include include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) ################################ diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index ad46ac2da..66692eff5 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -1,5 +1,4 @@ #define __PRODUCER_C_INTERFACE_IMPLEMENTATION__ - #include "asapo/common/internal/asapo_common_c_glue.h" #include "asapo/asapo_producer.h" @@ -10,8 +9,97 @@ /// all operations are done with asapo_producer_xxx() functions /// \sa asapo::Producer typedef AsapoHandlerHolder<asapo::Producer>* AsapoProducerHandle; +typedef AsapoHandlerHolder<asapo::RequestCallbackPayload>* AsapoRequestCallbackPayloadHandle; +typedef AsapoHandlerHolder<asapo::MessageHeader>* AsapoMessageHeaderHandle; extern "C" { +#include "asapo/producer_c.h" + AsapoProducerHandle asapo_create_producer(const char* endpoint, + uint8_t n_processing_threads, + AsapoRequestHandlerType type, + AsapoSourceCredentialsHandle source_cred, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto c = asapo::Producer::Create(endpoint, + n_processing_threads, + static_cast<asapo::RequestHandlerType>(type), + *(source_cred->handle), + timeout_ms, + &err); + return handle_or_null_t(c.release(), error, std::move(err)); + + } + int asapo_producer_get_version_info(AsapoProducerHandle producer, + AsapoStringHandle client_info, + AsapoStringHandle server_info, + AsapoBool* supported, + AsapoErrorHandle* error) { + bool supp; + auto err = producer->handle->GetVersionInfo(client_info->handle.get(), + server_info->handle.get(), + &supp); + *supported = supp; + return process_error(error, std::move(err)); + } + + AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); + AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); + AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); + int asapo_producer_delete_stream(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoBool delete_meta, + AsapoBool error_on_not_exist, + AsapoErrorHandle* error); + AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); + + int asapo_producer_send(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + AsapoMessageDataHandle data, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { + auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); + auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>; + callback(payLoadHandle, errorHandle); + err = std::move(errorHandle->handle); + delete payLoadHandle; + }; + auto err = producer->handle->Send(*message_header->handle, + std::move(data->handle), + ingest_mode, + stream, + wrapper); + return process_error(error, std::move(err)); + } + int asapo_producer_send_file(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + const char* file_name, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); + int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, + uint64_t last_id, + const char* stream, + const char* next_stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); + + } -- GitLab