diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index abb22d35c8725d2535db08cb78835eea7ce91a6e..bb7d314c05c199c79ad4bb38fe5c9fe686fe849f 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -86,23 +86,45 @@ extern "C" { AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer, const char* stream, uint64_t timeout_ms, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + asapo::Error err; + auto result = new asapo::StreamInfo(producer->handle->GetStreamInfo(stream, timeout_ms, &err)); + return handle_or_null_t(result, error, std::move(err)); + } AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer, const char* stream, uint64_t timeout_ms, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + asapo::Error err; + auto result = producer->handle->GetStreamMeta(stream, timeout_ms, &err); + return handle_or_null_t(result, error, std::move(err)); + + } AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer, uint64_t timeout_ms, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + asapo::Error err; + auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err); + return handle_or_null_t(result, error, std::move(err)); + } int asapo_producer_delete_stream(AsapoProducerHandle producer, const char* stream, uint64_t timeout_ms, AsapoBool delete_meta, AsapoBool error_on_not_exist, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + auto err = producer->handle->DeleteStream(stream, timeout_ms, + asapo::DeleteStreamOptions{static_cast<bool>(delete_meta), + static_cast<bool>(error_on_not_exist)}); + return process_error(error, std::move(err)); + } AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer, uint64_t timeout_ms, - AsapoErrorHandle* error); + AsapoErrorHandle* error) { + asapo::Error err; + auto result = new asapo::StreamInfo(producer->handle->GetLastStream(timeout_ms, &err)); + return handle_or_null_t(result, error, std::move(err)); + } AsapoMessageHeaderHandle asapo_create_message_header(uint64_t message_id, uint64_t data_size, diff --git a/tests/automatic/producer/c_api/producer_api.c b/tests/automatic/producer/c_api/producer_api.c index f5822e68d228bf2f375b8d965ca04dca3b13c660..2aee99a912477ba2dca2fab789d39bb2e456c936 100644 --- a/tests/automatic/producer/c_api/producer_api.c +++ b/tests/automatic/producer/c_api/producer_api.c @@ -31,10 +31,12 @@ void test_send(AsapoProducerHandle producer) { data, kDefaultIngestMode, "default", - callback, + callback, &err); EXIT_IF_ERROR("error sending data", err); + asapo_producer_send_stream_finished_flag(producer,"default",1,"",NULL,&err); + EXIT_IF_ERROR("asapo_producer_send_stream_finished_flag", err); asapo_producer_wait_requests_finished(producer,2000,&err); EXIT_IF_ERROR("asapo_producer_wait_requests_finished", err); @@ -43,6 +45,54 @@ void test_send(AsapoProducerHandle producer) { asapo_free_handle(&message_header); } +void test_meta(AsapoProducerHandle producer) { + AsapoErrorHandle err = asapo_new_handle(); + char meta[] = "{\"data\":\"test\",\"embedded\":{\"edata\":2}}"; + asapo_producer_send_beamtime_metadata(producer,meta,kInsert,1,NULL,&err); + asapo_producer_wait_requests_finished(producer,5000,NULL); + AsapoStringHandle meta_received = asapo_producer_get_beamtime_meta(producer,5000, &err); + EXIT_IF_ERROR("asapo_producer_get_beamtime_meta", err); + ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(meta_received),"returned same meta as was ingested"); + + asapo_producer_send_stream_metadata(producer,meta,kInsert,1,"default", NULL,&err); + asapo_producer_wait_requests_finished(producer,5000,NULL); + AsapoStringHandle stream_meta_received = asapo_producer_get_stream_meta(producer,"default",5000, &err); + EXIT_IF_ERROR("asapo_producer_send_stream_metadata", err); + ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(stream_meta_received),"stream meta returned same meta as was ingested"); + asapo_free_handle(&err); + asapo_free_handle(&meta_received); + asapo_free_handle(&stream_meta_received); +} + +void test_streams(AsapoProducerHandle producer) { + AsapoErrorHandle err = asapo_new_handle(); + AsapoStreamInfoHandle sinfo = asapo_producer_get_stream_info(producer,"default",2000,&err); + EXIT_IF_ERROR("asapo_producer_get_stream_info", err); + ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo),"stream name"); + + AsapoStreamInfoHandle sinfo_last = asapo_producer_get_last_stream(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_get_last_stream", err); + ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo_last),"stream name"); + ASSERT_EQ_INT(1,(uint64_t)asapo_stream_info_get_ffinished(sinfo_last),"stream finished"); + ASSERT_EQ_INT(2,asapo_stream_info_get_last_id(sinfo_last),"last id 0"); + + asapo_free_handle(&sinfo_last); + + asapo_producer_delete_stream(producer,"default",5000,1,1,&err); + EXIT_IF_ERROR("asapo_producer_delete_stream", err); + + sinfo_last = asapo_producer_get_last_stream(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_get_last_stream after deletion", err); + ASSERT_EQ_INT(0,asapo_stream_info_get_last_id(sinfo_last),"last id 0"); + + + asapo_free_handle(&err); + asapo_free_handle(&sinfo); + asapo_free_handle(&sinfo_last); +} + + + int main(int argc, char* argv[]) { if (argc <4) { abort(); @@ -64,6 +114,8 @@ int main(int argc, char* argv[]) { asapo_producer_set_log_level(producer, Debug); test_send(producer); + test_meta(producer); + test_streams(producer); asapo_free_handle(&err); asapo_free_handle(&cred);