diff --git a/producer/api/c/include/asapo/producer_c.h b/producer/api/c/include/asapo/producer_c.h index 190f013ec29799a152e5fafb34d6402f24e37a56..02b46d418fa9f1cc65f466bedd5614f9af0d60e5 100644 --- a/producer/api/c/include/asapo/producer_c.h +++ b/producer/api/c/include/asapo/producer_c.h @@ -14,7 +14,7 @@ typedef struct { }* AsapoMessageHeaderHandle; #endif -typedef void(*AsapoRequestCallback)(AsapoRequestCallbackPayloadHandle, AsapoErrorHandle); +typedef void(*AsapoRequestCallback)(void*, AsapoRequestCallbackPayloadHandle, AsapoErrorHandle); #define kMaxMessageSize 1024 #define kMaxVersionSize 10 #define kNCustomParams 3 diff --git a/producer/api/cpp/src/producer_c_glue.cpp b/producer/api/cpp/src/producer_c_glue.cpp index 2a4455626b8e1c143f6aff2ca583bc65e9ca71e4..ea7ed21581051b15a4153800f2514f551f31244c 100644 --- a/producer/api/cpp/src/producer_c_glue.cpp +++ b/producer/api/cpp/src/producer_c_glue.cpp @@ -119,13 +119,14 @@ extern "C" { dataset_size, auto_id != 0)); } - - - + #define BUILD_WRAPPER asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { \ - auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); \ + void* data = (void*) payload.data.release(); \ + auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload, false); \ auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); \ - callback(payLoadHandle, errorHandle); \ + if (callback != NULL) { \ + callback(data,payLoadHandle, errorHandle); \ + } \ delete errorHandle; \ delete payLoadHandle; \ } diff --git a/tests/automatic/common/cpp/include/testing_c.h b/tests/automatic/common/cpp/include/testing_c.h index 0075cd45cfa79b78287a89773c49f615cdb1175e..a0ff101d2898f50da6b56cbf3eae0649c0b02cb3 100644 --- a/tests/automatic/common/cpp/include/testing_c.h +++ b/tests/automatic/common/cpp/include/testing_c.h @@ -17,6 +17,8 @@ void assert_eq_int_(uint64_t expected, uint64_t got, const char* message, int li printf("%s: expected %llu got %llu at %d\n", message, (unsigned long long)expected, (unsigned long long)got, line); exit(EXIT_FAILURE); } + printf("asserting %s at %d - OK\n", message, line); + } void assert_eq_string_(const char* expected, const char* got, const char* message, int line) { @@ -25,6 +27,8 @@ void assert_eq_string_(const char* expected, const char* got, const char* messag printf("%s: expected %s got %s at %d\n", message, expected, got, line); exit(EXIT_FAILURE); } + printf("asserting %s at %d - OK \n", message, line); + } void assert_true_(int value, const char* message, int line) { @@ -33,6 +37,7 @@ void assert_true_(int value, const char* message, int line) { printf("%s failed at %d\n", message, line); exit(EXIT_FAILURE); } + printf("asserting %s at %d - OK \n", message, line); } void exit_if_error_(const char* error_string, const AsapoErrorHandle err, int line) { @@ -43,6 +48,7 @@ void exit_if_error_(const char* error_string, const AsapoErrorHandle err, int li printf("%s %s\n", error_string, buf); exit(EXIT_FAILURE); } + printf("asserting no error for %s at %d - OK \n", error_string, line); } diff --git a/tests/automatic/producer/c_api/check_linux.sh b/tests/automatic/producer/c_api/check_linux.sh index 895a669225ee55fe02773d800a6d76fe6821d21f..4fcd7f3c729fa68fb847b9bf572d79f660ccb605 100644 --- a/tests/automatic/producer/c_api/check_linux.sh +++ b/tests/automatic/producer/c_api/check_linux.sh @@ -23,4 +23,5 @@ mkdir -p ${receiver_folder} echo test > file1 -$@ "127.0.0.1:8400" $data_source $beamtime_id +#$@ 127.0.0.1:8400 $data_source $beamtime_id +/Users/yakubov/Projects/asapo/cmake-build-debug/tests/automatic/producer/c_api/producer_api_c 127.0.0.1:8400 c asapo_test diff --git a/tests/automatic/producer/c_api/producer_api.c b/tests/automatic/producer/c_api/producer_api.c index 80b0130db644df8a6d8a78f36a50da46a70aff79..fe88def56070c58403ce6f4ea6007293dea51b79 100644 --- a/tests/automatic/producer/c_api/producer_api.c +++ b/tests/automatic/producer/c_api/producer_api.c @@ -5,73 +5,14 @@ #include <stdio.h> #include <stdlib.h> - -/* -void TestMeta(const std::unique_ptr<asapo::Producer>& producer) { - asapo::Error err; - std::string meta = R"({"data":"test","embedded":{"edata":2}})"; - producer->SendBeamtimeMetadata(meta, asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, true}, nullptr); - producer->WaitRequestsFinished(5000); - auto meta_received = producer->GetBeamtimeMeta(5000, &err); - M_AssertTrue(meta_received == meta); - std::string meta_update = R"({"embedded":{"edata":3}})"; - std::string meta_updated = R"({"data":"test","embedded":{"edata":3}})"; - producer->SendBeamtimeMetadata(meta_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, false}, nullptr); - producer->WaitRequestsFinished(5000); - meta_received = producer->GetBeamtimeMeta(5000, &err); - M_AssertTrue(meta_received == meta_updated); -} - - -void Test(const std::unique_ptr<asapo::Producer>& producer) { - asapo::MessageMeta fi; - asapo::Error err; - - std::string client, server; - bool supported; - err = producer->GetVersionInfo(&client, &server, &supported); - M_AssertTrue(err == nullptr, "Version OK"); - M_AssertTrue(supported, "client supported by server"); - M_AssertTrue(!client.empty(), "client version"); - M_AssertTrue(!server.empty(), "server version"); - - - TestMeta(producer); - - producer->GetStreamInfo("default", 5000, &err); - if (err) { - printf("%s\n", err->Explain().c_str()); - } - M_AssertTrue(err == nullptr, "stream info"); - -} - - -std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) { - asapo::Error err; - auto producer = asapo::Producer::Create(args.server, 2, - asapo::RequestHandlerType::kTcp, - asapo::SourceCredentials{asapo::SourceType::kProcessed, args.beamtime, - "", args.source, ""}, 60000, &err); - if (err) { - std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; - exit(EXIT_FAILURE); - } - - producer->EnableLocalLog(true); - producer->SetLogLevel(asapo::LogLevel::Debug); - return producer; -} - - */ - -void callback(AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) { +void callback(void* original_data, AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) { EXIT_IF_ERROR("error after callback", error); AsapoMessageDataHandle data_handle = asapo_request_callback_payload_get_data(payload); AsapoStringHandle response = asapo_request_callback_payload_get_response(payload); const struct AsapoGenericRequestHeader* header = asapo_request_callback_payload_get_original_header(payload); - printf("%d\n",(int)header->data_id); + ASSERT_EQ_INT(1,header->data_id,"data id"); + ASSERT_EQ_STRING("hello",(const char*)original_data,"data in payload"); asapo_free_handle(&data_handle); asapo_free_handle(&response); @@ -79,17 +20,27 @@ void callback(AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) void test_send(AsapoProducerHandle producer) { AsapoErrorHandle err = asapo_new_handle(); - AsapoMessageHeaderHandle message_header = NULL; - char data[] = "hello"; + AsapoMessageHeaderHandle message_header = asapo_create_message_header(1,6, + "processed/test","",0,0,0); + char* data = ( char* )malloc( 6 *sizeof( char ) ); + strcpy(data,"hello"); + asapo_producer_send(producer, message_header, data, kDefaultIngestMode, "default", - callback, + callback, &err); + EXIT_IF_ERROR("error sending data", err); + + asapo_producer_wait_requests_finished(producer,2000,&err); + EXIT_IF_ERROR("asapo_producer_wait_requests_finished", err); + + asapo_free_handle(&err); + asapo_free_handle(&message_header); } int main(int argc, char* argv[]) { @@ -106,7 +57,7 @@ int main(int argc, char* argv[]) { beamtime, "", source, ""); - AsapoProducerHandle producer = asapo_create_producer(endpoint,2,kTcp, cred,60000,&err); + AsapoProducerHandle producer = asapo_create_producer(endpoint,3,kTcp, cred,60000,&err); EXIT_IF_ERROR("create producer", err); asapo_producer_enable_local_log(producer, 1);