diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in index aa397aceb54985a89ef511998021d0473ef064eb..71bfe537dda1dc88d59ce61817618102e75291fc 100644 --- a/config/nomad/receiver_kafka.nmd.in +++ b/config/nomad/receiver_kafka.nmd.in @@ -32,12 +32,17 @@ job "receiver" { name = "asapo-receiver" port = "recv" check { - name = "alive" - type = "tcp" + name = "metrics" + type = "http" + port = "recv_metrics" + path = "/metrics" interval = "10s" timeout = "2s" initial_status = "passing" } + meta { + metrics-port = "${NOMAD_PORT_recv_metrics}" + } } meta { diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index a6492e0c02e9174bcc2053e89aceb10aacb2534d..92e0b87ca8180b53ac70f1c44e8b5caa0646803f 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -83,6 +83,7 @@ endif() configure_file(docker/Dockerfile . COPYONLY) configure_file(docker/install_libfabric.sh . COPYONLY) +configure_file(docker/install_rdkafka.sh . COPYONLY) ################################ diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh index 4782af1e885d85ff3e035d468544e1816c79ea52..51a93d0ba23c57c5d5245357dec3d5d16a906033 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -30,13 +30,15 @@ Cleanup() { influx -database ${database_name} -execute "drop series from statistics, RequestsRate" } -./transfer-single-file_kafka & KAFKA_PID=$! +rm -f bootstrap + +./transfer-single-file_kafka processed/1 & KAFKA_PID=$! echo "Started the kafka listener" while [ ! -f bootstrap ]; do if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then - # listener exited preliminary, i.e. some error + echo Kafka listener exited unexpectedly exit 1 fi sleep 1 @@ -62,8 +64,6 @@ $1 localhost:8400 ${beamtime_id} 100 1 1 0 30 ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000 -$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>&1 | tee /dev/stderr | grep "authorization" - wait $KAFKA_PID RESULT=$? diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp index 6fe0909ab3cabd49828e02080f787ee98aa74d06..b2cf96dcef537179dffb475bfbcde66ca8670cf5 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp @@ -1,12 +1,15 @@ #include <stdlib.h> +#include <string.h> #include "librdkafka/rdkafka.h" #include "librdkafka/rdkafka_mock.h" -int main(/*int argc, char** argv*/) { - rd_kafka_t *rkproducer, *rkconsumer; +int main(int argc, char *argv[]) { + + char *expectedmsg; + asprintf(&expectedmsg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", argc > 1 ? argv[argc - 1] : "processed/1"); + rd_kafka_conf_t *conf = rd_kafka_conf_new(); - rd_kafka_mock_cluster_t *mcluster; char errstr[256]; if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { @@ -18,13 +21,13 @@ int main(/*int argc, char** argv*/) { return EXIT_FAILURE; }*/ - rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + rd_kafka_t *rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if(!rkproducer) { fprintf(stderr, "Failed to create kafka producer: %s\n", errstr); return EXIT_FAILURE; } - mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); + rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); if(!mcluster) { fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr); return EXIT_FAILURE; @@ -32,8 +35,6 @@ int main(/*int argc, char** argv*/) { const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster); - fprintf(stdout, "Bootstrap for kafka cluster: %s\n", bootstrap); - FILE *fp = fopen("bootstrap", "w"); fprintf(fp, "%s", bootstrap); fclose(fp); @@ -52,25 +53,40 @@ int main(/*int argc, char** argv*/) { return EXIT_FAILURE; } - rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + rd_kafka_t *rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if(!rkconsumer) { fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr); return EXIT_FAILURE; } + + rd_kafka_poll_set_consumer(rkconsumer); + rd_kafka_topic_partition_list_t* subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, "asapo", RD_KAFKA_PARTITION_UA); + rd_kafka_resp_err_t err = rd_kafka_subscribe(rkconsumer, subscription); + if (err) { + fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err)); + return EXIT_FAILURE; + } + + rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000); if(!rkmessage) { fprintf(stderr, "No kafka message received\n"); return EXIT_FAILURE; } else { - fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len); + if (rkmessage->err) { + fprintf(stderr, "Got error: %s\n", rd_kafka_message_errstr(rkmessage)); + return EXIT_FAILURE; + } else { + if (strncmp((const char *)rkmessage->payload, expectedmsg, rkmessage->len)) { + fprintf(stdout, "Kafka message is correct: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload); + return EXIT_SUCCESS; + } else { + fprintf(stderr, "Kafka message is incorrect: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload); + return EXIT_FAILURE; + } + } } - - rd_kafka_message_destroy(rkmessage); - - rd_kafka_mock_cluster_destroy(mcluster); - rd_kafka_destroy(rkproducer); - rd_kafka_destroy(rkconsumer); - return EXIT_SUCCESS; } diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in index 9affd4377cd0af9947049b636b04672af9f2dc2a..f9d0fae47e197697f9174e37b2dfb87ce634e7ba 100644 --- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -8,7 +8,7 @@ "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}", "NThreads": 2, "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}, - "NetworkMode": ["fabric"] + "NetworkMode": ["tcp"] }, "Metrics": { "Expose": true,