diff --git a/common/cpp/include/asapo/common/common_c.h b/common/cpp/include/asapo/common/common_c.h new file mode 100644 index 0000000000000000000000000000000000000000..08cff2ad6c14c333959fc469e512871086e47a9e --- /dev/null +++ b/common/cpp/include/asapo/common/common_c.h @@ -0,0 +1,70 @@ +#ifndef __COMMON_C_H__ +#define __COMMON_C_H__ +#include <stddef.h> +#include <stdint.h> +#include <time.h> + +#define AsapoHandleSize 24 +typedef int AsapoBool; +#ifndef __COMMON_C_INTERFACE_IMPLEMENTATION__ +typedef struct { + char _[AsapoHandleSize]; +}* AsapoSourceCredentialsHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoErrorHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoStringHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoStreamInfoHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoStreamInfosHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoMessageDataHandle; +#endif + +//! c version of asapo::SourceType +enum AsapoSourceType { + kProcessed, + kRaw +}; + +#define asapo_free_handle(handle) asapo_free_handle__((void**)handle); +void asapo_free_handle__(void** handle); +void* asapo_new_handle(); + + +void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size); +AsapoBool asapo_is_error(AsapoErrorHandle err); + +AsapoStringHandle asapo_string_from_c_str(const char* str); +const char* asapo_string_c_str(const AsapoStringHandle str); +size_t asapo_string_size(const AsapoStringHandle str); + +AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, + size_t index); +size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos); + +uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info); +const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info); +AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info); +const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info); +void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, + struct timespec* stamp); +void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, + struct timespec* stamp); + +AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, + const char* beamtime, + const char* beamline, + const char* data_source, + const char* token); + + +const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data); + +#endif diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h index 1c771e9a96cee57d3ce00ccfb41b5dda3a3ffa4b..b864e310da8a0180d8bb5eab9543a7941a8703f6 100644 --- a/common/cpp/include/asapo/common/data_structs.h +++ b/common/cpp/include/asapo/common/data_structs.h @@ -154,6 +154,8 @@ enum class MetaIngestOp : uint64_t { struct MetaIngestMode { MetaIngestOp op; bool upsert; + MetaIngestMode() = default; + MetaIngestMode(MetaIngestOp aOp, bool aUpsert): op(aOp), upsert(aUpsert) {}; uint64_t Encode() { return static_cast<uint64_t>(op) + 10 * static_cast<uint64_t>(upsert); } diff --git a/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h b/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h new file mode 100644 index 0000000000000000000000000000000000000000..50e4153ef32976706606d1595389a2f39046be78 --- /dev/null +++ b/common/cpp/include/asapo/common/internal/asapo_common_c_glue.h @@ -0,0 +1,118 @@ +#ifndef __COMMON_C_GLUE_H__ +#define __COMMON_C_GLUE_H__ +#include <memory> +#include "asapo/common/data_structs.h" +#include "asapo/common/error.h" +#define __COMMON_C_INTERFACE_IMPLEMENTATION__ + +class AsapoHandle { + public: + virtual ~AsapoHandle() {}; +}; + +template<class T> +class AsapoHandlerHolder final : public AsapoHandle { + public: + AsapoHandlerHolder(bool manage_memory = true) : handle{nullptr}, manage_memory_{manage_memory} {}; + AsapoHandlerHolder(std::unique_ptr<T>& handle_i, bool manage_memory = true) : handle{handle_i.release()}, manage_memory_{manage_memory} {}; + AsapoHandlerHolder(T* handle_i, bool manage_memory = true) : handle{handle_i}, manage_memory_{manage_memory} {}; + ~AsapoHandlerHolder() override { + if (!manage_memory_) { + handle.release(); + } + } + std::unique_ptr<T> handle{nullptr}; + protected: + bool manage_memory_{true}; +}; + +template<> +class AsapoHandlerHolder < std::string> final : public AsapoHandle { + public: + AsapoHandlerHolder(const std::string& handle_i) : handle{new std::string(handle_i)} {}; + ~AsapoHandlerHolder() override = default; + std::unique_ptr<std::string> handle{nullptr}; +}; +//! handle for credentials to access a source from a producer +/// created by asapo_create_source_credentials() +/// free after use with asapo_free_handle() +/// \sa asapo::SourceCredentials +typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle; + + +//! handle for an asapo error +/// needs to be cleared after use with asapo_free_handle() +/// text version of an error: asapo_error_explain() +/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType +typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle; + + +//! handle for string return types +/// return type of several functions +/// free after use with asapo_free_handle() +/// a const pointer to the content can be obtained with asapo_string_c_str() +typedef AsapoHandlerHolder<std::string>* AsapoStringHandle; + +//! handle for info about a stream, +/// may be set via asapo_stream_infos_get_info() +/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry() +typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle; + +//! handle for a set of stream infos +/// touch only with proper functions and use asapo_free_handle() to delete, +/// created by asapo_consumer_get_stream_list() +/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size() +typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle; + +//! handle for data recieved by the consumer +/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last() +/// free after use with asapo_free_handle() +/// access to the data is granted via asapo_message_data_get_as_chars() +typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle; + + + +int process_error(AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr); + + + +template <typename T> AsapoHandlerHolder<T>* handle_or_null_t(T* object, + AsapoErrorHandle* error, + asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { + if (process_error(error, std::move(err), p_exclude_err_template) < 0) { + if (object != nullptr) { + delete object; + } + return nullptr; + } else { + return new AsapoHandlerHolder<T>(object); + } +} +template <typename T> AsapoHandlerHolder<T>* handle_or_null_t(std::unique_ptr<T>& object, + AsapoErrorHandle* error, + asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { + if (process_error(error, std::move(err), p_exclude_err_template) < 0) { + return nullptr; + } else { + return new AsapoHandlerHolder<T>(object); + } +} +AsapoHandlerHolder<std::string>* handle_or_null_t(const std::string& object, + AsapoErrorHandle* error, + asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr); + + +template<typename u, typename t> +constexpr bool operator==(const u& lhs, const t& rhs) { + return static_cast<typename std::make_unsigned<u>::type>(lhs) + == static_cast<typename std::make_unsigned<typename std::underlying_type<t>::type>::type>(rhs); +} + + +void time_point_to_time_spec(std::chrono::system_clock::time_point tp, + struct timespec* stamp); +#endif diff --git a/common/cpp/src/common/CMakeLists.txt b/common/cpp/src/common/CMakeLists.txt index 5a22d46dc262c06116f083de5cd4a337d0c23e10..c267e42e447dd1b0018c8e71580aeb188323c0a9 100644 --- a/common/cpp/src/common/CMakeLists.txt +++ b/common/cpp/src/common/CMakeLists.txt @@ -1,7 +1,11 @@ set(TARGET_NAME common) +set(SOURCE_FILES common_c_glue.cpp) + set(TEST_SOURCE_FILES ../../unittests/common/test_error.cpp) +add_library(${TARGET_NAME} OBJECT ${SOURCE_FILES}) + set(TEST_LIBRARIES "${TARGET_NAME};system_io") -include_directories(${ASAPO_CXX_COMMON_INCLUDE_DIR}) +include_directories(${ASAPO_CXX_COMMON_INCLUDE_DIR} ../../include) add_plain_unit_test(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}") diff --git a/common/cpp/src/common/common_c_glue.cpp b/common/cpp/src/common/common_c_glue.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7e3d8f4937629c478811144c3cf56e2819a03cb5 --- /dev/null +++ b/common/cpp/src/common/common_c_glue.cpp @@ -0,0 +1,189 @@ +#define __COMMON_C_INTERFACE_IMPLEMENTATION__ +#include "asapo/common/internal/asapo_common_c_glue.h" +#include <algorithm> + +AsapoHandlerHolder<std::string>* handle_or_null_t(const std::string& object, + AsapoErrorHandle* error, + asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template) { + if (process_error(error, std::move(err), p_exclude_err_template) < 0) { + return nullptr; + } else { + return new AsapoHandlerHolder<std::string>(object); + } +} + + + +int process_error(AsapoErrorHandle* error, asapo::Error err, + const asapo::ErrorTemplateInterface* p_exclude_err_template) { + int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1; + if (error == nullptr) { + return retval; + } + if (*error == nullptr) { + *error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()}; + } else { + (*error)->handle = std::move(err); + } + return retval; +} + + +void time_point_to_time_spec(std::chrono::system_clock::time_point tp, + struct timespec* stamp) { + stamp->tv_sec = std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count(); + stamp->tv_nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count() % 1000000000; +} + + + +extern "C" { +#include "asapo/common/common_c.h" + static_assert(kProcessed == asapo::SourceType::kProcessed&& + kRaw == asapo::SourceType::kRaw, + "incompatible bit reps between c++ and c for asapo::SourceType"); + + static_assert(AsapoHandleSize == sizeof(AsapoHandlerHolder<int>), + "AsapoHandleSize is not correct"); + + + AsapoStringHandle asapo_string_from_c_str(const char* str) { + return new AsapoHandlerHolder<std::string> {std::string{str}}; + } + + + +//! free handle memory, set handle to NULL +/// \param[in] pointer to an ASAPO handle + void asapo_free_handle__(void** handle) { + if (*handle == nullptr) { + return; + } + auto a_handle = static_cast<AsapoHandle*>(*handle); + delete a_handle; + *handle = nullptr; + } + +//! creates a new ASAPO handle +/// \return initialized ASAPO handle + void* asapo_new_handle() { + return NULL; + } + +/// \copydoc asapo::ErrorInterface::Explain() +/// \param[out] buf will be filled with the explanation +/// \param[in] maxSize max size of buf in bytes + void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) { + if (error->handle) { + const auto& msg = error->handle->Explain(); + std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf); + buf[std::max(maxSize - 1, msg.size())] = '\0'; + } else { + static std::string msg("no error"); + std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf); + buf[std::max(maxSize - 1, msg.size())] = '\0'; + } + } + + + //! give a pointer to the content of the asapoString +/// \param[in] str the handle of the asapoString in question +/// \return const char pointer to the content + const char* asapo_string_c_str(const AsapoStringHandle str) { + return str->handle->c_str(); + } +//! give the size of an asapoString +/// \param[in] str the handle of the asapoString in question +/// \return the number of bytes in the string , not counting the final nul byte. + size_t asapo_string_size(const AsapoStringHandle str) { + return str->handle->size(); + } + +//! get one stream info from a stream infos handle +/// \param[in] infos handle for stream infos +/// \param[in] index index od info to get, starts at 0 +/// \return handle to stream info + AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, + size_t index) { + return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false}; + } + +//! get size (number of elements) of a stream infos handle +/// \param[in] infos handle for stream infos +/// \return number of elements in the handle + size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) { + return infos->handle->size(); + } + +//! get last id from the stream info object +/// \param[in] info handle of the stream info object +/// \return last id +/// \sa asapo::StreamInfo + uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) { + return info->handle->last_id; + } +//! get stream name from the stream info object +/// \param[in] info handle of the stream info object +/// \return pointer to the name string, valid until asapoStreamInfos object is deleted +/// \sa asapo::StreamInfo + const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) { + return info->handle->name.c_str(); + } +//! get finished state from the stream info object +/// \param[in] info handle of the stream info object +/// \return finised state, 0 = false +/// \sa asapo::StreamInfo + AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) { + return info->handle->finished; + } +//! get next stream name? from the stream info object +/// \param[in] info handle of the stream info object +/// \return pointer to the name string, valid until asapoStreamInfos object is deleted +/// \sa asapo::StreamInfo + const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) { + return info->handle->next_stream.c_str(); + } +//! get creation time from the stream info object +/// \param[in] info handle of the stream info object +/// \param[out] stamp creation timestamp as timespec +/// \sa asapo::StreamInfo + void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, + struct timespec* stamp) { + time_point_to_time_spec(info->handle->timestamp_created, stamp); + } +//! get time of last entry from the stream info object +/// \param[in] info handle of the stream info object +/// \param[out] stamp last entry timestamp as timespec +/// \sa asapo::StreamInfo + void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, + struct timespec* stamp) { + time_point_to_time_spec(info->handle->timestamp_lastentry, stamp); + } + +//! give acess to data +/// \param[in] data the handle of the data +/// \return const char pointer to the data blob, valid until deletion or reuse of data + const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) { + return reinterpret_cast<const char*>(data->handle.get()); + } + +//! wraps asapo::SourceCredentials::SourceCredentials() +/// \copydoc asapo::SourceCredentials::SourceCredentials() + AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, + const char* beamtime, + const char* beamline, + const char* data_source, + const char* token) { + auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type), + beamtime, beamline, + data_source, token); + return new AsapoHandlerHolder<asapo::SourceCredentials> {retval}; + } + + + AsapoBool asapo_is_error(AsapoErrorHandle err) { + return err != nullptr && err->handle != nullptr; + } + +} diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index f7366a696be8654bfc85f0f637925d40d690fad0..bc73c17e2ec3c7f8e9207e068c0549ca315dc5b2 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -1,22 +1,29 @@ #ifndef __CONSUMER_C_H__ #define __CONSUMER_C_H__ +#include <asapo/common/common_c.h> #ifndef __CONSUMER_C_INTERFACE_IMPLEMENTATION__ -typedef int AsapoBool; - -typedef void* AsapoConsumerHandle; -typedef void* AsapoSourceCredentialsHandle; -typedef void* AsapoErrorHandle; -typedef void* AsapoMessageMetaHandle; -typedef void* AsapoMessageMetasHandle; -typedef void* AsapoMessageDataHandle; -typedef void* AsapoStringHandle; -typedef void* AsapoStreamInfoHandle; -typedef void* AsapoStreamInfosHandle; -typedef void* AsapoIdListHandle; -typedef void* AsapoDataSetHandle; -typedef void* AsapoPartialErrorDataHandle; -typedef void* AsapoConsumerErrorDataHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoConsumerHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoMessageMetaHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoMessageMetasHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoIdListHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoDataSetHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoPartialErrorDataHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoConsumerErrorDataHandle; #include <time.h> #include <stdint.h> @@ -42,35 +49,24 @@ enum AsapoStreamFilter { kFinishedStreams, kUnfinishedStreams }; -//! c version of asapo::SourceType -enum AsapoSourceType { - kProcessed, - kRaw -}; //! c version of asapo::NetworkConnectionType enum AsapoNetworkConnectionType { kUndefined, kAsapoTcp, kFabric }; -void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t max_size); + enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error); + AsapoConsumerHandle asapo_create_consumer(const char* server_name, const char* source_path, AsapoBool has_filesysytem, AsapoSourceCredentialsHandle source, AsapoErrorHandle* error); -AsapoBool asapo_is_error(AsapoErrorHandle err); AsapoStringHandle asapo_consumer_generate_new_group_id(AsapoConsumerHandle consumer, AsapoErrorHandle* err); -const char* asapo_string_c_str(const AsapoStringHandle str); - -AsapoStringHandle asapo_string_from_c_str(const char* str); - - -size_t asapo_string_size(const AsapoStringHandle str); void asapo_consumer_set_timeout(AsapoConsumerHandle consumer, uint64_t timeout_ms); int asapo_consumer_reset_last_read_marker(AsapoConsumerHandle consumer, @@ -112,9 +108,6 @@ AsapoStreamInfosHandle asapo_consumer_get_stream_list(AsapoConsumerHandle consum const char* from, enum AsapoStreamFilter filter, AsapoErrorHandle* error); -AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, - size_t index); -size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos); int asapo_consumer_delete_stream(AsapoConsumerHandle consumer, const char* stream, @@ -171,12 +164,6 @@ void asapo_consumer_set_resend_nacs(AsapoConsumerHandle consumer, uint64_t resend_attempts); const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data); -AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, - const char* beamtime, - const char* beamline, - const char* data_source, - const char* token); - const char* asapo_message_meta_get_name(const AsapoMessageMetaHandle md); void asapo_message_meta_get_timestamp(const AsapoMessageMetaHandle md, struct timespec* stamp); @@ -187,14 +174,6 @@ const char* asapo_message_meta_get_metadata(const AsapoMessageMetaHandle md); uint64_t asapo_message_meta_get_buf_id(const AsapoMessageMetaHandle md); uint64_t asapo_message_meta_get_dataset_substream(const AsapoMessageMetaHandle md); -uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info); -const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info); -AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info); -const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info); -void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, - struct timespec* stamp); -void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, - struct timespec* stamp); uint64_t asapo_dataset_get_id(const AsapoDataSetHandle set); uint64_t asapo_dataset_get_expected_size(const AsapoDataSetHandle set); @@ -218,7 +197,4 @@ uint64_t asapo_consumer_error_get_id_max(const AsapoConsumerErrorDataHandle erro const char* asapo_consumer_error_get_next_stream(const AsapoConsumerErrorDataHandle error_payload); -void asapo_free_handle(void** handle); -void* asapo_new_handle(); - #endif diff --git a/consumer/api/cpp/CMakeLists.txt b/consumer/api/cpp/CMakeLists.txt index 2b21e8d206b2cdd09368d8bc0eccaa72dabff40d..1009f2347afa4dcb39021c12d5015f41a01bc4e3 100644 --- a/consumer/api/cpp/CMakeLists.txt +++ b/consumer/api/cpp/CMakeLists.txt @@ -25,7 +25,7 @@ target_include_directories(consumer_lib_objects SYSTEM PRIVATE ${LIBFABRIC_INCLU if (BUILD_STATIC_CLIENT_LIBS) add_library(${TARGET_NAME} STATIC $<TARGET_OBJECTS:consumer_lib_objects> $<TARGET_OBJECTS:asapo_fabric_objects> $<TARGET_OBJECTS:system_io> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> ) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME} SYSTEM PUBLIC $<BUILD_INTERFACE:${LIBFABRIC_INCLUDE_DIR}>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../c/include>) @@ -55,7 +55,7 @@ endif() if (BUILD_SHARED_CLIENT_LIBS) add_library(${TARGET_NAME}_shared SHARED $<TARGET_OBJECTS:consumer_lib_objects> $<TARGET_OBJECTS:asapo_fabric_objects> $<TARGET_OBJECTS:system_io> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> ) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME}_shared SYSTEM PRIVATE ${LIBFABRIC_INCLUDE_DIR}) target_include_directories(${TARGET_NAME}_shared PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index e5c28ef52deb167f3938bcabb9c5cb8905b9f853..969804d432dfcfc09a82ab6ddc0913a606ea3546 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -1,28 +1,10 @@ #define __CONSUMER_C_INTERFACE_IMPLEMENTATION__ #include "asapo/asapo_consumer.h" +#include "asapo/common/internal/asapo_common_c_glue.h" //! boolean type typedef int AsapoBool; -class AsapoHandle { - public: - virtual ~AsapoHandle() {}; -}; - -template<class T> -class AsapoHandlerHolder final : public AsapoHandle { - public: - AsapoHandlerHolder(bool manage_memory = true) : handle{nullptr}, manage_memory_{manage_memory} {}; - AsapoHandlerHolder(T* handle_i, bool manage_memory = true) : handle{handle_i}, manage_memory_{manage_memory} {}; - ~AsapoHandlerHolder() override { - if (!manage_memory_) { - handle.release(); - } - } - std::unique_ptr<T> handle{nullptr}; - protected: - bool manage_memory_{true}; -}; //! handle for an asapo consumer /// created by asapo_create_consumer() @@ -31,18 +13,6 @@ class AsapoHandlerHolder final : public AsapoHandle { /// \sa asapo::Consumer typedef AsapoHandlerHolder<asapo::Consumer>* AsapoConsumerHandle; -//! handle for credentials to access a source from a consumer -/// created by asapo_create_source_credentials() -/// free after use with asapo_free_handle() -/// \sa asapo::SourceCredentials -typedef AsapoHandlerHolder<asapo::SourceCredentials>* AsapoSourceCredentialsHandle; - -//! handle for an asapo error -/// needs to be cleared after use with asapo_free_handle() -/// text version of an error: asapo_error_explain() -/// enum value of the error: asapo_error_get_type(), \sa ::AsapoErrorType asapo::ConsumerErrorType -typedef AsapoHandlerHolder<asapo::ErrorInterface>* AsapoErrorHandle; - //! handle for metadata of a message /// create with asapo_new_handle() /// free after use with asapo_free_handle() @@ -56,28 +26,7 @@ typedef AsapoHandlerHolder<asapo::MessageMeta>* AsapoMessageMetaHandle; /// \sa asapo::MessageMetas typedef AsapoHandlerHolder<asapo::MessageMetas>* AsapoMessageMetasHandle; -//! handle for data recieved by the consumer -/// set as outout parameter via asapo_consumer_get_next(), asapo_consumer_get_last() -/// free after use with asapo_free_handle() -/// access to the data is granted via asapo_message_data_get_as_chars() -typedef AsapoHandlerHolder<uint8_t[]>* AsapoMessageDataHandle; - -//! handle for string return types -/// return type of several functions -/// free after use with asapo_free_handle() -/// a const pointer to the content can be obtained with asapo_string_c_str() -typedef AsapoHandlerHolder<std::string>* AsapoStringHandle; - -//! handle for info about a stream, -/// may be set via asapo_stream_infos_get_info() -/// \sa asapo::StreamInfo asapo_stream_info_get_last_id() asapo_stream_info_get_name() asapo_stream_info_get_ffinished() asapo_stream_info_get_next_stream() asapo_stream_info_get_timestamp_created() asapo_stream_info_get_timestamp_last_entry() -typedef AsapoHandlerHolder<asapo::StreamInfo>* AsapoStreamInfoHandle; -//! handle for a set of stream infos -/// touch only with proper functions and use asapo_free_handle() to delete, -/// created by asapo_consumer_get_stream_list() -/// \sa asapo_free_handle() asapo_stream_infos_get_item() asapo_stream_infos_get_size() -typedef AsapoHandlerHolder<asapo::StreamInfos>* AsapoStreamInfosHandle; //! handle for message id lists /// touch only with proper functions and use asapo_free_handle() to delete, @@ -102,39 +51,6 @@ typedef AsapoHandlerHolder<asapo::PartialErrorData>* AsapoPartialErrorDataHandle typedef AsapoHandlerHolder<asapo::ConsumerErrorData>* AsapoConsumerErrorDataHandle; #include <algorithm> - -template<typename t> -constexpr bool operator==(unsigned lhs, t rhs) { - return lhs == unsigned(static_cast<typename std::underlying_type<t>::type>(rhs)); -} - -int process_error(AsapoErrorHandle* error, asapo::Error err, - const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { - int retval = (err == nullptr || (p_exclude_err_template != nullptr && err == *p_exclude_err_template)) ? 0 : -1; - if (error == nullptr) { - return retval; - } - if (*error == nullptr) { - *error = new AsapoHandlerHolder<asapo::ErrorInterface> {err.release()}; - } else { - (*error)->handle = std::move(err); - } - return retval; -} - -AsapoHandle* handle_or_null(AsapoHandle* handle, AsapoErrorHandle* error, asapo::Error err, - const asapo::ErrorTemplateInterface* p_exclude_err_template = nullptr) { - if (process_error(error, std::move(err), p_exclude_err_template) < 0) { - if (handle != nullptr) { - delete handle; - } - return nullptr; - } else { - return handle; - } -} - - #define dataGetterStart \ asapo::MessageData d; \ asapo::MessageMeta* fi = info ? new asapo::MessageMeta : nullptr; \ @@ -169,38 +85,11 @@ extern "C" { kFinishedStreams == asapo::StreamFilter::kFinishedStreams&& kUnfinishedStreams == asapo::StreamFilter::kUnfinishedStreams, "incompatible bit reps between c++ and c for asapo::StreamFilter"); - static_assert(kProcessed == asapo::SourceType::kProcessed&& - kRaw == asapo::SourceType::kRaw, - "incompatible bit reps between c++ and c for asapo::SourceType"); static_assert(kUndefined == asapo::NetworkConnectionType::kUndefined&& kAsapoTcp == asapo::NetworkConnectionType::kAsapoTcp&& kFabric == asapo::NetworkConnectionType::kFabric, "incompatible bit reps between c++ and c for asapo::NetworkConnectionType"); - static void time_point_to_time_spec(std::chrono::system_clock::time_point tp, - struct timespec* stamp) { - stamp->tv_sec = std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count(); - stamp->tv_nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count() % 1000000000; - } - - AsapoBool asapo_is_error(AsapoErrorHandle err) { - return err != nullptr && err->handle != nullptr; - } - -/// \copydoc asapo::ErrorInterface::Explain() -/// \param[out] buf will be filled with the explanation -/// \param[in] maxSize max size of buf in bytes - void asapo_error_explain(const AsapoErrorHandle error, char* buf, size_t maxSize) { - if (error->handle) { - strncpy(buf, error->handle->Explain().c_str(), maxSize - 1); - buf[maxSize] = '\0'; - } else { - static std::string msg("no error"); - std::copy_n(msg.begin(), std::max(msg.size(), maxSize), buf); - buf[std::max(maxSize - 1, msg.size())] = '\0'; - } - } - enum AsapoConsumerErrorType asapo_error_get_type(const AsapoErrorHandle error) { auto consumer_err = dynamic_cast<const asapo::ServiceError<asapo::ConsumerErrorType, @@ -212,6 +101,7 @@ extern "C" { } } + //! creata a consumer /// \copydoc asapo::ConsumerFactory::CreateConsumer /// \param[out] error NULL or pointer to error handle to be set @@ -228,8 +118,7 @@ extern "C" { has_filesysytem, *(source->handle), &err); - auto retval = new AsapoHandlerHolder<asapo::Consumer>(c.release()); - return static_cast<AsapoConsumerHandle>(handle_or_null(retval, error, std::move(err))); + return handle_or_null_t(c, error, std::move(err)); } //! wraps asapo::Consumer::GenerateNewGroupId() @@ -240,32 +129,12 @@ extern "C" { AsapoStringHandle asapo_consumer_generate_new_group_id(AsapoConsumerHandle consumer, AsapoErrorHandle* error) { asapo::Error err; - auto result = new std::string(consumer->handle->GenerateNewGroupId(&err)); - auto retval = new AsapoHandlerHolder<std::string> {result}; - return static_cast<AsapoStringHandle>(handle_or_null(retval, error, std::move(err))); + auto result = consumer->handle->GenerateNewGroupId(&err); + return handle_or_null_t(result, error, std::move(err)); } -//! creates AsapoStringHandle from C-String -/// \param[in] str C-string -/// \return AsapoStringHandle - AsapoStringHandle asapo_string_from_c_str(const char* str) { - auto retval = new AsapoHandlerHolder<std::string> {new std::string{str}}; - return static_cast<AsapoStringHandle>(retval); - } -//! give a pointer to the content of the asapoString -/// \param[in] str the handle of the asapoString in question -/// \return const char pointer to the content - const char* asapo_string_c_str(const AsapoStringHandle str) { - return str->handle->c_str(); - } -//! give the size of an asapoString -/// \param[in] str the handle of the asapoString in question -/// \return the number of bytes in the string , not counting the final nul byte. - size_t asapo_string_size(const AsapoStringHandle str) { - return str->handle->size(); - } //! wraps asapo::Consumer::SetTimeout() /// \copydoc asapo::Consumer::SetTimeout() @@ -344,8 +213,7 @@ extern "C" { from_id, to_id, stream, &err)); - auto retval = new AsapoHandlerHolder<asapo::IdList> {list}; - return static_cast<AsapoIdListHandle>(handle_or_null(retval, error, std::move(err))); + return handle_or_null_t(list, error, std::move(err)); } //! give number of items in an id list @@ -367,7 +235,9 @@ extern "C" { //! wraps asapo::Consumer::CurrentConnectionType() /// \copydoc asapo::Consumer::CurrentConnectionType() /// \param[in] consumer the handle of the consumer concerned - enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer); + enum AsapoNetworkConnectionType asapo_consumer_current_connection_type(AsapoConsumerHandle consumer) { + return static_cast<AsapoNetworkConnectionType>(consumer->handle->CurrentConnectionType()); + } //! get list of streams, wraps asapo::Consumer::GetStreamList() @@ -387,25 +257,9 @@ extern "C" { auto info = new asapo::StreamInfos(consumer->handle->GetStreamList(from, static_cast<asapo::StreamFilter>(filter), &err)); - auto retval = new AsapoHandlerHolder<asapo::StreamInfos> {info}; - return static_cast<AsapoStreamInfosHandle>(handle_or_null(retval, error, std::move(err))); + return handle_or_null_t(info, error, std::move(err)); } -//! get one stream info from a stream infos handle -/// \param[in] infos handle for stream infos -/// \param[in] index index od info to get, starts at 0 -/// \return handle to stream info - AsapoStreamInfoHandle asapo_stream_infos_get_item(const AsapoStreamInfosHandle infos, - size_t index) { - return new AsapoHandlerHolder<asapo::StreamInfo> {&(infos->handle->at(index)), false}; - } - -//! get size (number of elements) of a stream infos handle -/// \param[in] infos handle for stream infos -/// \return number of elements in the handle - size_t asapo_stream_infos_get_size(const AsapoStreamInfosHandle infos) { - return infos->handle->size(); - } //! wraps asapo::Consumer::DeleteStream() /// \copydoc asapo::Consumer::DeleteStream() @@ -460,9 +314,8 @@ extern "C" { AsapoStringHandle asapo_consumer_get_beamtime_meta(AsapoConsumerHandle consumer, AsapoErrorHandle* error) { asapo::Error err; - auto result = new std::string(consumer->handle->GetBeamtimeMeta(&err)); - auto retval = new AsapoHandlerHolder<std::string> {result}; - return static_cast<AsapoStringHandle>(handle_or_null(retval, error, std::move(err))); + auto result = consumer->handle->GetBeamtimeMeta(&err); + return handle_or_null_t(result, error, std::move(err)); } //! wraps asapo::Consumer::RetrieveData() @@ -495,9 +348,8 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto result = new asapo::DataSet(consumer->handle->GetNextDataset(*group_id->handle, min_size, stream, &err)); - auto retval = new AsapoHandlerHolder<asapo::DataSet> {result}; - return static_cast<AsapoDataSetHandle>(handle_or_null(retval, error, std::move(err), - &asapo::ConsumerErrorTemplates::kPartialData)); + return handle_or_null_t(result, error, std::move(err), + &asapo::ConsumerErrorTemplates::kPartialData); } //! wraps asapo::Consumer::GetLastDataset() @@ -510,9 +362,8 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto result = new asapo::DataSet(consumer->handle->GetLastDataset(min_size, stream, &err)); - auto retval = new AsapoHandlerHolder<asapo::DataSet> {result}; - return static_cast<AsapoDataSetHandle>(handle_or_null(retval, error, std::move(err), - &asapo::ConsumerErrorTemplates::kPartialData)); + return handle_or_null_t(result, error, std::move(err), + &asapo::ConsumerErrorTemplates::kPartialData); } //! wraps asapo::Consumer::GetLastAcknowledgedMessage() @@ -542,9 +393,8 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto result = new asapo::DataSet(consumer->handle->GetDatasetById(id, min_size, stream, &err)); - auto retval = new AsapoHandlerHolder<asapo::DataSet> {result}; - return static_cast<AsapoDataSetHandle>(handle_or_null(retval, error, std::move(err), - &asapo::ConsumerErrorTemplates::kPartialData)); + return handle_or_null_t(result, error, std::move(err), + &asapo::ConsumerErrorTemplates::kPartialData); } //! wraps aspao::Consumer::GetById() @@ -605,8 +455,7 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto result = new asapo::MessageMetas(consumer->handle->QueryMessages(query, stream, &err)); - auto retval = new AsapoHandlerHolder<asapo::MessageMetas> {result}; - return static_cast<AsapoMessageMetasHandle>(handle_or_null(retval, error, std::move(err))); + return handle_or_null_t(result, error, std::move(err)); } //! wraps aspao::Consumer::SetResendNacs() @@ -619,25 +468,6 @@ extern "C" { consumer->handle->SetResendNacs(resend, delay_ms, resend_attempts); } -//! give acess to data -/// \param[in] data the handle of the data -/// \return const char pointer to the data blob, valid until deletion or reuse of data - const char* asapo_message_data_get_as_chars(const AsapoMessageDataHandle data) { - return reinterpret_cast<const char*>(data->handle.get()); - } - -//! wraps asapo::SourceCredentials::SourceCredentials() -/// \copydoc asapo::SourceCredentials::SourceCredentials() - AsapoSourceCredentialsHandle asapo_create_source_credentials(enum AsapoSourceType type, - const char* beamtime, - const char* beamline, - const char* data_source, - const char* token) { - auto retval = new asapo::SourceCredentials(static_cast<asapo::SourceType>(type), - beamtime, beamline, - data_source, token); - return new AsapoHandlerHolder<asapo::SourceCredentials> {retval}; - } //! get name from the metadata object /// \param[in] md handle of the metadata object @@ -698,50 +528,6 @@ extern "C" { return md->handle->dataset_substream; } -//! get last id from the stream info object -/// \param[in] info handle of the stream info object -/// \return last id -/// \sa asapo::StreamInfo - uint64_t asapo_stream_info_get_last_id(const AsapoStreamInfoHandle info) { - return info->handle->last_id; - } -//! get stream name from the stream info object -/// \param[in] info handle of the stream info object -/// \return pointer to the name string, valid until asapoStreamInfos object is deleted -/// \sa asapo::StreamInfo - const char* asapo_stream_info_get_name(const AsapoStreamInfoHandle info) { - return info->handle->name.c_str(); - } -//! get finished state from the stream info object -/// \param[in] info handle of the stream info object -/// \return finised state, 0 = false -/// \sa asapo::StreamInfo - AsapoBool asapo_stream_info_get_ffinished(const AsapoStreamInfoHandle info) { - return info->handle->finished; - } -//! get next stream name? from the stream info object -/// \param[in] info handle of the stream info object -/// \return pointer to the name string, valid until asapoStreamInfos object is deleted -/// \sa asapo::StreamInfo - const char* asapo_stream_info_get_next_stream(const AsapoStreamInfoHandle info) { - return info->handle->next_stream.c_str(); - } -//! get creation time from the stream info object -/// \param[in] info handle of the stream info object -/// \param[out] stamp creation timestamp as timespec -/// \sa asapo::StreamInfo - void asapo_stream_info_get_timestamp_created(const AsapoStreamInfoHandle info, - struct timespec* stamp) { - time_point_to_time_spec(info->handle->timestamp_created, stamp); - } -//! get time of last entry from the stream info object -/// \param[in] info handle of the stream info object -/// \param[out] stamp last entry timestamp as timespec -/// \sa asapo::StreamInfo - void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfoHandle info, - struct timespec* stamp) { - time_point_to_time_spec(info->handle->timestamp_lastentry, stamp); - } //! get id from data set object /// \param[in] set handle of the data set object @@ -858,21 +644,4 @@ extern "C" { -//! free handle memory, set handle to NULL -/// \param[in] pointer to an ASAPO handle - void asapo_free_handle(void** handle) { - if (*handle == nullptr) { - return; - } - auto a_handle = static_cast<AsapoHandle*>(*handle); - delete a_handle; - *handle = nullptr; - } - -//! creates a new ASAPO handle -/// \return initialized ASAPO handle - void* asapo_new_handle() { - return NULL; - } - } diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp b/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp index ce4d34f8a00d2686cde5fdaee8f7220b8d45d89b..f9124fa9e3e3f3e2e3ca9b94f66c7bb34050ca9e 100644 --- a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp +++ b/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/consume.cpp @@ -14,11 +14,13 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; // // or your endpoint auto beamtime = "asapo_test"; - auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; + auto token = + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; - auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. + auto path_to_files = + "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. - auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "test_source", token}, &err); + auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token}, &err); exit_if_error("Cannot create consumer", err); consumer->SetTimeout((uint64_t) 5000); @@ -27,7 +29,7 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - err = consumer->GetNext(group_id, &mm, &data,"default"); + err = consumer->GetNext(group_id, &mm, &data, "default"); exit_if_error("Cannot get next record", err); std::cout << "id: " << mm.id << std::endl; diff --git a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp b/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp index b8261ace167f28cb807ebbb4b81696cd3554cd75..0a2cf248698597108450760116cc169f7ab5287c 100644 --- a/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp +++ b/docs/site/examples/frozen_versions/21.06.0/getting_started/cpp/produce.cpp @@ -25,14 +25,14 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; // or your endpoint auto beamtime = "asapo_test"; - auto producer = asapo::Producer::Create(endpoint, 1,asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "test_source", ""}, 60000, &err); + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, + asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}, 60000, &err); exit_if_error("Cannot start producer", err); std::string to_send = "hello"; auto send_size = to_send.size() + 1; auto buffer = asapo::MessageData(new uint8_t[send_size]); - memcpy(buffer.get(),to_send.c_str(),send_size); + memcpy(buffer.get(), to_send.c_str(), send_size); asapo::MessageHeader message_header{1, send_size, "processed/test_file"}; err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); diff --git a/docs/site/examples/getting_started/cpp/consume.cpp b/docs/site/examples/getting_started/cpp/consume.cpp index ce4d34f8a00d2686cde5fdaee8f7220b8d45d89b..f9124fa9e3e3f3e2e3ca9b94f66c7bb34050ca9e 100644 --- a/docs/site/examples/getting_started/cpp/consume.cpp +++ b/docs/site/examples/getting_started/cpp/consume.cpp @@ -14,11 +14,13 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; // // or your endpoint auto beamtime = "asapo_test"; - auto token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; + auto token = + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjk1NzE3MTAyMTYsImp0aSI6ImMzaXFhbGpmNDNhbGZwOHJua20wIiwic3ViIjoiYnRfYXNhcG9fdGVzdCIsIkV4dHJhQ2xhaW1zIjp7IkFjY2Vzc1R5cGVzIjpbIndyaXRlIiwicmVhZCJdfX0.dkWupPO-ysI4t-jtWiaElAzDyJF6T7hu_Wz_Au54mYU"; - auto path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. + auto path_to_files = + "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. - auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "test_source", token}, &err); + auto consumer = asapo::ConsumerFactory::CreateConsumer(endpoint, path_to_files, true, asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", token}, &err); exit_if_error("Cannot create consumer", err); consumer->SetTimeout((uint64_t) 5000); @@ -27,7 +29,7 @@ int main(int argc, char* argv[]) { asapo::MessageMeta mm; asapo::MessageData data; - err = consumer->GetNext(group_id, &mm, &data,"default"); + err = consumer->GetNext(group_id, &mm, &data, "default"); exit_if_error("Cannot get next record", err); std::cout << "id: " << mm.id << std::endl; diff --git a/docs/site/examples/getting_started/cpp/produce.cpp b/docs/site/examples/getting_started/cpp/produce.cpp index b8261ace167f28cb807ebbb4b81696cd3554cd75..0a2cf248698597108450760116cc169f7ab5287c 100644 --- a/docs/site/examples/getting_started/cpp/produce.cpp +++ b/docs/site/examples/getting_started/cpp/produce.cpp @@ -25,14 +25,14 @@ int main(int argc, char* argv[]) { auto endpoint = "localhost:8400"; // or your endpoint auto beamtime = "asapo_test"; - auto producer = asapo::Producer::Create(endpoint, 1,asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed,beamtime, "", "test_source", ""}, 60000, &err); + auto producer = asapo::Producer::Create(endpoint, 1, asapo::RequestHandlerType::kTcp, + asapo::SourceCredentials{asapo::SourceType::kProcessed, beamtime, "", "test_source", ""}, 60000, &err); exit_if_error("Cannot start producer", err); std::string to_send = "hello"; auto send_size = to_send.size() + 1; auto buffer = asapo::MessageData(new uint8_t[send_size]); - memcpy(buffer.get(),to_send.c_str(),send_size); + memcpy(buffer.get(), to_send.c_str(), send_size); asapo::MessageHeader message_header{1, send_size, "processed/test_file"}; err = producer->Send(message_header, std::move(buffer), asapo::kDefaultIngestMode, "default", &ProcessAfterSend); diff --git a/examples/consumer/CMakeLists.txt b/examples/consumer/CMakeLists.txt index 0cf72e439d5fd58b000b3fd78c1a9fe0d98c760d..3d5359f708d9f0ed2a0a64f8edd5c207be82b261 100644 --- a/examples/consumer/CMakeLists.txt +++ b/examples/consumer/CMakeLists.txt @@ -1,7 +1,6 @@ find_package(Threads) add_subdirectory(getnext) -add_subdirectory(simple-consumer-c) if(BUILD_EXAMPLES AND BUILD_PYTHON) add_subdirectory(getnext_python) diff --git a/examples/consumer/simple-consumer-c/CMakeLists.txt b/examples/consumer/simple-consumer-c/CMakeLists.txt deleted file mode 100644 index cdecb5b90dbe60461e808e940007a6ffea48ee26..0000000000000000000000000000000000000000 --- a/examples/consumer/simple-consumer-c/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -cmake_minimum_required(VERSION 2.8) - -set(TARGET_NAME "asapo-consume-c") - -set(SOURCE_FILES consume.c) - -add_executable(${TARGET_NAME} ${SOURCE_FILES}) -target_link_libraries(${TARGET_NAME} asapo-consumer ${CMAKE_THREAD_LIBS_INIT}) diff --git a/examples/consumer/simple-consumer-c/consume.c b/examples/consumer/simple-consumer-c/consume.c deleted file mode 100644 index 6894c4a425fb1542363e666c6854915095d5c5ea..0000000000000000000000000000000000000000 --- a/examples/consumer/simple-consumer-c/consume.c +++ /dev/null @@ -1,54 +0,0 @@ -#include "asapo/consumer_c.h" - -#include <stdio.h> -#include <stdlib.h> - -void exit_if_error(const char *error_string, const AsapoErrorHandle err) { - if (err) { - char buf[1024]; - asapo_error_explain(err, buf, sizeof(buf)); - printf("%s %s\n", error_string, buf); - exit(EXIT_FAILURE); - } -} - -int main() { - AsapoErrorHandle err = asapo_new_handle(); - AsapoMessageMetaHandle mm = asapo_new_handle(); - AsapoMessageDataHandle data = asapo_new_handle(); - - const char *endpoint = "localhost:8400"; - const char *beamtime = "asapo_test"; - const char *token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJjMWY2OG0ydWlkZDE3dWxmaDN1ZyIsInN1YiI6ImJ0X2FzYXBvX3Rlc3QiLCJFeHRyYUNsYWltcyI6eyJBY2Nlc3NUeXBlcyI6WyJyZWFkIl19fQ.zo7ZDfY2sf4o9RYuXpxNR9kHLG594xr-SE5yLoyDC2Q"; - - const char * path_to_files = "/var/tmp/asapo/global_shared/data/test_facility/gpfs/test/2019/data/asapo_test"; //set it according to your configuration. - - AsapoSourceCredentialsHandle cred = asapo_create_source_credentials(kProcessed, - beamtime, - "", "", token); - AsapoConsumerHandle consumer = asapo_create_consumer(endpoint, - path_to_files, 1, - cred, - &err); - asapo_free_handle(&cred); - - exit_if_error("Cannot create consumer", err); - asapo_consumer_set_timeout(consumer, 1000ull); - - AsapoStringHandle group_id = asapo_consumer_generate_new_group_id(consumer, &err); - exit_if_error("Cannot create group id", err); - - asapo_consumer_get_next(consumer, group_id, &mm, &data, "default",&err); - exit_if_error("Cannot get next record", err); - - printf("id: %llu\n", (unsigned long long)asapo_message_meta_get_id(mm)); - printf("file name: %s\n", asapo_message_meta_get_name(mm)); - printf("file content: %s\n", asapo_message_data_get_as_chars(data)); - - asapo_free_handle(&mm); - asapo_free_handle(&data); - asapo_free_handle(&consumer); - asapo_free_handle(&group_id); - - return EXIT_SUCCESS; -} diff --git a/producer/CMakeLists.txt b/producer/CMakeLists.txt index 13d54a0fce3aff19cadcce3abb9214ec6d0774b5..6ca45fe5a2b9006083542b51002f57dd07f7f7f2 100644 --- a/producer/CMakeLists.txt +++ b/producer/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(api/cpp) +add_subdirectory(api/c) if (BUILD_PYTHON) add_subdirectory(api/python) diff --git a/producer/api/c/CMakeLists.txt b/producer/api/c/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..3aa21699d5633845cea87bd64fcc67cfb1c824cd --- /dev/null +++ b/producer/api/c/CMakeLists.txt @@ -0,0 +1 @@ +install(DIRECTORY include/ DESTINATION include) diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h new file mode 100644 index 0000000000000000000000000000000000000000..ec90bdcb771017debb0241d10e3a9c971ddb6039 --- /dev/null +++ b/producer/api/c/include/asapo/producer_c.h @@ -0,0 +1,174 @@ +#ifndef __PRODUCER_C_H__ +#define __PRODUCER_C_H__ +#include "asapo/common/common_c.h" +#ifndef __PRODUCER_C_INTERFACE_IMPLEMENTATION__ + +typedef struct { + char _[AsapoHandleSize]; +}* AsapoProducerHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoRequestCallbackPayloadHandle; +typedef struct { + char _[AsapoHandleSize]; +}* AsapoMessageHeaderHandle; +#endif + +typedef void(*AsapoRequestCallback)(void*, AsapoRequestCallbackPayloadHandle, AsapoErrorHandle); +#define kMaxMessageSize 1024 +#define kMaxVersionSize 10 +#define kNCustomParams 3 + +//! c version opf asapo::Opcode +enum AsapoOpcode { + kOpcodeUnknownOp = 1, + kOpcodeTransferData, + kOpcodeTransferDatasetData, + kOpcodeStreamInfo, + kOpcodeLastStream, + kOpcodeGetBufferData, + kOpcodeAuthorize, + kOpcodeTransferMetaData, + kOpcodeDeleteStream, + kOpcodeGetMeta, + kOpcodeCount +}; + + +//! c version of asapo::GenericRequestHeader +struct AsapoGenericRequestHeader { + enum AsapoOpcode op_code; + uint64_t data_id; + uint64_t data_size; + uint64_t meta_size; + uint64_t custom_data[kNCustomParams]; + char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails) */ + char stream[kMaxMessageSize]; /* Must be a string (strcpy is used) */ + char api_version[kMaxVersionSize]; /* Must be a string (strcpy is used) */ +}; + +//! c version of asapo::RequestHandlerType +enum AsapoRequestHandlerType { + kTcp, + kFilesystem +}; + +//! c version of asapo::IngestModeFlags +enum AsapoIngestModeFlags { + kTransferData = 1 << 0, + kTransferMetaDataOnly = 1 << 1, + kStoreInFilesystem = 1 << 2, + kStoreInDatabase = 1 << 3, +}; + +const uint64_t kDefaultIngestMode = kTransferData | kStoreInFilesystem | kStoreInDatabase; + + +//! c version of asapo::MetaIngestOp +enum AsapoMetaIngestOp { + kInsert = 1, + kReplace = 2, + kUpdate = 3 +}; + +//! c version of asapo::LogLevel +enum AsapoLogLevel { + None, + Error, + Info, + Debug, + Warning +}; + + + +AsapoProducerHandle asapo_create_producer(const char* endpoint, + uint8_t n_processing_threads, + enum AsapoRequestHandlerType type, + AsapoSourceCredentialsHandle source_cred, + uint64_t timeout_ms, + AsapoErrorHandle* error); +int asapo_producer_get_version_info(AsapoProducerHandle producer, + AsapoStringHandle client_info, + AsapoStringHandle server_info, + AsapoBool* supported, + AsapoErrorHandle* error); +AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); +AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error); +AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); +int asapo_producer_delete_stream(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoBool delete_meta, + AsapoBool error_on_not_exist, + AsapoErrorHandle* error); +AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error); + +AsapoMessageHeaderHandle asapo_create_message_header(uint64_t message_id, + uint64_t data_size, + const char* file_name, + const char* user_metadata, + uint64_t dataset_substream, + uint64_t dataset_size, + AsapoBool auto_id); + +int asapo_producer_send(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + void* data, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_file(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + const char* file_name, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, + const char* stream, + uint64_t last_id, + const char* next_stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer, + const char* metadata, + enum AsapoMetaIngestOp mode, + AsapoBool upsert, + AsapoRequestCallback callback, + AsapoErrorHandle* error); +int asapo_producer_send_stream_metadata(AsapoProducerHandle producer, + const char* metadata, + enum AsapoMetaIngestOp mode, + AsapoBool upsert, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error); + +AsapoStringHandle asapo_request_callback_payload_get_response(AsapoRequestCallbackPayloadHandle handle); +const struct AsapoGenericRequestHeader* asapo_request_callback_payload_get_original_header( + AsapoRequestCallbackPayloadHandle handle); + +void asapo_producer_set_log_level(AsapoProducerHandle producer, enum AsapoLogLevel level); +void asapo_producer_enable_local_log(AsapoProducerHandle producer, AsapoBool enable); +void asapo_producer_enable_remote_log(AsapoProducerHandle producer, AsapoBool enable); +int asapo_producer_set_credentials(AsapoProducerHandle producer, AsapoSourceCredentialsHandle source_cred, + AsapoErrorHandle* error); +uint64_t asapo_producer_get_requests_queue_size(AsapoProducerHandle producer); +uint64_t asapo_producer_get_requests_queue_volume_mb(AsapoProducerHandle producer); +void asapo_producer_set_requests_queue_limits(AsapoProducerHandle producer, uint64_t size, uint64_t volume); +int asapo_producer_wait_requests_finished(AsapoProducerHandle producer, uint64_t timeout_ms, + AsapoErrorHandle* error); + +#endif diff --git a/producer/api/cpp/CMakeLists.txt b/producer/api/cpp/CMakeLists.txt index ecd38c19ee42ca19f64b022161d6320b17833fdf..7701657c2785e9a31bc8ec7b07512027adaf2ac5 100644 --- a/producer/api/cpp/CMakeLists.txt +++ b/producer/api/cpp/CMakeLists.txt @@ -7,7 +7,8 @@ set(SOURCE_FILES src/request_handler_filesystem.cpp src/receiver_discovery_service.cpp src/receiver_data_server_request_handler_factory.cpp - src/producer_request.cpp) + src/producer_request.cpp + src/producer_c_glue.cpp) string(REPLACE "v" "" PRODUCER_CLIENT_VER ${ASAPO_PRODUCER_PROTOCOL}) @@ -17,13 +18,14 @@ string(REPLACE "v" "" PRODUCER_CLIENT_VER ${ASAPO_PRODUCER_PROTOCOL}) # Library ################################ add_library(producer_lib_objects OBJECT ${SOURCE_FILES}) -target_include_directories(producer_lib_objects PRIVATE include ${ASAPO_CXX_COMMON_INCLUDE_DIR} ) +target_include_directories(producer_lib_objects PRIVATE ../c/include include ${ASAPO_CXX_COMMON_INCLUDE_DIR} ) target_include_directories(producer_lib_objects SYSTEM PRIVATE ${LIBFABRIC_INCLUDE_DIR}) if (BUILD_STATIC_CLIENT_LIBS) add_library(${TARGET_NAME} STATIC $<TARGET_OBJECTS:producer_lib_objects> $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> - $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version>) + $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>) + target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../c/include>) target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${ASAPO_CXX_COMMON_INCLUDE_DIR}>) target_include_directories(${TARGET_NAME} INTERFACE $<INSTALL_INTERFACE:include>) target_link_libraries(${TARGET_NAME} CURL::libcurl ${CMAKE_THREAD_LIBS_INIT}) @@ -47,7 +49,7 @@ endif() if (BUILD_SHARED_CLIENT_LIBS) add_library(${TARGET_NAME}_shared SHARED $<TARGET_OBJECTS:producer_lib_objects> $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> - $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version>) + $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool> $<TARGET_OBJECTS:data_structs> $<TARGET_OBJECTS:version> $<TARGET_OBJECTS:common>) target_include_directories(${TARGET_NAME}_shared PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME}_shared CURL::libcurl ${CMAKE_THREAD_LIBS_INIT}) diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp new file mode 100644 index 0000000000000000000000000000000000000000..40920cb4a583a94de9592ce7ef7f6f5f9ba3fe64 --- /dev/null +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -0,0 +1,272 @@ +#define __PRODUCER_C_INTERFACE_IMPLEMENTATION__ +#include "asapo/common/internal/asapo_common_c_glue.h" +#include "asapo/asapo_producer.h" +#include <cstddef> + +//! handle for an asapo producer +/// created by asapo_create_producer() +/// free after use with asapo_free_handle() +/// all operations are done with asapo_producer_xxx() functions +/// \sa asapo::Producer +typedef AsapoHandlerHolder<asapo::Producer>* AsapoProducerHandle; +typedef AsapoHandlerHolder<asapo::RequestCallbackPayload>* AsapoRequestCallbackPayloadHandle; +typedef AsapoHandlerHolder<asapo::MessageHeader>* AsapoMessageHeaderHandle; + + + +extern "C" { +#include "asapo/producer_c.h" + static_assert(kOpcodeUnknownOp == asapo::Opcode::kOpcodeUnknownOp&& + kOpcodeTransferData == asapo::Opcode::kOpcodeTransferData&& + kOpcodeTransferDatasetData == asapo::Opcode::kOpcodeTransferDatasetData&& + kOpcodeStreamInfo == asapo::Opcode::kOpcodeStreamInfo&& + kOpcodeLastStream == asapo::Opcode::kOpcodeLastStream&& + kOpcodeGetBufferData == asapo::Opcode::kOpcodeGetBufferData&& + kOpcodeAuthorize == asapo::Opcode::kOpcodeAuthorize&& + kOpcodeTransferMetaData == asapo::Opcode::kOpcodeTransferMetaData&& + kOpcodeDeleteStream == asapo::Opcode::kOpcodeDeleteStream&& + kOpcodeGetMeta == asapo::Opcode::kOpcodeGetMeta&& + kOpcodeCount == asapo::Opcode::kOpcodeCount, + "incompatible bit reps between c++ and c for asapo::OpCode"); + static_assert(kTcp == asapo::RequestHandlerType:: kTcp&& + kFilesystem == asapo::RequestHandlerType:: kFilesystem, + "incompatible bit reps between c++ and c for asapo::RequestHandlerType"); + + static_assert(kInsert == asapo::MetaIngestOp::kInsert&& + kReplace == asapo::MetaIngestOp::kReplace&& + kUpdate == asapo::MetaIngestOp::kUpdate, + "incompatible bit reps between c++ and c for asapo::MetaIngestOp"); + + static_assert(None == asapo::LogLevel::None&& + Error == asapo::LogLevel::Error&& + Info == asapo::LogLevel::Info&& + Debug == asapo::LogLevel::Debug&& + Warning == asapo::LogLevel::Warning, + "incompatible bit reps between c++ and c for asapo::LogLevel"); + static_assert(sizeof(struct AsapoGenericRequestHeader) == sizeof(asapo::GenericRequestHeader), + "incompatible sizes for asapo::GenericRequestHeader"); + + static_assert(offsetof(AsapoGenericRequestHeader, op_code) == offsetof(asapo::GenericRequestHeader, op_code)&& + offsetof(AsapoGenericRequestHeader, data_id) == offsetof(asapo::GenericRequestHeader, data_id)&& + offsetof(AsapoGenericRequestHeader, data_size) == offsetof(asapo::GenericRequestHeader, data_size)&& + offsetof(AsapoGenericRequestHeader, meta_size) == offsetof(asapo::GenericRequestHeader, meta_size)&& + offsetof(AsapoGenericRequestHeader, custom_data) == offsetof(asapo::GenericRequestHeader, custom_data)&& + offsetof(AsapoGenericRequestHeader, message) == offsetof(asapo::GenericRequestHeader, message)&& + offsetof(AsapoGenericRequestHeader, stream) == offsetof(asapo::GenericRequestHeader, stream)&& + offsetof(AsapoGenericRequestHeader, api_version) == offsetof(asapo::GenericRequestHeader, api_version), + "incompatible field offsets for asapo::GenericRequestHeader"); + + AsapoProducerHandle asapo_create_producer(const char* endpoint, + uint8_t n_processing_threads, + AsapoRequestHandlerType type, + AsapoSourceCredentialsHandle source_cred, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto c = asapo::Producer::Create(endpoint, + n_processing_threads, + static_cast<asapo::RequestHandlerType>(type), + *(source_cred->handle), + timeout_ms, + &err); + return handle_or_null_t(c.release(), error, std::move(err)); + + } + int asapo_producer_get_version_info(AsapoProducerHandle producer, + AsapoStringHandle client_info, + AsapoStringHandle server_info, + AsapoBool* supported, + AsapoErrorHandle* error) { + bool supp; + auto err = producer->handle->GetVersionInfo(client_info->handle.get(), + server_info->handle.get(), + &supp); + *supported = supp; + return process_error(error, std::move(err)); + } + + AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto result = new asapo::StreamInfo(producer->handle->GetStreamInfo(stream, timeout_ms, &err)); + return handle_or_null_t(result, error, std::move(err)); + } + AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto result = producer->handle->GetStreamMeta(stream, timeout_ms, &err); + return handle_or_null_t(result, error, std::move(err)); + + } + AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err); + return handle_or_null_t(result, error, std::move(err)); + } + int asapo_producer_delete_stream(AsapoProducerHandle producer, + const char* stream, + uint64_t timeout_ms, + AsapoBool delete_meta, + AsapoBool error_on_not_exist, + AsapoErrorHandle* error) { + auto err = producer->handle->DeleteStream(stream, timeout_ms, + asapo::DeleteStreamOptions{static_cast<bool>(delete_meta), + static_cast<bool>(error_on_not_exist)}); + return process_error(error, std::move(err)); + } + AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer, + uint64_t timeout_ms, + AsapoErrorHandle* error) { + asapo::Error err; + auto result = new asapo::StreamInfo(producer->handle->GetLastStream(timeout_ms, &err)); + return handle_or_null_t(result, error, std::move(err)); + } + + AsapoMessageHeaderHandle asapo_create_message_header(uint64_t message_id, + uint64_t data_size, + const char* file_name, + const char* user_metadata, + uint64_t dataset_substream, + uint64_t dataset_size, + AsapoBool auto_id) { + return new AsapoHandlerHolder<asapo::MessageHeader>(new asapo::MessageHeader(message_id, + data_size, + file_name, + user_metadata, + dataset_substream, + dataset_size, + auto_id != 0)); + } + + extern "C++" asapo::RequestCallback GetWrapper(AsapoRequestCallback callback, bool release_data) { + return [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { + void* data = release_data ? (void*) payload.data.release() : NULL; + auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload, false); + auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); + if (callback != NULL) { + callback(data, payLoadHandle, errorHandle); + } + delete errorHandle; + delete payLoadHandle; + }; + } + + int asapo_producer_send(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + void* data, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + auto wrapper = GetWrapper(callback, true); + auto err = producer->handle->Send__(*message_header->handle, + data, + ingest_mode, + stream, + wrapper); + return process_error(error, std::move(err)); + } + int asapo_producer_send_file(AsapoProducerHandle producer, + const AsapoMessageHeaderHandle message_header, + const char* file_name, + uint64_t ingest_mode, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + auto wrapper = GetWrapper(callback, true); + auto err = producer->handle->SendFile(*message_header->handle, + file_name, + ingest_mode, + stream, + wrapper); + return process_error(error, std::move(err)); + } + int asapo_producer_send_stream_finished_flag(AsapoProducerHandle producer, + const char* stream, + uint64_t last_id, + const char* next_stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + auto wrapper = GetWrapper(callback, false); + auto err = producer->handle->SendStreamFinishedFlag(stream, + last_id, + next_stream, + wrapper); + return process_error(error, std::move(err)); + } + int asapo_producer_send_beamtime_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + auto wrapper = GetWrapper(callback, false); + asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0); + auto err = producer->handle->SendBeamtimeMetadata(metadata, + im, + wrapper); + return process_error(error, std::move(err)); + + } + int asapo_producer_send_stream_metadata(AsapoProducerHandle producer, + const char* metadata, + AsapoMetaIngestOp mode, + AsapoBool upsert, + const char* stream, + AsapoRequestCallback callback, + AsapoErrorHandle* error) { + auto wrapper = GetWrapper(callback, false); + asapo::MetaIngestMode im(static_cast<asapo::MetaIngestOp>(mode), upsert != 0); + auto err = producer->handle->SendStreamMetadata(metadata, + im, + stream, + wrapper); + return process_error(error, std::move(err)); + + } + AsapoStringHandle asapo_request_callback_payload_get_response(AsapoRequestCallbackPayloadHandle handle) { + return new typename std::remove_pointer<AsapoStringHandle>::type(handle->handle->response); + } + const AsapoGenericRequestHeader* asapo_request_callback_payload_get_original_header( + AsapoRequestCallbackPayloadHandle handle) { + return reinterpret_cast<AsapoGenericRequestHeader*>(&handle->handle->original_header); + } + + void asapo_producer_set_log_level(AsapoProducerHandle producer, AsapoLogLevel level) { + producer->handle->SetLogLevel(static_cast<asapo::LogLevel>(level)); + } + void asapo_producer_enable_local_log(AsapoProducerHandle producer, AsapoBool enable) { + producer->handle->EnableLocalLog(enable != 0); + } + void asapo_producer_enable_remote_log(AsapoProducerHandle producer, AsapoBool enable) { + producer->handle->EnableLocalLog(enable != 0); + } + int asapo_producer_set_credentials(AsapoProducerHandle producer, AsapoSourceCredentialsHandle source_cred, + AsapoErrorHandle* error) { + auto err = producer->handle->SetCredentials(*source_cred->handle.get()); + return process_error(error, std::move(err)); + } + uint64_t asapo_producer_get_requests_queue_size(AsapoProducerHandle producer) { + return producer->handle->GetRequestsQueueSize(); + } + uint64_t asapo_producer_get_requests_queue_volume_mb(AsapoProducerHandle producer) { + return producer->handle->GetRequestsQueueVolumeMb(); + } + void asapo_producer_set_requests_queue_limits(AsapoProducerHandle producer, uint64_t size, uint64_t volume) { + producer->handle->SetRequestsQueueLimits(size, volume); + } + int asapo_producer_wait_requests_finished(AsapoProducerHandle producer, uint64_t timeout_ms, + AsapoErrorHandle* error) { + auto err = producer->handle->WaitRequestsFinished(timeout_ms); + return process_error(error, std::move(err)); + } + + + +} diff --git a/tests/automatic/common/cpp/include/testing_c.h b/tests/automatic/common/cpp/include/testing_c.h new file mode 100644 index 0000000000000000000000000000000000000000..a0ff101d2898f50da6b56cbf3eae0649c0b02cb3 --- /dev/null +++ b/tests/automatic/common/cpp/include/testing_c.h @@ -0,0 +1,56 @@ +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + + +#ifndef ASAPO_TESTS_AUTOMATIC_COMMON_CPP_INCLUDE_TESTING_C_H_ +#define ASAPO_TESTS_AUTOMATIC_COMMON_CPP_INCLUDE_TESTING_C_H_ + +#define EXIT_IF_ERROR(...) exit_if_error_(__VA_ARGS__,__LINE__) +#define ASSERT_EQ_INT(...) assert_eq_int_(__VA_ARGS__,__LINE__) +#define ASSERT_EQ_STRING(...) assert_eq_string_(__VA_ARGS__,__LINE__) +#define ASSERT_TRUE(...) assert_true_(__VA_ARGS__,__LINE__) + +void assert_eq_int_(uint64_t expected, uint64_t got, const char* message, int line) { + printf("asserting %s at %d\n", message, line); + if (expected != got) { + printf("%s: expected %llu got %llu at %d\n", message, (unsigned long long)expected, (unsigned long long)got, line); + exit(EXIT_FAILURE); + } + printf("asserting %s at %d - OK\n", message, line); + +} + +void assert_eq_string_(const char* expected, const char* got, const char* message, int line) { + printf("asserting %s at %d\n", message, line); + if (strcmp(expected, got) != 0) { + printf("%s: expected %s got %s at %d\n", message, expected, got, line); + exit(EXIT_FAILURE); + } + printf("asserting %s at %d - OK \n", message, line); + +} + +void assert_true_(int value, const char* message, int line) { + printf("asserting %s at %d\n", message, line); + if (value != 1) { + printf("%s failed at %d\n", message, line); + exit(EXIT_FAILURE); + } + printf("asserting %s at %d - OK \n", message, line); +} + +void exit_if_error_(const char* error_string, const AsapoErrorHandle err, int line) { + printf("asserting no error for %s at %d\n", error_string, line); + if (asapo_is_error(err)) { + char buf[1024]; + asapo_error_explain(err, buf, sizeof(buf)); + printf("%s %s\n", error_string, buf); + exit(EXIT_FAILURE); + } + printf("asserting no error for %s at %d - OK \n", error_string, line); +} + + + +#endif //ASAPO_TESTS_AUTOMATIC_COMMON_CPP_INCLUDE_TESTING_C_H_ diff --git a/tests/automatic/consumer/consumer_api/consumer_api.c b/tests/automatic/consumer/consumer_api/consumer_api.c index ed81afab911933054f4c398fd3ddf156540e3ab3..0ec0990021aa97fa665e1ec7770ed50ad13f687d 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.c +++ b/tests/automatic/consumer/consumer_api/consumer_api.c @@ -1,49 +1,10 @@ #include "asapo/consumer_c.h" +#include "testing_c.h" #include <string.h> #include <stdio.h> #include <stdlib.h> -#define EXIT_IF_ERROR(...) exit_if_error_(__VA_ARGS__,__LINE__) -#define ASSERT_EQ_INT(...) assert_eq_int_(__VA_ARGS__,__LINE__) -#define ASSERT_EQ_STRING(...) assert_eq_string_(__VA_ARGS__,__LINE__) -#define ASSERT_TRUE(...) assert_true_(__VA_ARGS__,__LINE__) - -void assert_eq_int_(uint64_t expected, uint64_t got, const char *message, int line) { - printf("asserting %s at %d\n",message,line); - if (expected!=got) { - printf("%s: expected %llu got %llu at %d\n",message, (unsigned long long)expected, (unsigned long long)got,line); - exit(EXIT_FAILURE); - } -} - -void assert_eq_string_(const char * expected, const char *got, const char *message, int line) { - printf("asserting %s at %d\n",message,line); - if (strcmp(expected,got)!=0) { - printf("%s: expected %s got %s at %d\n",message, expected, got,line); - exit(EXIT_FAILURE); - } -} - -void assert_true_(int value, const char *message, int line) { - printf("asserting %s at %d\n",message,line); - if (value!=1) { - printf("%s failed at %d\n",message, line); - exit(EXIT_FAILURE); - } -} - -void exit_if_error_(const char *error_string, const AsapoErrorHandle err, int line) { - printf("asserting no error for %s at %d\n",error_string,line); - if (asapo_is_error(err)) { - char buf[1024]; - asapo_error_explain(err, buf, sizeof(buf)); - printf("%s %s\n", error_string, buf); - exit(EXIT_FAILURE); - } -} - - void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { AsapoErrorHandle err = asapo_new_handle(); @@ -51,8 +12,8 @@ void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { AsapoDataSetHandle dataset = asapo_consumer_get_next_dataset(consumer,group_id, 0, "default", &err); EXIT_IF_ERROR("asapo_consumer_get_next_dataset", err); ASSERT_EQ_INT(3,asapo_dataset_get_size(dataset),"asapo_dataset_get_size"); - AsapoMessageDataHandle md0 = asapo_dataset_get_item(dataset,0); - AsapoMessageDataHandle md2 = asapo_dataset_get_item(dataset,2); + AsapoMessageMetaHandle md0 = asapo_dataset_get_item(dataset,0); + AsapoMessageMetaHandle md2 = asapo_dataset_get_item(dataset,2); ASSERT_EQ_STRING("1_1",asapo_message_meta_get_name(md0),"dataset 0 filename"); ASSERT_EQ_STRING("1_3",asapo_message_meta_get_name(md2),"dataset 2 filename"); ASSERT_EQ_STRING("{\"test\":10}",asapo_message_meta_get_metadata(md0),"dataset 0 meta"); @@ -63,7 +24,7 @@ void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { // get last dataset = asapo_consumer_get_last_dataset(consumer, 0, "default", &err); EXIT_IF_ERROR("asapo_consumer_get_last_dataset", err); - AsapoMessageDataHandle md = asapo_dataset_get_item(dataset,0); + AsapoMessageMetaHandle md = asapo_dataset_get_item(dataset,0); ASSERT_EQ_STRING("10_1",asapo_message_meta_get_name(md),"dataset 10 filename"); asapo_free_handle(&md); asapo_free_handle(&dataset); @@ -263,8 +224,7 @@ int main(int argc, char* argv[]) { EXIT_IF_ERROR("create consumer", err); AsapoStringHandle group_id2 = asapo_string_from_c_str("hello"); - printf("%s\n",asapo_string_c_str(group_id2)); -// ASSERT_EQ_STRING("hello",asapo_string_c_str(group_id2),"asapo str <-> string"); + ASSERT_EQ_STRING("hello",asapo_string_c_str(group_id2),"asapo str <-> string"); asapo_consumer_set_timeout(consumer, 1000ull); diff --git a/tests/automatic/producer/CMakeLists.txt b/tests/automatic/producer/CMakeLists.txt index 38bd45932b4a517d8ef088d9662a4452d3dc1cd1..a2e60a0e8d3f3fb7892ad5ad17972fa8ccffa170 100644 --- a/tests/automatic/producer/CMakeLists.txt +++ b/tests/automatic/producer/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(file_monitor_producer) add_subdirectory(beamtime_metadata) add_subdirectory(cpp_api) +add_subdirectory(c_api) if (BUILD_PYTHON) add_subdirectory(python_api) diff --git a/tests/automatic/producer/c_api/CMakeLists.txt b/tests/automatic/producer/c_api/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..d112715d80b40c21257927e01d3088cbd051c5f0 --- /dev/null +++ b/tests/automatic/producer/c_api/CMakeLists.txt @@ -0,0 +1,15 @@ +set(TARGET_NAME producer_api_c) +set(SOURCE_FILES producer_api.c) + + +################################ +# Executable and link +################################ +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} test_common asapo-producer) + +################################ +# Testing +################################ +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>") + diff --git a/tests/automatic/producer/c_api/check_linux.sh b/tests/automatic/producer/c_api/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..3830f771cf3abe31bf1fc077c0f0c61cf60a4b54 --- /dev/null +++ b/tests/automatic/producer/c_api/check_linux.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +beamtime_id=asapo_test +data_source=c +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +facility=test_facility +year=2019 +receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id} + + +Cleanup() { + echo cleanup + rm -rf ${receiver_root_folder} + echo "db.dropDatabase()" | mongo ${beamtime_id}_${data_source} +} + +mkdir -p ${receiver_folder} + +echo test > file1 + +$@ 127.0.0.1:8400 $data_source $beamtime_id diff --git a/tests/automatic/producer/c_api/check_windows.bat b/tests/automatic/producer/c_api/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..0fdbabcbed5fdd6d82a1e429fbb7174ec9cc2228 --- /dev/null +++ b/tests/automatic/producer/c_api/check_windows.bat @@ -0,0 +1,28 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET data_source=c +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\test_facility\gpfs\%beamline%\2019\data\%beamtime_id%" +SET dbname=%beamtime_id%_%data_source% + +mkdir %receiver_folder% + +echo test > file1 + +ping 192.0.2.1 -n 1 -w 1000 > nul + +"%1" "127.0.0.1:8400" %data_source% %beamtime_id% > out +type out + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +rmdir /S /Q %receiver_root_folder% +echo db.dropDatabase() | %mongo_exe% %dbname% + + diff --git a/tests/automatic/producer/c_api/producer_api.c b/tests/automatic/producer/c_api/producer_api.c new file mode 100644 index 0000000000000000000000000000000000000000..2aee99a912477ba2dca2fab789d39bb2e456c936 --- /dev/null +++ b/tests/automatic/producer/c_api/producer_api.c @@ -0,0 +1,124 @@ +#include "asapo/producer_c.h" +#include "testing_c.h" + +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + +void callback(void* original_data, AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) { + EXIT_IF_ERROR("error after callback", error); + AsapoStringHandle response = asapo_request_callback_payload_get_response(payload); + const struct AsapoGenericRequestHeader* header = asapo_request_callback_payload_get_original_header(payload); + + ASSERT_EQ_INT(1,header->data_id,"data id"); + ASSERT_EQ_STRING("hello",(const char*)original_data,"data in payload"); + + free(original_data); + + asapo_free_handle(&response); +} + +void test_send(AsapoProducerHandle producer) { + AsapoErrorHandle err = asapo_new_handle(); + + AsapoMessageHeaderHandle message_header = asapo_create_message_header(1,6, + "processed/test","",0,0,0); + char* data = ( char* )malloc( 6 *sizeof( char ) ); + strcpy(data,"hello"); + + asapo_producer_send(producer, + message_header, + data, + kDefaultIngestMode, + "default", + callback, + &err); + EXIT_IF_ERROR("error sending data", err); + + asapo_producer_send_stream_finished_flag(producer,"default",1,"",NULL,&err); + EXIT_IF_ERROR("asapo_producer_send_stream_finished_flag", err); + + asapo_producer_wait_requests_finished(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_wait_requests_finished", err); + + asapo_free_handle(&err); + asapo_free_handle(&message_header); +} + +void test_meta(AsapoProducerHandle producer) { + AsapoErrorHandle err = asapo_new_handle(); + char meta[] = "{\"data\":\"test\",\"embedded\":{\"edata\":2}}"; + asapo_producer_send_beamtime_metadata(producer,meta,kInsert,1,NULL,&err); + asapo_producer_wait_requests_finished(producer,5000,NULL); + AsapoStringHandle meta_received = asapo_producer_get_beamtime_meta(producer,5000, &err); + EXIT_IF_ERROR("asapo_producer_get_beamtime_meta", err); + ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(meta_received),"returned same meta as was ingested"); + + asapo_producer_send_stream_metadata(producer,meta,kInsert,1,"default", NULL,&err); + asapo_producer_wait_requests_finished(producer,5000,NULL); + AsapoStringHandle stream_meta_received = asapo_producer_get_stream_meta(producer,"default",5000, &err); + EXIT_IF_ERROR("asapo_producer_send_stream_metadata", err); + ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(stream_meta_received),"stream meta returned same meta as was ingested"); + asapo_free_handle(&err); + asapo_free_handle(&meta_received); + asapo_free_handle(&stream_meta_received); +} + +void test_streams(AsapoProducerHandle producer) { + AsapoErrorHandle err = asapo_new_handle(); + AsapoStreamInfoHandle sinfo = asapo_producer_get_stream_info(producer,"default",2000,&err); + EXIT_IF_ERROR("asapo_producer_get_stream_info", err); + ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo),"stream name"); + + AsapoStreamInfoHandle sinfo_last = asapo_producer_get_last_stream(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_get_last_stream", err); + ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo_last),"stream name"); + ASSERT_EQ_INT(1,(uint64_t)asapo_stream_info_get_ffinished(sinfo_last),"stream finished"); + ASSERT_EQ_INT(2,asapo_stream_info_get_last_id(sinfo_last),"last id 0"); + + asapo_free_handle(&sinfo_last); + + asapo_producer_delete_stream(producer,"default",5000,1,1,&err); + EXIT_IF_ERROR("asapo_producer_delete_stream", err); + + sinfo_last = asapo_producer_get_last_stream(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_get_last_stream after deletion", err); + ASSERT_EQ_INT(0,asapo_stream_info_get_last_id(sinfo_last),"last id 0"); + + + asapo_free_handle(&err); + asapo_free_handle(&sinfo); + asapo_free_handle(&sinfo_last); +} + + + +int main(int argc, char* argv[]) { + if (argc <4) { + abort(); + } + const char *endpoint = argv[1]; + const char *source = argv[2]; + const char *beamtime = argv[3]; + + + AsapoErrorHandle err = asapo_new_handle(); + AsapoSourceCredentialsHandle cred = asapo_create_source_credentials(kProcessed, + beamtime, + "", source, ""); + + AsapoProducerHandle producer = asapo_create_producer(endpoint,3,kTcp, cred,60000,&err); + EXIT_IF_ERROR("create producer", err); + + asapo_producer_enable_local_log(producer, 1); + asapo_producer_set_log_level(producer, Debug); + + test_send(producer); + test_meta(producer); + test_streams(producer); + + asapo_free_handle(&err); + asapo_free_handle(&cred); + asapo_free_handle(&producer); + return 0; +}