diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index bcf2db0fcd819039856d419e85e7f3d00cc8be3b..e2badd2b9ca9e49bfd2414f7da3e431a3a12cf21 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -32,12 +32,14 @@ function(prepare_asapo) else() configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.lin.in receiver_tcp.json.tpl @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_fabric.json.tpl.lin.in receiver_fabric.json.tpl @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_kafka.json.tpl.lin.in receiver_kafka.json.tpl @ONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.lin authorizer.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/start_services.sh start_services.sh COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/stop_services.sh stop_services.sh COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_tcp.nmd.in receiver_tcp.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_fabric.nmd.in receiver_fabric.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_kafka.nmd.in receiver_kafka.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx_kill_lin.nmd nginx_kill.nmd @ONLY) endif() diff --git a/config/nomad/receiver_kafka.nmd.in b/config/nomad/receiver_kafka.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..1bdc21de51e34cbbb3f4515eabaabd92e7e5c6fa --- /dev/null +++ b/config/nomad/receiver_kafka.nmd.in @@ -0,0 +1,48 @@ +job "receiver" { + datacenters = ["dc1"] + + type = "service" + + group "group" { + count = 1 + + task "receiver" { + driver = "raw_exec" + + config { + command = "@RECEIVER_DIR@/@RECEIVER_NAME@" + args = ["${NOMAD_TASK_DIR}/receiver.json"] + } + + resources { + cpu = 500 # 500 MHz + memory = 256 # 256MB + network { + port "recv" {} + port "recv_ds" {} + port "recv_metrics" {} + } + } + + service { + name = "asapo-receiver" + port = "recv" + check { + name = "alive" + type = "tcp" + interval = "10s" + timeout = "2s" + initial_status = "passing" + } + } + + template { + source = "@WORK_DIR@/receiver_kafka.json.tpl" + destination = "local/receiver.json" + change_mode = "signal" + change_signal = "SIGHUP" + } + + } + } +} diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index aae8d4bf65ba9db9753972c71493c900cb5a8310..c3689a65a9c31061619848dbf12a02b393981075 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(transfer_single_file) add_subdirectory(transfer_single_file_bypass_buffer) +add_subdirectory(transfer_single_file_with_kafka) if (BUILD_PYTHON) add_subdirectory(transfer_single_file_write_to_raw) diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f3dd1eb041b3c62ed0ace09740365068b91ba218 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/CMakeLists.txt @@ -0,0 +1,17 @@ +set(TARGET_NAME transfer-single-file_kafka) +set(SOURCE_FILES kafka_mock.cpp) + +add_executable(${TARGET_NAME} ${SOURCE_FILES}) +target_link_libraries(${TARGET_NAME} rdkafka) + +#use expression generator to get rid of VS adding Debug/Release folders +set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY + ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> + ) + +################################ +# Testing +################################ +prepare_asapo() + +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..2c0ff36bac47cc47057ab47268838ce93eb65b0c --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_linux.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +database_name=db_test +beamtime_id=asapo_test +beamline=test +receiver_root_folder=/tmp/asapo/receiver/files +facility=test_facility +year=2019 +receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id} + + +Cleanup() { + echo cleanup + set +e + nomad stop receiver + nomad run receiver_tcp.nmd + while true + do + sleep 1 + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + echo recevier started + break + done + rm -rf ${receiver_root_folder} + echo "db.dropDatabase()" | mongo ${beamtime_id}_detector + influx -database ${database_name} -execute "drop series from statistics, RequestsRate" +} + +./transfer-single-file_kafka & KAFKA_PID=$! + +echo "Started the kafka listener" + +while [ ! -f bootstrap ]; do + if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then + # listener exited preliminary, i.e. some error + exit 1 + fi + sleep 1 +done + +BOOTSTRAP=$(cat bootstrap) + +echo "Read kafka bootstrap: ${BOOTSTRAP}" + +export NOMAD_ADDR_bootstrap=${BOOTSTRAP} + +nomad stop receiver +nomad run receiver_kafka.nmd +while true +do + sleep 1 + curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue + echo recevier started + break +done + +mkdir -p ${receiver_folder} + +$1 localhost:8400 ${beamtime_id} 100 1 1 0 30 + +ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }'| grep 100000 + +$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>&1 | tee /dev/stderr | grep "authorization" + +wait $KAFKA_PID +RESULT=$? + +echo "Mock kafka returned $RESULT" diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..afcbb8dd81b1d2206bd75a52682e8390a86c25ce --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/check_windows.bat @@ -0,0 +1,28 @@ +SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe" +SET beamtime_id=asapo_test +SET beamline=test +SET receiver_root_folder=c:\tmp\asapo\receiver\files +SET receiver_folder="%receiver_root_folder%\test_facility\gpfs\%beamline%\2019\data\%beamtime_id%" + +mkdir %receiver_folder% + +"%1" 127.0.0.1:8400 %beamtime_id% 100 1 1 0 30 + +ping 192.0.2.1 -n 1 -w 1000 > nul + +FOR /F "usebackq" %%A IN ('%receiver_folder%\processed\1') DO set size=%%~zA +if %size% NEQ 100000 goto :error + +"%1" 127.0.0.1:8400 wrong_id 100 1 1 0 2 2>1 | findstr /c:"authorization" || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +rmdir /S /Q %receiver_root_folder% +echo db.dropDatabase() | %mongo_exe% %beamtime_id%_detector + + diff --git a/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp new file mode 100644 index 0000000000000000000000000000000000000000..60cb5c29882f473fb4a56b3a8865f3f8f6caba81 --- /dev/null +++ b/tests/automatic/producer_receiver/transfer_single_file_with_kafka/kafka_mock.cpp @@ -0,0 +1,77 @@ +#include <stdlib.h> + +#include "librdkafka/rdkafka.h" +#include "librdkafka/rdkafka_mock.h" + +int main(int argc, char** argv) { + rd_kafka_t *rkproducer, *rkconsumer; + rd_kafka_conf_t *conf = rd_kafka_conf_new(); + rd_kafka_mock_cluster_t *mcluster; + char errstr[256]; + + if(rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + /*if(rd_kafka_conf_set(conf, "bootstrap.servers", "127.0.0.1:23456", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + }*/ + + rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if(!rkproducer) { + fprintf(stderr, "Failed to create kafka producer: %s\n", errstr); + return EXIT_FAILURE; + } + + mcluster = rd_kafka_mock_cluster_new(rkproducer, 1); + if(!mcluster) { + fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr); + return EXIT_FAILURE; + } + + const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster); + + fprintf(stdout, "Bootstrap for kafka cluster: %s\n", bootstrap); + + FILE *fp = fopen("bootstrap", "w"); + fprintf(fp, "%s", bootstrap); + fclose(fp); + + conf = rd_kafka_conf_new(); + if(rd_kafka_conf_set(conf, "client.id", "MOCK_CONSUMER", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + if(rd_kafka_conf_set(conf, "group.id", "asapo", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + if(rd_kafka_conf_set(conf, "bootstrap.servers", bootstrap, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "Failed to set kafka config: %s\n", errstr); + return EXIT_FAILURE; + } + + rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + + if(!rkconsumer) { + fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr); + return EXIT_FAILURE; + } + rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 10000); + + if(!rkmessage) { + fprintf(stderr, "No kafka message received\n"); + //return EXIT_FAILURE; + return EXIT_SUCCESS; + } else { + fprintf(stderr, "Got message: err=%d, size=%ld\n", rkmessage->err, rkmessage->len); + } + + rd_kafka_message_destroy(rkmessage); + + rd_kafka_mock_cluster_destroy(mcluster); + rd_kafka_destroy(rkproducer); + rd_kafka_destroy(rkconsumer); + return EXIT_SUCCESS; +} diff --git a/tests/automatic/settings/receiver_kafka.json.tpl.lin.in b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in new file mode 100644 index 0000000000000000000000000000000000000000..51eb00de61606112fc3cbb4d83930778fffe8ebe --- /dev/null +++ b/tests/automatic/settings/receiver_kafka.json.tpl.lin.in @@ -0,0 +1,37 @@ +{ + "PerformanceDbServer":"localhost:8086", + "MonitorPerformance": true, + "PerformanceDbName": "db_test", + "DatabaseServer":"auto", + "DiscoveryServer": "localhost:8400/asapo-discovery", + "DataServer": { + "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}", + "NThreads": 2, + "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}, + "NetworkMode": ["fabric"] + }, + "Metrics": { + "Expose": true, + "ListenPort": {{ env "NOMAD_PORT_recv_metrics" }} + }, + "DataCache": { + "Use": @RECEIVER_USE_CACHE@, + "SizeGB": 1, + "ReservedShare": 10 + }, + "AuthorizationServer": "localhost:8400/asapo-authorizer", + "AuthorizationInterval": 1000, + "ListenPort": {{ env "NOMAD_PORT_recv" }}, + "Tag": "{{ env "NOMAD_ADDR_recv" }}", + "ReceiveToDiskThresholdMB":50, + "LogLevel" : "debug", + "Kafka" : { + "Enabled" : true, + "KafkaClient": { + "metadata.broker.list": "{{ env "NOMAD_ADDR_bootstrap" }}" + }, + "KafkaTopics": { + "asapo": {} + } + } + }