From f3abcac68306ea62cd24c74f37ad822ee9b7299b Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 23 Jun 2021 13:02:25 +0200 Subject: [PATCH] add C API tests for datasets --- consumer/api/c/include/asapo/consumer_c.h | 14 +++ .../include/asapo/consumer/consumer_error.h | 1 - consumer/api/cpp/src/consumer_c_glue.cpp | 86 +++++++++++++++++- .../consumer/consumer_api/consumer_api.c | 90 ++++++++++++++++++- 4 files changed, 186 insertions(+), 5 deletions(-) diff --git a/consumer/api/c/include/asapo/consumer_c.h b/consumer/api/c/include/asapo/consumer_c.h index cb244ae91..06ef753a4 100644 --- a/consumer/api/c/include/asapo/consumer_c.h +++ b/consumer/api/c/include/asapo/consumer_c.h @@ -15,6 +15,8 @@ typedef void* AsapoStreamInfoHandle; typedef void* AsapoStreamInfosHandle; typedef void* AsapoIdListHandle; typedef void* AsapoDataSetHandle; +typedef void* AsapoPartialErrorDataHandle; +typedef void* AsapoConsumerErrorDataHandle; #include <time.h> #include <stdint.h> @@ -201,6 +203,18 @@ size_t asapo_message_metas_get_size(const AsapoMessageMetasHandle metas); AsapoMessageMetaHandle asapo_message_metas_get_item(const AsapoMessageMetasHandle metas, size_t index); + +AsapoPartialErrorDataHandle asapo_error_get_payload_from_partial_error(const AsapoErrorHandle error); +uint64_t asapo_partial_error_get_id(const AsapoPartialErrorDataHandle error_payload); +uint64_t asapo_partial_error_get_expected_size(const AsapoPartialErrorDataHandle error_payload); + + +AsapoConsumerErrorDataHandle asapo_error_get_payload_from_consumer_error(const AsapoErrorHandle error); +uint64_t asapo_consumer_error_get_id(const AsapoConsumerErrorDataHandle error_payload); +uint64_t asapo_consumer_error_get_id_max(const AsapoConsumerErrorDataHandle error_payload); +const char* asapo_consumer_error_get_next_stream(const AsapoConsumerErrorDataHandle error_payload); + + void asapo_free_handle(void** handle); void* asapo_new_handle(); diff --git a/consumer/api/cpp/include/asapo/consumer/consumer_error.h b/consumer/api/cpp/include/asapo/consumer/consumer_error.h index ddfeab2f5..55618fddc 100644 --- a/consumer/api/cpp/include/asapo/consumer/consumer_error.h +++ b/consumer/api/cpp/include/asapo/consumer/consumer_error.h @@ -34,7 +34,6 @@ class ConsumerErrorData : public CustomErrorData { std::string next_stream; }; - namespace ConsumerErrorTemplates { diff --git a/consumer/api/cpp/src/consumer_c_glue.cpp b/consumer/api/cpp/src/consumer_c_glue.cpp index 9ecc0fc55..9aede5da0 100644 --- a/consumer/api/cpp/src/consumer_c_glue.cpp +++ b/consumer/api/cpp/src/consumer_c_glue.cpp @@ -87,6 +87,19 @@ typedef AsapoHandlerHolder<asapo::IdList>* AsapoIdListHandle; //! handle for data sets /// touch only with proper functions and use asapo_free_handle() to delete typedef AsapoHandlerHolder<asapo::DataSet>* AsapoDataSetHandle; + +//! handle for partial error payload +/// create with asapo_new_handle() +/// free after use with asapo_free_handle() +/// A set of getters asapo_partial_error_get_xx() are defined +typedef AsapoHandlerHolder<asapo::PartialErrorData>* AsapoPartialErrorDataHandle; + +//! handle for consumer error payload +/// create with asapo_new_handle() +/// free after use with asapo_free_handle() +/// A set of getters asapo_consumer_error_get_xx() are defined +typedef AsapoHandlerHolder<asapo::ConsumerErrorData>* AsapoConsumerErrorDataHandle; + #include <algorithm> template<typename t> @@ -466,7 +479,7 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto retval = new asapo::DataSet(consumer->handle->GetNextDataset(*group_id->handle, min_size, stream, &err)); - if (process_error(error, std::move(err)) < 0) { + if (process_error(error, std::move(err)) < 0 && err != asapo::ConsumerErrorTemplates::kPartialData) { return nullptr; } return new AsapoHandlerHolder<asapo::DataSet> {retval}; @@ -482,7 +495,7 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto retval = new asapo::DataSet(consumer->handle->GetLastDataset(min_size, stream, &err)); - if (process_error(error, std::move(err)) < 0) { + if (process_error(error, std::move(err)) < 0 && err != asapo::ConsumerErrorTemplates::kPartialData) { return nullptr; } return new AsapoHandlerHolder<asapo::DataSet> {retval}; @@ -515,7 +528,7 @@ extern "C" { AsapoErrorHandle* error) { asapo::Error err; auto retval = new asapo::DataSet(consumer->handle->GetDatasetById(id, min_size, stream, &err)); - if (process_error(error, std::move(err)) < 0) { + if (process_error(error, std::move(err)) < 0 && err != asapo::ConsumerErrorTemplates::kPartialData) { return nullptr; } return new AsapoHandlerHolder<asapo::DataSet> {retval}; @@ -767,6 +780,73 @@ extern "C" { return new AsapoHandlerHolder<asapo::MessageMeta> {&(metas->handle->at(index)), false}; } + +//! get payload from partial error +/// \param[in] asapo error +/// \return handle to partial error data or NULL if error is wrong type + AsapoPartialErrorDataHandle asapo_error_get_payload_from_partial_error(const AsapoErrorHandle error) { + if (error == nullptr && error->handle == nullptr) { + return nullptr; + } + auto payload = dynamic_cast<asapo::PartialErrorData*>(error->handle->GetCustomData()); + if (payload == nullptr) { + return nullptr; + } + return new AsapoHandlerHolder<asapo::PartialErrorData> {payload, false}; + } + + +//! get id from the partial error object +/// \param[in] error_payload handle of the partial error data object +/// \sa asapo::PartialErrorData + uint64_t asapo_partial_error_get_id(const AsapoPartialErrorDataHandle error_payload) { + return error_payload->handle->id; + } + +//! get expected dataset size from the partial error object +/// \param[in] error_payload handle of the partial error data object +/// \sa asapo::PartialErrorData + uint64_t asapo_partial_error_get_expected_size(const AsapoPartialErrorDataHandle error_payload) { + return error_payload->handle->expected_size; + } + +//! get payload from consumer error +/// \param[in] asapo error +/// \return handle to partial error data or NULL if error is wrong type + AsapoConsumerErrorDataHandle asapo_error_get_payload_from_consumer_error(const AsapoErrorHandle error) { + if (error == nullptr && error->handle == nullptr) { + return nullptr; + } + auto payload = dynamic_cast<asapo::ConsumerErrorData*>(error->handle->GetCustomData()); + if (payload == nullptr) { + return nullptr; + } + return new AsapoHandlerHolder<asapo::ConsumerErrorData> {payload, false}; + } + +//! get id from the consumer error data object +/// \param[in] error_payload handle of the consumer error data object +/// \sa asapo::ConsumerErrorData + uint64_t asapo_consumer_error_get_id(const AsapoConsumerErrorDataHandle error_payload) { + return error_payload->handle->id; + } + +//! get id_max from the consumer error data object +/// \param[in] error_payload handle of the consumer error data object +/// \sa asapo::ConsumerErrorData + uint64_t asapo_consumer_error_get_id_max(const AsapoConsumerErrorDataHandle error_payload) { + return error_payload->handle->id_max; + } + +//! get next_stream from the consumer error data object +/// \param[in] error_payload handle of the consumer error data object +/// \sa asapo::ConsumerErrorData + const char* asapo_consumer_error_get_next_stream(const AsapoConsumerErrorDataHandle error_payload) { + return error_payload->handle->next_stream.c_str(); + } + + + //! free handle memory, set handle to NULL /// \param[in] pointer to an ASAPO handle void asapo_free_handle(void** handle) { diff --git a/tests/automatic/consumer/consumer_api/consumer_api.c b/tests/automatic/consumer/consumer_api/consumer_api.c index 3a7029123..381121b42 100644 --- a/tests/automatic/consumer/consumer_api/consumer_api.c +++ b/tests/automatic/consumer/consumer_api/consumer_api.c @@ -44,6 +44,90 @@ void exit_if_error_(const char *error_string, const AsapoErrorHandle err, int li } +void test_datasets(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { + AsapoErrorHandle err = asapo_new_handle(); + +// get next + AsapoDataSetHandle dataset = asapo_consumer_get_next_dataset(consumer,group_id, 0, "default", &err); + EXIT_IF_ERROR("asapo_consumer_get_next_dataset", err); + ASSERT_EQ_INT(3,asapo_dataset_get_size(dataset),"asapo_dataset_get_size"); + AsapoMessageDataHandle md0 = asapo_dataset_get_item(dataset,0); + AsapoMessageDataHandle md2 = asapo_dataset_get_item(dataset,2); + ASSERT_EQ_STRING("1_1",asapo_message_meta_get_name(md0),"dataset 0 filename"); + ASSERT_EQ_STRING("1_3",asapo_message_meta_get_name(md2),"dataset 2 filename"); + ASSERT_EQ_STRING("{\"test\":10}",asapo_message_meta_get_metadata(md0),"dataset 0 meta"); + asapo_free_handle(&md0); + asapo_free_handle(&md2); + asapo_free_handle(&dataset); + +// get last + dataset = asapo_consumer_get_last_dataset(consumer, 0, "default", &err); + EXIT_IF_ERROR("asapo_consumer_get_last_dataset", err); + AsapoMessageDataHandle md = asapo_dataset_get_item(dataset,0); + ASSERT_EQ_STRING("10_1",asapo_message_meta_get_name(md),"dataset 10 filename"); + asapo_free_handle(&md); + asapo_free_handle(&dataset); + +// get by id + dataset = asapo_consumer_get_dataset_by_id(consumer, 8,0, "default", &err); + EXIT_IF_ERROR("asapo_consumer_get_last_dataset", err); + md = asapo_dataset_get_item(dataset,2); + ASSERT_EQ_STRING("8_3",asapo_message_meta_get_name(md),"dataset 8 filename"); + asapo_free_handle(&md); + asapo_free_handle(&dataset); + +// size + int64_t size = asapo_consumer_get_current_dataset_count(consumer,"default", 0, &err); + EXIT_IF_ERROR("asapo_consumer_get_current_dataset_count", err); + ASSERT_EQ_INT(10,size,"asapo_consumer_get_current_dataset_count"); + +// get next incomplete datasets without min_size + dataset = asapo_consumer_get_next_dataset(consumer,group_id, 0, "incomplete", &err); + ASSERT_TRUE(asapo_error_get_type(err) == kPartialData,"incomplete dataset patial data error"); + ASSERT_EQ_INT(2,asapo_dataset_get_size(dataset),"incomplete dataset size"); + AsapoPartialErrorDataHandle err_data = asapo_error_get_payload_from_partial_error(err); + ASSERT_EQ_INT(3, asapo_partial_error_get_expected_size(err_data), "incomplete dataset size"); + ASSERT_EQ_INT(1,asapo_partial_error_get_id(err_data),"incomplete dataset id "); + asapo_free_handle(&err_data); + asapo_free_handle(&dataset); + +// get last incomplete datasets without min_size + asapo_consumer_get_last_dataset(consumer, 0, "incomplete", &err); + ASSERT_TRUE(asapo_error_get_type(err) == kEndOfStream,"incomplete dataset end of stream error"); + +// get dataset by id incomplete datasets without min_size + dataset = asapo_consumer_get_dataset_by_id(consumer,2, 0,"incomplete", &err); + ASSERT_TRUE(asapo_error_get_type(err) == kPartialData,"incomplete dataset patial data error"); + md = asapo_dataset_get_item(dataset,0); + ASSERT_EQ_STRING("2_1",asapo_message_meta_get_name(md),"incomplete dataset 2 filename"); + asapo_free_handle(&dataset); + asapo_free_handle(&md); + +// get next incomplete datasets with min_size = 2 + dataset = asapo_consumer_get_next_dataset(consumer,group_id, 2, "incomplete", &err); + EXIT_IF_ERROR("asapo_consumer_get_next_dataset minsize error", err); + ASSERT_EQ_INT(2,asapo_dataset_get_id(dataset),"incomplete dataset size"); + asapo_free_handle(&dataset); + +// get last incomplete datasets with min_size = 2 + dataset = asapo_consumer_get_last_dataset(consumer,2, "incomplete", &err); + EXIT_IF_ERROR("asapo_consumer_get_next_dataset minsize error", err); + ASSERT_EQ_INT(5,asapo_dataset_get_id(dataset),"incomplete dataset size"); + asapo_free_handle(&dataset); + +// get size + size = asapo_consumer_get_current_dataset_count(consumer,"incomplete", 1, &err); + EXIT_IF_ERROR("asapo_consumer_get_current_dataset_count", err); + ASSERT_EQ_INT(5,size,"asapo_consumer_get_current_dataset_count include incomplete"); + + size = asapo_consumer_get_current_dataset_count(consumer,"incomplete", 0, &err); + EXIT_IF_ERROR("asapo_consumer_get_current_dataset_count", err); + ASSERT_EQ_INT(0,size,"asapo_consumer_get_current_dataset_count exclude incomplete"); + + + asapo_free_handle(&err); +} + void test_single(AsapoConsumerHandle consumer, AsapoStringHandle group_id) { AsapoErrorHandle err = asapo_new_handle(); AsapoMessageMetaHandle md = asapo_new_handle(); @@ -184,8 +268,12 @@ int main(int argc, char* argv[]) { test_single(consumer,group_id); } + asapo_free_handle(&group_id); + group_id = asapo_consumer_generate_new_group_id(consumer, &err); + EXIT_IF_ERROR("create group id", err); + if (strcmp(argv[4],"dataset") == 0) { -// exit(0); + test_datasets(consumer,group_id); } asapo_free_handle(&err); -- GitLab