From 3995791191a3dc821d4125c401f9ed76df45f560 Mon Sep 17 00:00:00 2001
From: George Sedov <george.sedov@desy.de>
Date: Tue, 28 Dec 2021 18:30:13 +0100
Subject: [PATCH] correctly pass the nomad var to integration test

---
 config/nomad/receiver_kafka.nmd.in                       | 9 ++++++++-
 receiver/src/receiver_config.cpp                         | 6 +++---
 .../transfer_single_file_with_kafka/check_linux.sh       | 4 +---
 .../transfer_single_file_with_kafka/kafka_mock.cpp       | 5 ++---
 tests/automatic/settings/receiver_kafka.json.tpl.lin.in  | 2 +-
 5 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in
index 1bdc21de5..aa397aceb 100644
--- a/config/nomad/receiver_kafka.nmd.in
+++ b/config/nomad/receiver_kafka.nmd.in
@@ -1,3 +1,7 @@
+variable "receiver_kafka_metadata_broker_list" {
+    type = string
+}
+
 job "receiver" {
   datacenters = ["dc1"]
 
@@ -36,13 +40,16 @@ job "receiver" {
         }
       }
 
+      meta {
+        receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}"
+      }
+
       template {
          source        = "@WORK_DIR@/receiver_kafka.json.tpl"
          destination   = "local/receiver.json"
          change_mode   = "signal"
          change_signal = "SIGHUP"
       }
-
     }
   }
 }
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index ae62336fc..3c162e6ce 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -48,8 +48,8 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
 
     if (kafkaEnabled) {
         // read the configuration only if kafka is enabled. empty configuration means "disabled"
-        (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
-        (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
+        (err = parser.Embedded("Kafka").GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) ||
+        (err = parser.Embedded("Kafka").GetArrayObjectMembers("KafkaTopics", &kafkaTopics));
 
         if (err) {
             return err;
@@ -57,7 +57,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) {
 
         for(const auto& topic : kafkaTopics) {
             auto topicConfig = config.kafka_config.topics_config[topic];
-            err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
+            err = parser.Embedded("Kafka").Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig);
             if (err) {
                 return err;
             }
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
index 2c0ff36ba..4782af1e8 100644
--- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh
@@ -46,10 +46,8 @@ BOOTSTRAP=$(cat bootstrap)
 
 echo "Read kafka bootstrap: ${BOOTSTRAP}"
 
-export NOMAD_ADDR_bootstrap=${BOOTSTRAP}
-
 nomad stop receiver
-nomad run receiver_kafka.nmd
+nomad run -var receiver_kafka_metadata_broker_list="${BOOTSTRAP}" receiver_kafka.nmd
 while true
 do
   sleep 1
diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp
index 4f50d280f..6fe0909ab 100644
--- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp
+++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp
@@ -58,12 +58,11 @@ int main(/*int argc, char** argv*/) {
         fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr);
         return EXIT_FAILURE;
     }
-    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 10000);
+    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000);
 
     if(!rkmessage) {
         fprintf(stderr, "No kafka message received\n");
-        //return EXIT_FAILURE;
-        return EXIT_SUCCESS;
+        return EXIT_FAILURE;
     } else {
         fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len);
     }
diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
index 51eb00de6..9affd4377 100644
--- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
+++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in
@@ -28,7 +28,7 @@
   "Kafka" : {
     "Enabled" : true,
     "KafkaClient": {
-      "metadata.broker.list": "{{ env "NOMAD_ADDR_bootstrap" }}"
+      "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}"
     },
     "KafkaTopics": {
       "asapo": {}
-- 
GitLab