Skip to content
Snippets Groups Projects
Commit a186c128 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

update tests, add missing funcitons

parent ef47ea89
No related branches found
No related tags found
No related merge requests found
......@@ -86,23 +86,45 @@ extern "C" {
AsapoStreamInfoHandle asapo_producer_get_stream_info(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
asapo::Error err;
auto result = new asapo::StreamInfo(producer->handle->GetStreamInfo(stream, timeout_ms, &err));
return handle_or_null_t(result, error, std::move(err));
}
AsapoStringHandle asapo_producer_get_stream_meta(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
asapo::Error err;
auto result = producer->handle->GetStreamMeta(stream, timeout_ms, &err);
return handle_or_null_t(result, error, std::move(err));
}
AsapoStringHandle asapo_producer_get_beamtime_meta(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
asapo::Error err;
auto result = producer->handle->GetBeamtimeMeta(timeout_ms, &err);
return handle_or_null_t(result, error, std::move(err));
}
int asapo_producer_delete_stream(AsapoProducerHandle producer,
const char* stream,
uint64_t timeout_ms,
AsapoBool delete_meta,
AsapoBool error_on_not_exist,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
auto err = producer->handle->DeleteStream(stream, timeout_ms,
asapo::DeleteStreamOptions{static_cast<bool>(delete_meta),
static_cast<bool>(error_on_not_exist)});
return process_error(error, std::move(err));
}
AsapoStreamInfoHandle asapo_producer_get_last_stream(AsapoProducerHandle producer,
uint64_t timeout_ms,
AsapoErrorHandle* error);
AsapoErrorHandle* error) {
asapo::Error err;
auto result = new asapo::StreamInfo(producer->handle->GetLastStream(timeout_ms, &err));
return handle_or_null_t(result, error, std::move(err));
}
AsapoMessageHeaderHandle asapo_create_message_header(uint64_t message_id,
uint64_t data_size,
......
......@@ -31,10 +31,12 @@ void test_send(AsapoProducerHandle producer) {
data,
kDefaultIngestMode,
"default",
callback,
callback,
&err);
EXIT_IF_ERROR("error sending data", err);
asapo_producer_send_stream_finished_flag(producer,"default",1,"",NULL,&err);
EXIT_IF_ERROR("asapo_producer_send_stream_finished_flag", err);
asapo_producer_wait_requests_finished(producer,2000,&err);
EXIT_IF_ERROR("asapo_producer_wait_requests_finished", err);
......@@ -43,6 +45,54 @@ void test_send(AsapoProducerHandle producer) {
asapo_free_handle(&message_header);
}
void test_meta(AsapoProducerHandle producer) {
AsapoErrorHandle err = asapo_new_handle();
char meta[] = "{\"data\":\"test\",\"embedded\":{\"edata\":2}}";
asapo_producer_send_beamtime_metadata(producer,meta,kInsert,1,NULL,&err);
asapo_producer_wait_requests_finished(producer,5000,NULL);
AsapoStringHandle meta_received = asapo_producer_get_beamtime_meta(producer,5000, &err);
EXIT_IF_ERROR("asapo_producer_get_beamtime_meta", err);
ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(meta_received),"returned same meta as was ingested");
asapo_producer_send_stream_metadata(producer,meta,kInsert,1,"default", NULL,&err);
asapo_producer_wait_requests_finished(producer,5000,NULL);
AsapoStringHandle stream_meta_received = asapo_producer_get_stream_meta(producer,"default",5000, &err);
EXIT_IF_ERROR("asapo_producer_send_stream_metadata", err);
ASSERT_EQ_STRING(meta,(const char*)asapo_string_c_str(stream_meta_received),"stream meta returned same meta as was ingested");
asapo_free_handle(&err);
asapo_free_handle(&meta_received);
asapo_free_handle(&stream_meta_received);
}
void test_streams(AsapoProducerHandle producer) {
AsapoErrorHandle err = asapo_new_handle();
AsapoStreamInfoHandle sinfo = asapo_producer_get_stream_info(producer,"default",2000,&err);
EXIT_IF_ERROR("asapo_producer_get_stream_info", err);
ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo),"stream name");
AsapoStreamInfoHandle sinfo_last = asapo_producer_get_last_stream(producer,2000,&err);
EXIT_IF_ERROR("asapo_producer_get_last_stream", err);
ASSERT_EQ_STRING("default",asapo_stream_info_get_name(sinfo_last),"stream name");
ASSERT_EQ_INT(1,(uint64_t)asapo_stream_info_get_ffinished(sinfo_last),"stream finished");
ASSERT_EQ_INT(2,asapo_stream_info_get_last_id(sinfo_last),"last id 0");
asapo_free_handle(&sinfo_last);
asapo_producer_delete_stream(producer,"default",5000,1,1,&err);
EXIT_IF_ERROR("asapo_producer_delete_stream", err);
sinfo_last = asapo_producer_get_last_stream(producer,2000,&err);
EXIT_IF_ERROR("asapo_producer_get_last_stream after deletion", err);
ASSERT_EQ_INT(0,asapo_stream_info_get_last_id(sinfo_last),"last id 0");
asapo_free_handle(&err);
asapo_free_handle(&sinfo);
asapo_free_handle(&sinfo_last);
}
int main(int argc, char* argv[]) {
if (argc <4) {
abort();
......@@ -64,6 +114,8 @@ int main(int argc, char* argv[]) {
asapo_producer_set_log_level(producer, Debug);
test_send(producer);
test_meta(producer);
test_streams(producer);
asapo_free_handle(&err);
asapo_free_handle(&cred);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment