Commit 56617e70 authored by George Sedov

another fix to kafka integration test

parent 39957911
@@ -32,12 +32,17 @@ job "receiver" {
name = "asapo-receiver"
port = "recv"
check {
name = "alive"
type = "tcp"
name = "metrics"
type = "http"
port = "recv_metrics"
path = "/metrics"
interval = "10s"
timeout = "2s"
initial_status = "passing"
}
meta {
metrics-port = "${NOMAD_PORT_recv_metrics}"
}
}
meta {
......
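The receiver's service check moves from a bare TCP "alive" probe to an HTTP GET of /metrics on the dedicated recv_metrics port, and that port is additionally published through the service's meta block. As a rough, hypothetical illustration (not part of this commit) of what the new check amounts to, the following standalone C program issues the same GET against a fixed placeholder port standing in for NOMAD_PORT_recv_metrics:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

int main(void) {
    const unsigned short metrics_port = 8428;   /* placeholder for NOMAD_PORT_recv_metrics */

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) { perror("socket"); return 1; }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(metrics_port);
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

    /* a failed connect or a non-200 reply is what turns the Consul check critical */
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("connect"); return 1; }

    const char *req = "GET /metrics HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n";
    write(fd, req, strlen(req));

    char buf[4096];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0) {
        buf[n] = '\0';
        fputs(buf, stdout);   /* status line followed by the exposed metrics */
    }
    close(fd);
    return 0;
}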
@@ -83,6 +83,7 @@ endif()
configure_file(docker/Dockerfile . COPYONLY)
configure_file(docker/install_libfabric.sh . COPYONLY)
configure_file(docker/install_rdkafka.sh . COPYONLY)
################################
......
@@ -30,13 +30,15 @@ Cleanup() {
influx -database ${database_name} -execute "drop series from statistics, RequestsRate"
}
./transfer-single-file_kafka & KAFKA_PID=$!
rm -f bootstrap
./transfer-single-file_kafka processed/1 & KAFKA_PID=$!
echo "Started the kafka listener"
while [ ! -f bootstrap ]; do
if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then
# listener exited prematurely, i.e. some error occurred
echo Kafka listener exited unexpectedly
exit 1
fi
sleep 1
@@ -62,8 +64,6 @@ $1 localhost:8400 ${beamtime_id} 100 1 1 0 30
ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000
$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>&1 | tee /dev/stderr | grep "authorization"
wait $KAFKA_PID
RESULT=$?
......
#define _GNU_SOURCE   /* asprintf() used below is a GNU extension */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "librdkafka/rdkafka.h"
#include "librdkafka/rdkafka_mock.h"
int main(/*int argc, char** argv*/) {
rd_kafka_t *rkproducer, *rkconsumer;
int main(int argc, char *argv[]) {
char *expectedmsg;
asprintf(&expectedmsg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", argc > 1 ? argv[argc - 1] : "processed/1");
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_mock_cluster_t *mcluster;
char errstr[256];
if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
@@ -18,13 +21,13 @@ int main(/*int argc, char** argv*/) {
return EXIT_FAILURE;
}*/
rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
rd_kafka_t *rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if(!rkproducer) {
fprintf(stderr, "Failed to create kafka producer: %s\n", errstr);
return EXIT_FAILURE;
}
mcluster = rd_kafka_mock_cluster_new(rkproducer, 1);
rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rkproducer, 1);
if(!mcluster) {
fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr);
return EXIT_FAILURE;
@@ -32,8 +35,6 @@ int main(/*int argc, char** argv*/) {
const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster);
fprintf(stdout, "Bootstrap for kafka cluster: %s\n", bootstrap);
FILE *fp = fopen("bootstrap", "w");
fprintf(fp, "%s", bootstrap);
fclose(fp);
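The mock cluster's bootstrap address is both logged and written to a file named bootstrap, which the test script above polls for before driving the receiver. As a hypothetical sketch (not part of this commit), another process could read the address back like this, e.g. to fill in a bootstrap.servers setting:

#include <stdio.h>
#include <string.h>

/* hypothetical helper: read the host:port written to "bootstrap" by the listener;
   returns 0 on success, -1 if the file is missing or empty */
static int read_bootstrap(char *buf, size_t len) {
    FILE *fp = fopen("bootstrap", "r");
    if (!fp) {
        return -1;
    }
    if (!fgets(buf, (int)len, fp)) {
        fclose(fp);
        return -1;
    }
    buf[strcspn(buf, "\n")] = '\0';   /* drop a trailing newline, if any */
    fclose(fp);
    return 0;
}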
@@ -52,25 +53,40 @@ int main(/*int argc, char** argv*/) {
return EXIT_FAILURE;
}
rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
rd_kafka_t *rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if(!rkconsumer) {
fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr);
return EXIT_FAILURE;
}
rd_kafka_poll_set_consumer(rkconsumer);
rd_kafka_topic_partition_list_t* subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, "asapo", RD_KAFKA_PARTITION_UA);
rd_kafka_resp_err_t err = rd_kafka_subscribe(rkconsumer, subscription);
if (err) {
fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
return EXIT_FAILURE;
}
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000);
if(!rkmessage) {
fprintf(stderr, "No kafka message received\n");
return EXIT_FAILURE;
} else {
fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len);
if (rkmessage->err) {
fprintf(stderr, "Got error: %s\n", rd_kafka_message_errstr(rkmessage));
return EXIT_FAILURE;
} else {
if (strncmp((const char *)rkmessage->payload, expectedmsg, rkmessage->len) == 0) {
fprintf(stdout, "Kafka message is correct: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload);
return EXIT_SUCCESS;
} else {
fprintf(stderr, "Kafka message is incorrect: %.*s\n", (int)rkmessage->len, (const char *)rkmessage->payload);
return EXIT_FAILURE;
}
}
}
rd_kafka_message_destroy(rkmessage);
rd_kafka_mock_cluster_destroy(mcluster);
rd_kafka_destroy(rkproducer);
rd_kafka_destroy(rkconsumer);
return EXIT_SUCCESS;
}
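The consumer poll above only succeeds if something publishes the IN_CLOSE_WRITE event to the asapo topic; in this test that is the receiver's kafka client pointed at the mock cluster. A hedged sketch of what such a produce call looks like with librdkafka (the handle rk, its bootstrap.servers configuration, and the helper name are assumptions, not code from this commit):

#define _GNU_SOURCE              /* asprintf, as in the listener above */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "librdkafka/rdkafka.h"

/* hypothetical: publish the file-close event the mock listener waits for,
   assuming rk is an RD_KAFKA_PRODUCER handle configured against the mock cluster */
static void publish_close_event(rd_kafka_t *rk, const char *path) {
    char *msg = NULL;
    if (asprintf(&msg, "{\"event\":\"IN_CLOSE_WRITE\",\"path\":\"%s\"}", path) < 0) {
        return;
    }
    rd_kafka_resp_err_t err = rd_kafka_producev(
            rk,
            RD_KAFKA_V_TOPIC("asapo"),
            RD_KAFKA_V_VALUE(msg, strlen(msg)),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),   /* let librdkafka copy the payload */
            RD_KAFKA_V_END);
    if (err) {
        fprintf(stderr, "Produce failed: %s\n", rd_kafka_err2str(err));
    }
    rd_kafka_flush(rk, 5000);    /* give delivery up to 5 seconds to complete */
    free(msg);
}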
@@ -8,7 +8,7 @@
"AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}",
"NThreads": 2,
"ListenPort": {{ env "NOMAD_PORT_recv_ds" }},
"NetworkMode": ["fabric"]
"NetworkMode": ["tcp"]
},
"Metrics": {
"Expose": true,
......