diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index 995e20780b4e643a49ef5ea25af4a6f3945b8848..60e0069996f7eb4f18b0b24c840d6a71af4b86bd 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -81,11 +81,11 @@ AsapoError asapo_consumer_negative_acknowledge(AsapoConsumer consumer, uint64_t delay_ms, const char* stream); AsapoIdList asapo_consumer_get_unacknowledged_messages(AsapoConsumer consumer, - AsapoString group_id, - uint64_t from_id, - uint64_t to_id, - const char* stream, - AsapoError* error); + AsapoString group_id, + uint64_t from_id, + uint64_t to_id, + const char* stream, + AsapoError* error); void asapo_delete_id_list(AsapoIdList* list); size_t asapo_id_list_get_size(const AsapoIdList list); uint64_t asapo_id_list_get_item(const AsapoIdList list, @@ -122,24 +122,23 @@ AsapoError asapo_consumer_retrive_data(AsapoConsumer consumer, AsapoMessageData* data); AsapoError asapo_consumer_get_last(AsapoConsumer consumer, - AsapoMessageMeta info, + AsapoMessageMeta* info, AsapoMessageData* data, const char* stream); AsapoError asapo_consumer_get_next(AsapoConsumer consumer, AsapoString group_id, - AsapoMessageMeta info, + AsapoMessageMeta* info, AsapoMessageData* data, const char* stream); void asapo_delete_message_data(AsapoMessageData* data); const char* asapo_message_data_get_as_chars(const AsapoMessageData data); AsapoSourceCredentials asapo_create_source_credentials(enum AsapoSourceType type, - const char* beamtime, - const char* beamline, - const char* data_source, - const char* token); + const char* beamtime, + const char* beamline, + const char* data_source, + const char* token); void asapo_delete_source_credentials(AsapoSourceCredentials* cred); -AsapoMessageMeta asapo_create_message_meta(); void asapo_delete_message_meta(AsapoMessageMeta* meta); const char* asapo_message_meta_get_name(const AsapoMessageMeta md); diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 01b4b369d695e496ef4c5cb439fefe9a951cf4b8..8afb212abd98ea0a642b1abe984f2de93f2c182d 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -95,7 +95,7 @@ extern "C" { "incompatible bit reps between c++ and c for asapo::NetworkConnectionType"); static void time_point_to_time_spec(std::chrono::system_clock::time_point tp, - struct timespec* stamp) { + struct timespec* stamp) { stamp->tv_sec = std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count(); stamp->tv_nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count() % 1000000000; } @@ -131,10 +131,10 @@ extern "C" { /// \copydoc asapo::ConsumerFactory::CreateConsumer /// return handle to the created cosumer AsapoConsumer asapo_create_consumer(const char* server_name, - const char* source_path, - AsapoBool has_filesysytem, - AsapoSourceCredentials source, - AsapoError* error) { + const char* source_path, + AsapoBool has_filesysytem, + AsapoSourceCredentials source, + AsapoError* error) { asapo::Error err; @@ -158,7 +158,7 @@ extern "C" { /// \copydoc asapo::Consumer::GenerateNewGroupId() /// \param[in] consumer the handle of the consumer concerned AsapoString asapo_consumer_generate_new_group_id(AsapoConsumer consumer, - AsapoError* error) { + AsapoError* error) { asapo::Error err; auto result = new std::string(consumer->GenerateNewGroupId(&err)); *error = err.release(); @@ -179,11 +179,13 @@ extern "C" { //! give a pointer to the content of the asapoString /// \param[in] str the handle of the asapoString in question /// \return const char pointer to the content - const char* asapo_string_c_str(const AsapoString str); + const char* asapo_string_c_str(const AsapoString str) { + return str->c_str(); + } //! give the size of an asapoString /// \param[in] str the handle of the asapoString in question /// \return the number of bytes in the string , not counting the final nul byte. - size_t asapoStringSize(const AsapoString str) { + size_t asapo_string_size(const AsapoString str) { return str->size(); } //! clean up string @@ -205,8 +207,8 @@ extern "C" { /// \copydoc asapo::Consumer::ResetLastReadMarker() /// \param[in] consumer the handle of the consumer concerned AsapoError asapo_consumer_reset_last_read_marker(AsapoConsumer consumer, - const AsapoString group_id, - const char* stream) { + const AsapoString group_id, + const char* stream) { auto err = consumer->ResetLastReadMarker(*group_id, stream); return err.release(); } @@ -215,9 +217,9 @@ extern "C" { /// \copydoc asapo::Consumer::SetLastReadMarker() /// \param[in] consumer the handle of the consumer concerned AsapoError asapo_consumer_set_last_read_marker(AsapoConsumer consumer, - const AsapoString group_id, - uint64_t value, - const char* stream) { + const AsapoString group_id, + uint64_t value, + const char* stream) { auto err = consumer->SetLastReadMarker(*group_id, value, stream); return err.release(); } @@ -225,9 +227,9 @@ extern "C" { /// \copydoc asapo::Consumer::Acknowledge() /// \param[in] consumer the handle of the consumer concerned AsapoError asapo_consumer_acknowledge(AsapoConsumer consumer, - const AsapoString group_id, - uint64_t id, - const char* stream) { + const AsapoString group_id, + uint64_t id, + const char* stream) { auto err = consumer->Acknowledge(*group_id, id, stream); return err.release(); } @@ -235,10 +237,10 @@ extern "C" { /// \copydoc asapo::Consumer::NegativeAcknowledge() /// \param[in] consumer the handle of the consumer concerned AsapoError asapo_consumer_negative_acknowledge(AsapoConsumer consumer, - const AsapoString group_id, - uint64_t id, - uint64_t delay_ms, - const char* stream) { + const AsapoString group_id, + uint64_t id, + uint64_t delay_ms, + const char* stream) { auto err = consumer->NegativeAcknowledge(*group_id, id, delay_ms, stream); return err.release(); } @@ -247,11 +249,11 @@ extern "C" { /// \copydoc asapo::Consumer::GetUnacknowledgedMessages() /// \param[in] consumer the handle of the consumer concerned AsapoIdList asapo_consumer_get_unacknowledged_messages(AsapoConsumer consumer, - AsapoString group_id, - uint64_t from_id, - uint64_t to_id, - const char* stream, - AsapoError* error) { + AsapoString group_id, + uint64_t from_id, + uint64_t to_id, + const char* stream, + AsapoError* error) { asapo::Error err; auto list = new asapo::IdList(consumer->GetUnacknowledgedMessages(*group_id, from_id, to_id, @@ -274,7 +276,7 @@ extern "C" { /// \param[in] list handle of an id list /// \param[in] index index of the item to return, start at 0 uint64_t asapo_id_list_get_item(const AsapoIdList list, - size_t index) { + size_t index) { return list->at(index); } //! wraps asapo::Consumer::ForceNoRdma() @@ -297,9 +299,9 @@ extern "C" { \sa asapoStreamInfosGetInfo() asapo_stream_infos_get_size() asapo_delete_stream_infos() */ AsapoStreamInfos asapo_consumer_get_stream_list(AsapoConsumer consumer, - const char* from, - enum AsapoStreamFilter filter, - AsapoError* error) { + const char* from, + enum AsapoStreamFilter filter, + AsapoError* error) { asapo::Error err; auto info = new asapo::StreamInfos(consumer->GetStreamList(from, static_cast<asapo::StreamFilter>(filter), @@ -313,7 +315,7 @@ extern "C" { /// \param[in] index index od info to get, starts at 0 /// \return handle to stream info, do not delete! const AsapoStreamInfo asapo_stream_infos_get_item(const AsapoStreamInfos infos, - size_t index) { + size_t index) { return &infos->at(index); } //! get size (number of elements) of a stream infos handle @@ -336,9 +338,9 @@ extern "C" { /// \param[in] delete_meta the delete_meta part of the asapo::DeleteStreamOptions /// \param[in] error_on_not_exist the error_on_not_exist part of the asapo::DeleteStreamOptions AsapoError asapo_consumer_delete_stream(AsapoConsumer consumer, - const char* stream, - AsapoBool delete_meta, - AsapoBool error_on_not_exist) { + const char* stream, + AsapoBool delete_meta, + AsapoBool error_on_not_exist) { asapo::DeleteStreamOptions opt(delete_meta, error_on_not_exist); auto err = consumer->DeleteStream(stream, opt); return err.release(); @@ -347,8 +349,8 @@ extern "C" { /// \copydoc asapo::Consumer::GetCurrentSize() /// \param[in] consumer the consumer that is acted upon uint64_t asapo_consumer_get_current_size(AsapoConsumer consumer, - const char* stream, - AsapoError* error) { + const char* stream, + AsapoError* error) { asapo::Error err; auto retval = consumer->GetCurrentSize(stream, &err); *error = err.release(); @@ -358,9 +360,9 @@ extern "C" { /// \copydoc asapo::Copydoc::GetCurrentDatasetCount() /// \param[in] consumer the consumer that is acted upon uint64_t asapo_consumer_get_current_dataset_count(AsapoConsumer consumer, - const char* stream, - AsapoBool include_incomplete, - AsapoError* error) { + const char* stream, + AsapoBool include_incomplete, + AsapoError* error) { asapo::Error err; auto retval = consumer->GetCurrentDatasetCount(stream, include_incomplete, &err); *error = err.release(); @@ -372,7 +374,7 @@ extern "C" { /// \param[in] consumer the consumer that is acted upon /// the returned string must be freed after use with asapo_delete_string() AsapoString asapo_consumer_get_beamtime_meta(AsapoConsumer consumer, - AsapoError* error) { + AsapoError* error) { asapo::Error err; auto retval = new std::string(consumer->GetBeamtimeMeta(&err)); *error = err.release(); @@ -385,8 +387,8 @@ extern "C" { /// \param[in] consumer the consumer that is acted upon /// if data are retrieved (data != NULL) they must be freed with asapo_delete_message_data() AsapoError asapo_consumer_retrive_data(AsapoConsumer consumer, - AsapoMessageMeta info, - AsapoMessageData* data); + AsapoMessageMeta info, + AsapoMessageData* data); @@ -395,15 +397,22 @@ extern "C" { /// \param[in] consumer the consumer that is acted upon /// if data are retrieved (data != NULL) they must be freed with asapo_delete_message_data() AsapoError asapo_consumer_get_last(AsapoConsumer consumer, - AsapoMessageMeta info, - AsapoMessageData* data, - const char* stream) { - delete *data; + AsapoMessageMeta* info, + AsapoMessageData* data, + const char* stream) { + if (data) delete *data; // do we need that? asapo::MessageData d; - auto err = consumer->GetLast(info, data ? &d : nullptr, stream); + asapo::MessageMeta* fi = nullptr; + if (info) { + fi = new asapo::MessageMeta; + } + auto err = consumer->GetLast(fi, data ? &d : nullptr, stream); if (data) { *data = d.release(); } + if (info) { + *info = fi; + } return err.release(); } @@ -412,16 +421,23 @@ extern "C" { /// \param[in] consumer the consumer that is acted upon /// if data are retrieved (data != NULL) they must be freed with asapo_delete_message_data() AsapoError asapo_consumer_get_next(AsapoConsumer consumer, - AsapoString group_id, - AsapoMessageMeta info, - AsapoMessageData* data, - const char* stream) { - delete *data; + AsapoString group_id, + AsapoMessageMeta* info, + AsapoMessageData* data, + const char* stream) { + if (data) delete *data; asapo::MessageData d; - auto err = consumer->GetNext(*group_id, info, data ? &d : nullptr, stream); + asapo::MessageMeta* fi = nullptr; + if (info) { + fi = new asapo::MessageMeta; + } + auto err = consumer->GetNext(*group_id, fi, data ? &d : nullptr, stream); if (data) { *data = d.release(); } + if (info) { + *info = fi; + } return err.release(); } @@ -443,10 +459,10 @@ extern "C" { //! wraps asapo::SourceCredentials::SourceCredentials() /// \copydoc asapo::SourceCredentials::SourceCredentials() AsapoSourceCredentials asapo_create_source_credentials(enum AsapoSourceType type, - const char* beamtime, - const char* beamline, - const char* data_source, - const char* token) { + const char* beamtime, + const char* beamline, + const char* data_source, + const char* token) { return new asapo::SourceCredentials(static_cast<asapo::SourceType>(type), beamtime, beamline, data_source, token); @@ -457,13 +473,7 @@ extern "C" { delete *cred; cred = nullptr; } - //! create asapoMessageMeta object - /// create a metadata object, the handle can be used as a parameter to consumer functions - /// \sa asapo_consumer_get_last() - /// \return handle to metadata object - AsapoMessageMeta asapo_create_message_meta() { - return new asapo::MessageMeta; - } + //! clean up asapoMessageMeta object /// frees the resources occupied by meta, sets *meta to NULL void asapo_delete_message_meta(AsapoMessageMeta* meta) { @@ -483,7 +493,7 @@ extern "C" { /// \param[out] stamp the timestamp as timespec /// \sa asapo::MessageMeta void asapo_message_meta_get_timestamp(const AsapoMessageMeta md, - struct timespec* stamp) { + struct timespec* stamp) { time_point_to_time_spec(md->timestamp, stamp); } @@ -563,7 +573,7 @@ extern "C" { /// \param[out] stamp creation timestamp as timespec /// \sa asapo::StreamInfo void asapo_stream_info_get_timestamp_created(const AsapoStreamInfo info, - struct timespec* stamp) { + struct timespec* stamp) { time_point_to_time_spec(info->timestamp_created, stamp); } //! get time of last entry from the stream info object @@ -571,7 +581,7 @@ extern "C" { /// \param[out] stamp last entry timestamp as timespec /// \sa asapo::StreamInfo void asapo_stream_info_get_timestamp_last_entry(const AsapoStreamInfo info, - struct timespec* stamp) { + struct timespec* stamp) { time_point_to_time_spec(info->timestamp_lastentry, stamp); } diff --git a/examples/consumer/CMakeLists.txt b/examples/consumer/CMakeLists.txt index 612edd821b7a60a010bb88c851951b6e13392b0f..0cf72e439d5fd58b000b3fd78c1a9fe0d98c760d 100644 --- a/examples/consumer/CMakeLists.txt +++ b/examples/consumer/CMakeLists.txt @@ -2,7 +2,6 @@ find_package(Threads) add_subdirectory(getnext) add_subdirectory(simple-consumer-c) -add_subdirectory(simple-consumer) if(BUILD_EXAMPLES AND BUILD_PYTHON) add_subdirectory(getnext_python) diff --git a/examples/consumer/simple-consumer-c/CMakeLists.txt b/examples/consumer/simple-consumer-c/CMakeLists.txt index 6529b6d76f84601fc42b79831b98efb026500247..9d10ea1f9a31f5c3a90c8cd88a1601a958c8b422 100644 --- a/examples/consumer/simple-consumer-c/CMakeLists.txt +++ b/examples/consumer/simple-consumer-c/CMakeLists.txt @@ -1,21 +1,8 @@ cmake_minimum_required(VERSION 2.8) -project(asapo-consume-c) - -set(CMAKE_CXX_STANDARD 11) - -IF(CMAKE_C_COMPILER_ID STREQUAL "GNU") - SET( CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++") -ENDIF() - -find_package (Threads) - set(TARGET_NAME "asapo-consume-c") set(SOURCE_FILES consume.c) -link_directories(asapo/lib) - add_executable(${TARGET_NAME} ${SOURCE_FILES}) -target_include_directories(${TARGET_NAME} PUBLIC asapo/include) target_link_libraries(${TARGET_NAME} asapo-consumer curl ${CMAKE_THREAD_LIBS_INIT}) diff --git a/examples/consumer/simple-consumer-c/consume.c b/examples/consumer/simple-consumer-c/consume.c index 13e8a9d3c135506041fc3cdbbc0001001dec0aa5..44ad3051cf0b7b94602a1a16811d16e005d47937 100644 --- a/examples/consumer/simple-consumer-c/consume.c +++ b/examples/consumer/simple-consumer-c/consume.c @@ -36,9 +36,8 @@ int main(int argc, char* argv[]) { AsapoString group_id = asapo_consumer_generate_new_group_id(consumer, &err); exit_if_error("Cannot create group id", err); - AsapoMessageMeta fi = asapo_create_message_meta(); + AsapoMessageMeta fi; AsapoMessageData data; - err = asapo_consumer_get_last(consumer, &fi, &data, group_id); exit_if_error("Cannot get next record", err); diff --git a/tests/automatic/consumer/consumer_api/CMakeLists.txt b/tests/automatic/consumer/consumer_api/CMakeLists.txt index d7f451aa2b93cbdb99a6d5d38fea3df90190df84..8276b6c3efa9e59440a081bbad8a2de407a318d5 100644 --- a/tests/automatic/consumer/consumer_api/CMakeLists.txt +++ b/tests/automatic/consumer/consumer_api/CMakeLists.txt @@ -1,16 +1,25 @@ set(TARGET_NAME consumer_api) +set(TARGET_NAME_C consumer_api_c) + set(SOURCE_FILES consumer_api.cpp) +set(SOURCE_FILES_C consumer_api.c) ################################ # Executable and link ################################ add_executable(${TARGET_NAME} ${SOURCE_FILES}) +add_executable(${TARGET_NAME_C} ${SOURCE_FILES_C}) + target_link_libraries(${TARGET_NAME} test_common asapo-consumer) +target_link_libraries(${TARGET_NAME_C} test_common asapo-consumer) + ################################ # Testing ################################ -add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>" - ) +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:${TARGET_NAME}>") +add_script_test("${TARGET_NAME_C}" "$<TARGET_FILE:${TARGET_NAME_C}>") + + diff --git a/tests/automatic/consumer/consumer_api/consumer_api.c b/tests/automatic/consumer/consumer_api/consumer_api.c new file mode 100644 index 0000000000000000000000000000000000000000..599a057fa2ac14142262bdbdb4421ca511e6989c --- /dev/null +++ b/tests/automatic/consumer/consumer_api/consumer_api.c @@ -0,0 +1,83 @@ +#include "asapo/consumer_c.h" + +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> + + +void assert_eq_int(int expected, int got, const char *message) { + if (expected!=got) { + printf("%s: expected %d got %d at %d\n",message, expected, got,__LINE__); + exit(EXIT_FAILURE); + } +} + +void assert_eq_string(const char * expected, const char *got, const char *message) { + if (strcmp(expected,got)!=0) { + printf("%s: expected %s got %s at %d\n",message, expected, got,__LINE__); + exit(EXIT_FAILURE); + } +} + +void assert_bool(int value, const char *message) { + if (value!=1) { + printf("%s failed at %d\n",message, __LINE__); + exit(EXIT_FAILURE); + } +} + + +void exit_if_error(const char *error_string, const AsapoError err) { + if (err) { + char buf[1024]; + asapo_error_explain(err, buf, sizeof(buf)); + printf("%s %s\n", error_string, buf); + exit(EXIT_FAILURE); + } +} + +int main(int argc, char* argv[]) { + AsapoError err = NULL; + + const char *endpoint = argv[1]; + const char *beamtime = argv[2]; + const char *token = argv[3]; + + if (strcmp(argv[4],"dataset") == 0) { // do not test datasets for now + exit(0); + } + + AsapoSourceCredentials cred = asapo_create_source_credentials(kProcessed, + beamtime, + "", "", token); + AsapoConsumer consumer = asapo_create_consumer(endpoint, + "", 1, + cred, + &err); + asapo_delete_source_credentials(&cred); + + exit_if_error("Cannot create consumer", err); + asapo_consumer_set_timeout(consumer, 1000ull); + + AsapoString group_id = asapo_consumer_generate_new_group_id(consumer, &err); + exit_if_error("Cannot create group id", err); + + printf("group id: %s\n",asapo_string_c_str(group_id)); + + AsapoMessageMeta fi; + AsapoMessageData data; + + err = asapo_consumer_get_last(consumer, &fi, NULL, "default"); + exit_if_error("Cannot get last record", err); + + assert_eq_int(10,asapo_message_meta_get_id(fi),"id"); + assert_eq_string("10",asapo_message_meta_get_name(fi),"id"); + + asapo_delete_message_meta(&fi); + asapo_delete_message_data(&data); + asapo_delete_consumer(&consumer); + asapo_delete_string(&group_id); + return EXIT_SUCCESS; +} +