diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in index 1bdc21de51e34cbbb3f4515eabaabd92e7e5c6fa..aa397aceb54985a89ef511998021d0473ef064eb 100644 --- a/config/nomad/receiver_kafka.nmd.in +++ b/config/nomad/receiver_kafka.nmd.in @@ -1,3 +1,7 @@ +variable "receiver_kafka_metadata_broker_list" { + type = string +} + job "receiver" { datacenters = ["dc1"] @@ -36,13 +40,16 @@ job "receiver" { } } + meta { + receiver_kafka_metadata_broker_list = "${var.receiver_kafka_metadata_broker_list}" + } + template { source = "@WORK_DIR@/receiver_kafka.json.tpl" destination = "local/receiver.json" change_mode = "signal" change_signal = "SIGHUP" } - } } } diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index ae62336fc5405b2b98f6058f52134089bf87a37a..3c162e6ced92e7d4272c781817ae40f2f6c535be 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -48,8 +48,8 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { if (kafkaEnabled) { // read the configuration only if kafka is enabled. empty configuration means "disabled" - (err = parser.GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || - (err = parser.GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); + (err = parser.Embedded("Kafka").GetDictionaryString("KafkaClient", &config.kafka_config.global_config)) || + (err = parser.Embedded("Kafka").GetArrayObjectMembers("KafkaTopics", &kafkaTopics)); if (err) { return err; @@ -57,7 +57,7 @@ Error ReceiverConfigManager::ReadConfigFromFile(std::string file_name) { for(const auto& topic : kafkaTopics) { auto topicConfig = config.kafka_config.topics_config[topic]; - err = parser.Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); + err = parser.Embedded("Kafka").Embedded("KafkaTopics").GetDictionaryString(topic, &topicConfig); if (err) { return err; } diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh index 2c0ff36bac47cc47057ab47268838ce93eb65b0c..4782af1e885d85ff3e035d468544e1816c79ea52 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -46,10 +46,8 @@ BOOTSTRAP=$(cat bootstrap) echo "Read kafka bootstrap: ${BOOTSTRAP}" -export NOMAD_ADDR_bootstrap=${BOOTSTRAP} - nomad stop receiver -nomad run receiver_kafka.nmd +nomad run -var receiver_kafka_metadata_broker_list="${BOOTSTRAP}" receiver_kafka.nmd while true do sleep 1 diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp index 4f50d280f2ba6ad269590653e6bef46febbaac5f..6fe0909ab3cabd49828e02080f787ee98aa74d06 100644 --- a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp @@ -58,12 +58,11 @@ int main(/*int argc, char** argv*/) { fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr); return EXIT_FAILURE; } - rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 10000); + rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 30000); if(!rkmessage) { fprintf(stderr, "No kafka message received\n"); - //return EXIT_FAILURE; - return EXIT_SUCCESS; + return EXIT_FAILURE; } else { fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len); } diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in index 51eb00de61606112fc3cbb4d83930778fffe8ebe..9affd4377cd0af9947049b636b04672af9f2dc2a 100644 --- a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -28,7 +28,7 @@ "Kafka" : { "Enabled" : true, "KafkaClient": { - "metadata.broker.list": "{{ env "NOMAD_ADDR_bootstrap" }}" + "metadata.broker.list": "{{ env "NOMAD_META_receiver_kafka_metadata_broker_list" }}" }, "KafkaTopics": { "asapo": {}