From 56617e70661250814d092ebb2779baa9eb023c95 Mon Sep 17 00:00:00 2001 From: George Sedov <george.sedov@desy.de> Date: Wed, 29 Dec 2021 17:16:59 +0100 Subject: [PATCH] another fix to kafka integration test --- config/nomad/receiver_kafka.nmd.in | 9 +++- receiver/CMakeLists.txt | 1 + .../check_linux.sh | 8 ++-- .../kafka_mock.cpp | 48 ++++++++++++------- .../settings/receiver_kafka.json.tpl.lin.in | 2 +- 5 files changed, 45 insertions(+), 23 deletions(-) diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in index aa397aceb..71bfe537d 100644 --- a/config/nomad/receiver_kafka.nmd.in +++ b/config/nomad/receiver_kafka.nmd.in @@ -32,12 +32,17 @@ job "receiver" { name = "asapo-receiver" port = "recv" check { - name = "alive" - type = "tcp" + name = "metrics" + type = "http" + port = "recv_metrics" + path = "/metrics" interval = "10s" timeout = "2s" initial_status = "passing" } + meta { + metrics-port = "${NOMAD_PORT_recv_metrics}" + } } meta { diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index a6492e0c0..92e0b87ca 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -83,6 +83,7 @@ endif() configure_file(docker/Dockerfile . COPYONLY) configure_file(docker/install_libfabric.sh . COPYONLY) +configure_file(docker/install_rdkafka.sh . COPYONLY) ################################ diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh index 4782af1e8..51a93d0ba 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -30,13 +30,15 @@ Cleanup() { influx -database ${database_name} -execute "drop series from statistics, RequestsRate" } -./transfer-single-file_kafka & KAFKA_PID=$! +rm -f bootstrap + +./transfer-single-file_kafka processed/1 & KAFKA_PID=$! echo "Started the kafka listener" while [ ! -f bootstrap ]; do if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then - # listener exited preliminary, i.e. some error + echo Kafka listener exited unexpectedly exit 1 fi sleep 1 @@ -62,8 +64,6 @@ $1 localhost:8400 ${beamtime_id} 100 1 1 0 30 ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000 -$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>&1 | tee /dev/stderr | grep "authorization" - wait $KAFKA_PID RESULT=$? diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp index 6fe0909ab..b2cf96dce 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp @@ -1,12 +1,15 @@ #include <stdlib.h> +#include <string.h> #include "librdkafka/rdkafka.h" #include "librdkafka/rdkafka_mock.h" -int main(/*int argc, char** argv*/) { - rd_kafka_t *rkproducer, *rkconsumer; +int main(int argc, char *argv[]) { + + char *expectedmsg; + asprintf(&expectedmsg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", argc > 1 ? argv[argc - 1] : "processed/1"); + rd_kafka_conf_t *conf = rd_kafka_conf_new(); - rd_kafka_mock_cluster_t *mcluster; char errstr[256]; if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { @@ -18,13 +21,13 @@ int main(/*int argc, char** argv*/) { return EXIT_FAILURE; }*/ - rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + rd_kafka_t *rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if(!rkproducer) { fprintf(stderr, "Failed to create kafka producer: %s\n", errstr); return EXIT_FAILURE; } - mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); + rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); if(!mcluster) { fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr); return EXIT_FAILURE; @@ -32,8 +35,6 @@ int main(/*int argc, char** argv*/) { const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster); - fprintf(stdout, "Bootstrap for kafka cluster: %s\n", bootstrap); - FILE *fp = fopen("bootstrap", "w"); fprintf(fp, "%s", bootstrap); fclose(fp); @@ -52,25 +53,40 @@ int main(/*int argc, char** argv*/) { return EXIT_FAILURE; } - rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + rd_kafka_t *rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if(!rkconsumer) { fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr); return EXIT_FAILURE; } + + rd_kafka_poll_set_consumer(rkconsumer); + rd_kafka_topic_partition_list_t* subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, "asapo", RD_KAFKA_PARTITION_UA); + rd_kafka_resp_err_t err = rd_kafka_subscribe(rkconsumer, subscription); + if (err) { + fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err)); + return EXIT_FAILURE; + } + + rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000); if(!rkmessage) { fprintf(stderr, "No kafka message received\n"); return EXIT_FAILURE; } else { - fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len); + if (rkmessage->err) { + fprintf(stderr, "Got error: %s\n", rd_kafka_message_errstr(rkmessage)); + return EXIT_FAILURE; + } else { + if (strncmp((const char *)rkmessage->payload, expectedmsg, rkmessage->len)) { + fprintf(stdout, "Kafka message is correct: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload); + return EXIT_SUCCESS; + } else { + fprintf(stderr, "Kafka message is incorrect: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload); + return EXIT_FAILURE; + } + } } - - rd_kafka_message_destroy(rkmessage); - - rd_kafka_mock_cluster_destroy(mcluster); - rd_kafka_destroy(rkproducer); - rd_kafka_destroy(rkconsumer); - return EXIT_SUCCESS; } diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in index 9affd4377..f9d0fae47 100644 --- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -8,7 +8,7 @@ "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}", "NThreads": 2, "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}, - "NetworkMode": ["fabric"] + "NetworkMode": ["tcp"] }, "Metrics": { "Expose": true, -- GitLab