Commit 678f0414 authored by George Sedov

kafka integration test

parent 5336c57e
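
This commit adds an integration test for the receiver's Kafka notifications: a mock Kafka cluster built on librdkafka's mock API that writes its bootstrap address to a file, a kafka-enabled receiver configuration with a matching Nomad job, and test scripts that restart the receiver against the mock cluster, transfer a file, and let the mock consumer wait for the resulting notification.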
@@ -32,12 +32,14 @@ function(prepare_asapo)
else()
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_tcp.json.tpl.lin.in receiver_tcp.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_fabric.json.tpl.lin.in receiver_fabric.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver_kafka.json.tpl.lin.in receiver_kafka.json.tpl @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/authorizer_settings.json.tpl.lin authorizer.json.tpl COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/start_services.sh start_services.sh COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/common_scripts/stop_services.sh stop_services.sh COPYONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_tcp.nmd.in receiver_tcp.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_fabric.nmd.in receiver_fabric.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver_kafka.nmd.in receiver_kafka.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/nginx_kill_lin.nmd nginx_kill.nmd @ONLY)
endif()
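config/nomad/receiver_kafka.nmd.in (new in this commit):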
job "receiver" {
datacenters = ["dc1"]
type = "service"
group "group" {
count = 1
task "receiver" {
driver = "raw_exec"
config {
command = "@RECEIVER_DIR@/@RECEIVER_NAME@"
args = ["${NOMAD_TASK_DIR}/receiver.json"]
}
resources {
cpu = 500 # 500 MHz
memory = 256 # 256MB
network {
port "recv" {}
port "recv_ds" {}
port "recv_metrics" {}
}
}
service {
name = "asapo-receiver"
port = "recv"
check {
name = "alive"
type = "tcp"
interval = "10s"
timeout = "2s"
initial_status = "passing"
}
}
template {
source = "@WORK_DIR@/receiver_kafka.json.tpl"
destination = "local/receiver.json"
change_mode = "signal"
change_signal = "SIGHUP"
}
}
}
}
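
From the CMakeLists.txt that registers the automatic transfer tests: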
add_subdirectory(transfer_single_file)
add_subdirectory(transfer_single_file_bypass_buffer)
add_subdirectory(transfer_single_file_with_kafka)
if (BUILD_PYTHON)
add_subdirectory(transfer_single_file_write_to_raw)
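CMakeLists.txt of the new transfer_single_file_with_kafka test: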
set(TARGET_NAME transfer-single-file_kafka)
set(SOURCE_FILES kafka_mock.cpp)
add_executable(${TARGET_NAME} ${SOURCE_FILES})
target_link_libraries(${TARGET_NAME} rdkafka)
# use a generator expression to keep VS from adding Debug/Release subfolders
set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY
${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:>
)
################################
# Testing
################################
prepare_asapo()
add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem)
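
The Linux test script ($1 is the path to the dummy-data-producer binary passed in by add_script_test):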
#!/usr/bin/env bash

set -e

trap Cleanup EXIT

database_name=db_test
beamtime_id=asapo_test
beamline=test
receiver_root_folder=/tmp/asapo/receiver/files
facility=test_facility
year=2019
receiver_folder=${receiver_root_folder}/${facility}/gpfs/${beamline}/${year}/data/${beamtime_id}

Cleanup() {
    echo cleanup
    set +e
    nomad stop receiver
    nomad run receiver_tcp.nmd
    while true
    do
        sleep 1
        curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue
        echo receiver started
        break
    done
    rm -rf ${receiver_root_folder}
    echo "db.dropDatabase()" | mongo ${beamtime_id}_detector
    influx -database ${database_name} -execute "drop series from statistics, RequestsRate"
}

./transfer-single-file_kafka & KAFKA_PID=$!
echo "Started the kafka listener"

while [ ! -f bootstrap ]; do
    if ! kill -0 $KAFKA_PID > /dev/null 2>&1; then
        # the listener exited prematurely, i.e. some error occurred
        exit 1
    fi
    sleep 1
done

BOOTSTRAP=$(cat bootstrap)
echo "Read kafka bootstrap: ${BOOTSTRAP}"
export NOMAD_ADDR_bootstrap=${BOOTSTRAP}

nomad stop receiver
nomad run receiver_kafka.nmd

while true
do
    sleep 1
    curl --silent 127.0.0.1:8400/asapo-discovery/v0.1/asapo-receiver?protocol=v0.1 --stderr - | grep 127.0.0.1 || continue
    echo receiver started
    break
done

mkdir -p ${receiver_folder}

$1 localhost:8400 ${beamtime_id} 100 1 1 0 30

ls -ln ${receiver_folder}/processed/1 | awk '{ print $5 }' | grep 100000

$1 localhost:8400 wrong_beamtime_id 100 1 1 0 1 2>&1 | tee /dev/stderr | grep "authorization"

wait $KAFKA_PID
RESULT=$?
echo "Mock kafka returned $RESULT"
SET mongo_exe="c:\Program Files\MongoDB\Server\4.2\bin\mongo.exe"
SET beamtime_id=asapo_test
SET beamline=test
SET receiver_root_folder=c:\tmp\asapo\receiver\files
SET receiver_folder="%receiver_root_folder%\test_facility\gpfs\%beamline%\2019\data\%beamtime_id%"
mkdir %receiver_folder%
"%1" 127.0.0.1:8400 %beamtime_id% 100 1 1 0 30
ping 192.0.2.1 -n 1 -w 1000 > nul
FOR /F "usebackq" %%A IN ('%receiver_folder%\processed\1') DO set size=%%~zA
if %size% NEQ 100000 goto :error
"%1" 127.0.0.1:8400 wrong_id 100 1 1 0 2 2>1 | findstr /c:"authorization" || goto :error
goto :clean
:error
call :clean
exit /b 1
:clean
rmdir /S /Q %receiver_root_folder%
echo db.dropDatabase() | %mongo_exe% %beamtime_id%_detector
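
kafka_mock.cpp, built above as transfer-single-file_kafka: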
#include <stdlib.h>
#include <stdio.h>

#include "librdkafka/rdkafka.h"
#include "librdkafka/rdkafka_mock.h"

int main(int argc, char** argv) {
    rd_kafka_t *rkproducer, *rkconsumer;
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_mock_cluster_t *mcluster;
    char errstr[256];

    if (rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
        return EXIT_FAILURE;
    }
    /*if(rd_kafka_conf_set(conf, "bootstrap.servers", "127.0.0.1:23456", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
        return EXIT_FAILURE;
    }*/

    rkproducer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rkproducer) {
        fprintf(stderr, "Failed to create kafka producer: %s\n", errstr);
        return EXIT_FAILURE;
    }

    mcluster = rd_kafka_mock_cluster_new(rkproducer, 1);
    if (!mcluster) {
        fprintf(stderr, "Failed to create kafka cluster: %s\n", errstr);
        return EXIT_FAILURE;
    }

    /* write the bootstrap address to a file so the test script can pass it on */
    const char* bootstrap = rd_kafka_mock_cluster_bootstraps(mcluster);
    fprintf(stdout, "Bootstrap for kafka cluster: %s\n", bootstrap);

    FILE *fp = fopen("bootstrap", "w");
    if (!fp) {
        fprintf(stderr, "Failed to write the bootstrap file\n");
        return EXIT_FAILURE;
    }
    fprintf(fp, "%s", bootstrap);
    fclose(fp);

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "client.id", "MOCK_CONSUMER", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
        return EXIT_FAILURE;
    }
    if (rd_kafka_conf_set(conf, "group.id", "asapo", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
        return EXIT_FAILURE;
    }
    if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstrap, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set kafka config: %s\n", errstr);
        return EXIT_FAILURE;
    }

    rkconsumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rkconsumer) {
        fprintf(stderr, "Failed to create kafka consumer: %s\n", errstr);
        return EXIT_FAILURE;
    }

    /* subscribe to the topic the receiver produces to; without a
       subscription consumer_poll() can never return a message */
    rd_kafka_poll_set_consumer(rkconsumer);
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "asapo", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(rkconsumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to the asapo topic\n");
        return EXIT_FAILURE;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    /* wait up to 10 s for the notification produced by the receiver */
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rkconsumer, 10000);
    if (!rkmessage) {
        fprintf(stderr, "No kafka message received\n");
        //return EXIT_FAILURE;
        return EXIT_SUCCESS;
    } else {
        fprintf(stderr, "Got message: err=%d, size=%zu\n", rkmessage->err, rkmessage->len);
    }

    rd_kafka_message_destroy(rkmessage);
    rd_kafka_consumer_close(rkconsumer);
    rd_kafka_mock_cluster_destroy(mcluster);
    rd_kafka_destroy(rkproducer);
    rd_kafka_destroy(rkconsumer);
    return EXIT_SUCCESS;
}
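
The mock writes the bootstrap address to the file "bootstrap"; the Linux script reads it and exports it as NOMAD_ADDR_bootstrap, which the template below interpolates into metadata.broker.list.

tests/automatic/settings/receiver_kafka.json.tpl.lin.in, the kafka-enabled receiver configuration template: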
{
  "PerformanceDbServer": "localhost:8086",
  "MonitorPerformance": true,
  "PerformanceDbName": "db_test",
  "DatabaseServer": "auto",
  "DiscoveryServer": "localhost:8400/asapo-discovery",
  "DataServer": {
    "AdvertiseURI": "127.0.0.1:{{ env "NOMAD_PORT_recv_ds" }}",
    "NThreads": 2,
    "ListenPort": {{ env "NOMAD_PORT_recv_ds" }},
    "NetworkMode": ["fabric"]
  },
  "Metrics": {
    "Expose": true,
    "ListenPort": {{ env "NOMAD_PORT_recv_metrics" }}
  },
  "DataCache": {
    "Use": @RECEIVER_USE_CACHE@,
    "SizeGB": 1,
    "ReservedShare": 10
  },
  "AuthorizationServer": "localhost:8400/asapo-authorizer",
  "AuthorizationInterval": 1000,
  "ListenPort": {{ env "NOMAD_PORT_recv" }},
  "Tag": "{{ env "NOMAD_ADDR_recv" }}",
  "ReceiveToDiskThresholdMB": 50,
  "LogLevel": "debug",
  "Kafka": {
    "Enabled": true,
    "KafkaClient": {
      "metadata.broker.list": "{{ env "NOMAD_ADDR_bootstrap" }}"
    },
    "KafkaTopics": {
      "asapo": {}
    }
  }
}
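
For context, everything under KafkaClient is handed to librdkafka as-is, so the producing side of this hand-off needs little more than the following sketch. This is a minimal illustration, not ASAPO's actual receiver code: notify_kafka, its parameters, and the payload are made up here; only the metadata.broker.list setting and the asapo topic come from the template above.

#include <string.h>
#include <stdio.h>
#include "librdkafka/rdkafka.h"

/* Minimal sketch of a notification producer; notify_kafka and its
   arguments are illustrative, only the broker-list key and the
   "asapo" topic come from the template above. */
static int notify_kafka(const char *brokers, const char *payload) {
    char errstr[256];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "config error: %s\n", errstr);
        return -1;
    }
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "producer error: %s\n", errstr);
        return -1;
    }
    /* produce one message to the topic configured under KafkaTopics */
    rd_kafka_resp_err_t err = rd_kafka_producev(
        rk,
        RD_KAFKA_V_TOPIC("asapo"),
        RD_KAFKA_V_VALUE((void *)payload, strlen(payload)),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_END);
    rd_kafka_flush(rk, 10000); /* wait up to 10 s for delivery */
    rd_kafka_destroy(rk);
    return err == RD_KAFKA_RESP_ERR_NO_ERROR ? 0 : -1;
}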