Skip to content
Snippets Groups Projects
Commit de7dff8f authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

fix memleak, test

parent a61ca2ad
No related branches found
No related tags found
No related merge requests found
......@@ -14,7 +14,7 @@ typedef struct {
}* AsapoMessageHeaderHandle;
#endif
typedef void(*AsapoRequestCallback)(AsapoRequestCallbackPayloadHandle, AsapoErrorHandle);
typedef void(*AsapoRequestCallback)(void*, AsapoRequestCallbackPayloadHandle, AsapoErrorHandle);
#define kMaxMessageSize 1024
#define kMaxVersionSize 10
#define kNCustomParams 3
......
......@@ -119,13 +119,14 @@ extern "C" {
dataset_size,
auto_id != 0));
}
#define BUILD_WRAPPER asapo::RequestCallback wrapper = [ = ](asapo::RequestCallbackPayload payload, asapo::Error err) -> void { \
auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload); \
void* data = (void*) payload.data.release(); \
auto payLoadHandle = new AsapoHandlerHolder<asapo::RequestCallbackPayload>(&payload, false); \
auto errorHandle = new AsapoHandlerHolder<asapo::ErrorInterface>(err.release()); \
callback(payLoadHandle, errorHandle); \
if (callback != NULL) { \
callback(data,payLoadHandle, errorHandle); \
} \
delete errorHandle; \
delete payLoadHandle; \
}
......
......@@ -17,6 +17,8 @@ void assert_eq_int_(uint64_t expected, uint64_t got, const char* message, int li
printf("%s: expected %llu got %llu at %d\n", message, (unsigned long long)expected, (unsigned long long)got, line);
exit(EXIT_FAILURE);
}
printf("asserting %s at %d - OK\n", message, line);
}
void assert_eq_string_(const char* expected, const char* got, const char* message, int line) {
......@@ -25,6 +27,8 @@ void assert_eq_string_(const char* expected, const char* got, const char* messag
printf("%s: expected %s got %s at %d\n", message, expected, got, line);
exit(EXIT_FAILURE);
}
printf("asserting %s at %d - OK \n", message, line);
}
void assert_true_(int value, const char* message, int line) {
......@@ -33,6 +37,7 @@ void assert_true_(int value, const char* message, int line) {
printf("%s failed at %d\n", message, line);
exit(EXIT_FAILURE);
}
printf("asserting %s at %d - OK \n", message, line);
}
void exit_if_error_(const char* error_string, const AsapoErrorHandle err, int line) {
......@@ -43,6 +48,7 @@ void exit_if_error_(const char* error_string, const AsapoErrorHandle err, int li
printf("%s %s\n", error_string, buf);
exit(EXIT_FAILURE);
}
printf("asserting no error for %s at %d - OK \n", error_string, line);
}
......
......@@ -23,4 +23,5 @@ mkdir -p ${receiver_folder}
echo test > file1
$@ "127.0.0.1:8400" $data_source $beamtime_id
#$@ 127.0.0.1:8400 $data_source $beamtime_id
/Users/yakubov/Projects/asapo/cmake-build-debug/tests/automatic/producer/c_api/producer_api_c 127.0.0.1:8400 c asapo_test
......@@ -5,73 +5,14 @@
#include <stdio.h>
#include <stdlib.h>
/*
void TestMeta(const std::unique_ptr<asapo::Producer>& producer) {
asapo::Error err;
std::string meta = R"({"data":"test","embedded":{"edata":2}})";
producer->SendBeamtimeMetadata(meta, asapo::MetaIngestMode{asapo::MetaIngestOp::kInsert, true}, nullptr);
producer->WaitRequestsFinished(5000);
auto meta_received = producer->GetBeamtimeMeta(5000, &err);
M_AssertTrue(meta_received == meta);
std::string meta_update = R"({"embedded":{"edata":3}})";
std::string meta_updated = R"({"data":"test","embedded":{"edata":3}})";
producer->SendBeamtimeMetadata(meta_update, asapo::MetaIngestMode{asapo::MetaIngestOp::kUpdate, false}, nullptr);
producer->WaitRequestsFinished(5000);
meta_received = producer->GetBeamtimeMeta(5000, &err);
M_AssertTrue(meta_received == meta_updated);
}
void Test(const std::unique_ptr<asapo::Producer>& producer) {
asapo::MessageMeta fi;
asapo::Error err;
std::string client, server;
bool supported;
err = producer->GetVersionInfo(&client, &server, &supported);
M_AssertTrue(err == nullptr, "Version OK");
M_AssertTrue(supported, "client supported by server");
M_AssertTrue(!client.empty(), "client version");
M_AssertTrue(!server.empty(), "server version");
TestMeta(producer);
producer->GetStreamInfo("default", 5000, &err);
if (err) {
printf("%s\n", err->Explain().c_str());
}
M_AssertTrue(err == nullptr, "stream info");
}
std::unique_ptr<asapo::Producer> CreateProducer(const Args& args) {
asapo::Error err;
auto producer = asapo::Producer::Create(args.server, 2,
asapo::RequestHandlerType::kTcp,
asapo::SourceCredentials{asapo::SourceType::kProcessed, args.beamtime,
"", args.source, ""}, 60000, &err);
if (err) {
std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
exit(EXIT_FAILURE);
}
producer->EnableLocalLog(true);
producer->SetLogLevel(asapo::LogLevel::Debug);
return producer;
}
*/
void callback(AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) {
void callback(void* original_data, AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error) {
EXIT_IF_ERROR("error after callback", error);
AsapoMessageDataHandle data_handle = asapo_request_callback_payload_get_data(payload);
AsapoStringHandle response = asapo_request_callback_payload_get_response(payload);
const struct AsapoGenericRequestHeader* header = asapo_request_callback_payload_get_original_header(payload);
printf("%d\n",(int)header->data_id);
ASSERT_EQ_INT(1,header->data_id,"data id");
ASSERT_EQ_STRING("hello",(const char*)original_data,"data in payload");
asapo_free_handle(&data_handle);
asapo_free_handle(&response);
......@@ -79,17 +20,27 @@ void callback(AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle error)
void test_send(AsapoProducerHandle producer) {
AsapoErrorHandle err = asapo_new_handle();
AsapoMessageHeaderHandle message_header = NULL;
char data[] = "hello";
AsapoMessageHeaderHandle message_header = asapo_create_message_header(1,6,
"processed/test","",0,0,0);
char* data = ( char* )malloc( 6 *sizeof( char ) );
strcpy(data,"hello");
asapo_producer_send(producer,
message_header,
data,
kDefaultIngestMode,
"default",
callback,
callback,
&err);
EXIT_IF_ERROR("error sending data", err);
asapo_producer_wait_requests_finished(producer,2000,&err);
EXIT_IF_ERROR("asapo_producer_wait_requests_finished", err);
asapo_free_handle(&err);
asapo_free_handle(&message_header);
}
int main(int argc, char* argv[]) {
......@@ -106,7 +57,7 @@ int main(int argc, char* argv[]) {
beamtime,
"", source, "");
AsapoProducerHandle producer = asapo_create_producer(endpoint,2,kTcp, cred,60000,&err);
AsapoProducerHandle producer = asapo_create_producer(endpoint,3,kTcp, cred,60000,&err);
EXIT_IF_ERROR("create producer", err);
asapo_producer_enable_local_log(producer, 1);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment