From 68c262a00de94eddc962c490d36c701853fd69a8 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 18 Oct 2019 14:10:19 +0200
Subject: [PATCH] update dummy producer, set number of threads for datacenter

---
 .../cluster/asapo_overwrite_vars.tfvars       |  1 +
 deploy/docker/cluster/run_maxwell.sh          | 20 ++++++++-----------
 .../cluster/scripts/asapo-logging.nmd.tpl     |  3 ++-
 .../cluster/scripts/asapo-receivers.nmd.tpl   |  1 +
 .../cluster/scripts/asapo.auto.tfvars.in      |  2 ++
 .../docker/cluster/scripts/receiver.json.tpl  |  2 +-
 deploy/docker/cluster/scripts/templates.tf    |  1 +
 deploy/docker/cluster/scripts/vars.tf         |  2 ++
 .../getnext_broker/getnext_broker.cpp         | 16 ++++++++++++++-
 .../dummy_data_producer.cpp                   | 19 ++++++++++--------
 .../full_chain/simple_chain/check_linux.sh    |  1 -
 .../full_chain/simple_chain/check_windows.bat |  2 --
 .../client_serv/ip_tcp_network.cpp            |  2 +-
 13 files changed, 45 insertions(+), 27 deletions(-)

diff --git a/deploy/docker/cluster/asapo_overwrite_vars.tfvars b/deploy/docker/cluster/asapo_overwrite_vars.tfvars
index aa170da0c..83378f60e 100644
--- a/deploy/docker/cluster/asapo_overwrite_vars.tfvars
+++ b/deploy/docker/cluster/asapo_overwrite_vars.tfvars
@@ -2,6 +2,7 @@ elk_logs = true
 
 receiver_total_memory_size = 35000
 receiver_dataserver_cache_size = 30 #gb
+receiver_dataserver_nthreads = 8
 
 grafana_total_memory_size = 2000
 influxdb_total_memory_size = 2000
diff --git a/deploy/docker/cluster/run_maxwell.sh b/deploy/docker/cluster/run_maxwell.sh
index 0f38142cf..500d8691e 100755
--- a/deploy/docker/cluster/run_maxwell.sh
+++ b/deploy/docker/cluster/run_maxwell.sh
@@ -17,29 +17,25 @@ ASAPO_USER=`id -u`:`id -g`
 ASAPO_VAR_FILE=`pwd`/asapo_overwrite_vars.tfvars
 
 
-# use ib interface for service discovery (all communications goes thourgh this interface)
-# todo: use ib only for communications with receiver (asapo discovery service should return correct ip using node meta IB_ADDRESS)
-USE_IP_OVER_IB=true
-
 #docker stuff
 DOCKER_ENDPOINT="127.0.0.1:2376" #comment to use unix sockets
 DOCKER_TLS_CA=/data/netapp/docker/certs/ca.pem
 DOCKER_TLS_KEY=/data/netapp/docker/certs/$USER/key.pem
 DOCKER_TLS_CERT=/data/netapp/docker/certs/$USER/cert.pem
 
-IB_HOSTNAME=`hostname --short`-ib
-IB_ADDRESS=`getent hosts $IB_HOSTNAME | awk '{ print $1 }'`
-
-if [ "$USE_IP_OVER_IB" == "true" ]; then
-  ADVERTISE_IP=$IB_ADDRESS
-  HOSTNAME_SUFFIX=-ib
+#adresses to use
+USE_IB_FOR_RECEIVER=true
+if [ "$USE_IB_FOR_RECEIVER" == "true" ]; then
+  IB_HOSTNAME=`hostname --short`-ib
+  IB_ADDRESS=`getent hosts $IB_HOSTNAME | awk '{ print $1 }'`
 fi
+#ADVERTISE_IP=  #set if differs from default
 
 #prepare env variables based on the above input
 N_SERVERS=$(( $SLURM_JOB_NUM_NODES > $MAX_NOMAD_SERVERS ? $MAX_NOMAD_SERVERS : $SLURM_JOB_NUM_NODES ))
 
-SERVER_ADRESSES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_SERVERS | awk -v suf=$HOSTNAME_SUFFIX 'BEGIN{printf "["} {printf "%s\"%s%s\"",sep,$0,suf; sep=","} END{print "]"}'`
-ASAPO_LIGHTWEIGHT_SERVICE_NODES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_ASAPO_LIGHTWEIGHT_SERVICE_NODES | awk -v suf=$HOSTNAME_SUFFIX 'BEGIN{printf "["} {printf "%s\"%s%s\"",sep,$0,suf; sep=","} END{print "]"}'`
+SERVER_ADRESSES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_SERVERS | awk 'BEGIN{printf "["} {printf "%s\"%s\"",sep,$0; sep=","} END{print "]"}'`
+ASAPO_LIGHTWEIGHT_SERVICE_NODES=`scontrol show hostnames $SLURM_JOB_NODELIST | head -$N_ASAPO_LIGHTWEIGHT_SERVICE_NODES | awk 'BEGIN{printf "["} {printf "%s\"%s\"",sep,$0; sep=","} END{print "]"}'`
 
 # make folders if not exist
 mkdir -p $NOMAD_ALLOC_HOST_SHARED $SERVICE_DATA_CLUSTER_SHARED $DATA_GLOBAL_SHARED
diff --git a/deploy/docker/cluster/scripts/asapo-logging.nmd.tpl b/deploy/docker/cluster/scripts/asapo-logging.nmd.tpl
index 960395a8a..d228555b7 100644
--- a/deploy/docker/cluster/scripts/asapo-logging.nmd.tpl
+++ b/deploy/docker/cluster/scripts/asapo-logging.nmd.tpl
@@ -19,7 +19,8 @@ job "asapo-logging" {
     count = "%{ if nomad_logs }0%{ else }1%{ endif }"
     restart {
       attempts = 2
-      interval = "3m"
+      interval = "3m"
+
       delay = "15s"
       mode = "delay"
     }
diff --git a/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl b/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl
index 3eca7a63b..af4c2f500 100644
--- a/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl
+++ b/deploy/docker/cluster/scripts/asapo-receivers.nmd.tpl
@@ -77,6 +77,7 @@ job "asapo-receivers" {
 
       meta {
         receiver_dataserver_cache_size = "${receiver_dataserver_cache_size}"
+        receiver_dataserver_nthreads = "${receiver_dataserver_nthreads}"
       }
 
 
diff --git a/deploy/docker/cluster/scripts/asapo.auto.tfvars.in b/deploy/docker/cluster/scripts/asapo.auto.tfvars.in
index 2e378e19c..8f2df9e65 100644
--- a/deploy/docker/cluster/scripts/asapo.auto.tfvars.in
+++ b/deploy/docker/cluster/scripts/asapo.auto.tfvars.in
@@ -16,6 +16,8 @@ job_scripts_dir = "/var/run/asapo"
 receiver_total_memory_size = "2000"
 
 receiver_dataserver_cache_size = "1" #gb
+receiver_dataserver_nthreads = 4
+
 grafana_total_memory_size = "256"
 
 influxdb_total_memory_size = "256"
diff --git a/deploy/docker/cluster/scripts/receiver.json.tpl b/deploy/docker/cluster/scripts/receiver.json.tpl
index e2770b245..94eada3d8 100644
--- a/deploy/docker/cluster/scripts/receiver.json.tpl
+++ b/deploy/docker/cluster/scripts/receiver.json.tpl
@@ -8,7 +8,7 @@
   "AuthorizationInterval": 10000,
   "ListenPort": {{ env "NOMAD_PORT_recv" }},
   "DataServer": {
-    "NThreads": 2,
+    "NThreads": {{ env "NOMAD_META_receiver_dataserver_nthreads" }},
     "ListenPort": {{ env "NOMAD_PORT_recv_ds" }}
   },
   "DataCache": {
diff --git a/deploy/docker/cluster/scripts/templates.tf b/deploy/docker/cluster/scripts/templates.tf
index b74df8127..df20e3b7a 100644
--- a/deploy/docker/cluster/scripts/templates.tf
+++ b/deploy/docker/cluster/scripts/templates.tf
@@ -38,6 +38,7 @@ data "template_file" "asapo_receivers" {
     nomad_logs = "${var.nomad_logs}"
     receiver_total_memory_size = "${var.receiver_total_memory_size}"
     receiver_dataserver_cache_size = "${var.receiver_dataserver_cache_size}"
+    receiver_dataserver_nthreads = "${var.receiver_dataserver_nthreads}"
     asapo_user = "${var.asapo_user}"
     n_receivers = "${var.n_receivers}"
   }
diff --git a/deploy/docker/cluster/scripts/vars.tf b/deploy/docker/cluster/scripts/vars.tf
index 9d6036284..266cd2ace 100644
--- a/deploy/docker/cluster/scripts/vars.tf
+++ b/deploy/docker/cluster/scripts/vars.tf
@@ -30,6 +30,8 @@ variable "receiver_total_memory_size" {}
 
 variable "receiver_dataserver_cache_size" {}
 
+variable "receiver_dataserver_nthreads" {}
+
 variable "grafana_total_memory_size" {}
 
 variable "influxdb_total_memory_size" {}
diff --git a/examples/consumer/getnext_broker/getnext_broker.cpp b/examples/consumer/getnext_broker/getnext_broker.cpp
index 4f16ab67b..2cf8fbc99 100644
--- a/examples/consumer/getnext_broker/getnext_broker.cpp
+++ b/examples/consumer/getnext_broker/getnext_broker.cpp
@@ -19,6 +19,9 @@ using asapo::Error;
 std::string group_id = "";
 std::mutex lock;
 
+uint64_t file_size = 0;
+
+
 struct Args {
     std::string server;
     std::string file_path;
@@ -89,6 +92,11 @@ std::vector<std::thread> StartThreads(const Args& params,
                 err = broker->GetNext(&fi, group_id, params.read_data ? &data : nullptr);
                 if (err == nullptr) {
                     (*nbuf)[i] += fi.buf_id == 0 ? 0 : 1;
+                    if (file_size == 0) {
+                        lock.lock();
+                        file_size = fi.size;
+                        lock.unlock();
+                    }
                     if (params.read_data && (*nfiles)[i] < 10 && fi.size < 10) {
                         data[9] = 0;
                         std::cout << "Received: " << reinterpret_cast<char const*>(data.get()) << std::endl;
@@ -193,6 +201,12 @@ int main(int argc, char* argv[]) {
     }
     std::cout << "Errors : " << nerrors << std::endl;
     std::cout << "Elapsed : " << duration_ms << "ms" << std::endl;
-    std::cout << "Rate : " << 1000.0f * nfiles / (duration_ms - params.timeout_ms) << std::endl;
+    auto rate = 1000.0f * nfiles / (duration_ms - params.timeout_ms);
+    auto bw_gbytes = rate * file_size / 1000.0f / 1000.0f / 1000.0f;
+    std::cout << "Rate : " << rate << std::endl;
+    if (file_size > 0) {
+        std::cout << "Bandwidth " << bw_gbytes * 8 << " Gbit/s" << std::endl;
+        std::cout << "Bandwidth " << bw_gbytes << " GBytes/s" << std::endl;
+    }
     return nerrors == 0 ? 0 : 1;
 }
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index 8f62763dd..1d5484184 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -130,14 +130,12 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
                    const std::string& stream, bool write_files) {
 
     asapo::Error err;
-    if (iterations > 0) { // send wrong meta, for negative integration tests
-        err = producer->SendMetaData("bla", &ProcessAfterMetaDataSend);
-    } else {
+    if (iterations == 0) {
         err = producer->SendMetaData("{\"dummy_meta\":\"test\"}", &ProcessAfterMetaDataSend);
-    }
-    if (err) {
-        std::cerr << "Cannot send metadata: " << err << std::endl;
-        return false;
+        if (err) {
+            std::cerr << "Cannot send metadata: " << err << std::endl;
+            return false;
+        }
     }
 
     for(uint64_t i = 0; i < iterations; i++) {
@@ -219,6 +217,7 @@ void PrintOutput(const Args& args, const system_clock::time_point& start) {
     double rate = args.iterations / duration_sec;
     std::cout << "Rate: " << rate << " Hz" << std::endl;
     std::cout << "Bandwidth " << size_gb / duration_sec << " Gbit/s" << std::endl;
+    std::cout << "Bandwidth " << size_gb / duration_sec / 8 << " GBytes/s" << std::endl;
 }
 
 
@@ -229,7 +228,11 @@ int main (int argc, char* argv[]) {
 
     auto producer = CreateProducer(args);
 
-    iterations_remained = args.iterations * args.images_in_set + 1;
+    if (args.iterations == 0) {
+        iterations_remained = 1; // metadata
+    } else {
+        iterations_remained = args.iterations * args.images_in_set;
+    }
 
     system_clock::time_point start_time = system_clock::now();
 
diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh
index b6e90a8f6..da561d454 100644
--- a/tests/automatic/full_chain/simple_chain/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain/check_linux.sh
@@ -51,4 +51,3 @@ $1 localhost:8400 ${beamtime_id} 100 1000 4 0 100
 $2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 5000 1 > out
 cat out
 cat out   | grep "Processed 1000 file(s)"
-cat out | grep "Cannot get metadata"
diff --git a/tests/automatic/full_chain/simple_chain/check_windows.bat b/tests/automatic/full_chain/simple_chain/check_windows.bat
index 38ce093a3..41a7479f1 100644
--- a/tests/automatic/full_chain/simple_chain/check_windows.bat
+++ b/tests/automatic/full_chain/simple_chain/check_windows.bat
@@ -29,8 +29,6 @@ REM consumer
 "%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 5000  1 > out.txt
 type out.txt
 findstr /i /l /c:"Processed 1000 file(s)"  out.txt || goto :error
-findstr /i /l /c:"Cannot get metadata"  out.txt || goto :error
-
 
 goto :clean
 
diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp
index f5a880bee..4f6ba0f7f 100644
--- a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp
+++ b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp
@@ -17,7 +17,7 @@ using asapo::M_AssertEq;
 using namespace std::chrono;
 
 static const std::unique_ptr<asapo::IO>
-    io(asapo::GenerateDefaultIO());
+io(asapo::GenerateDefaultIO());
 static const std::string kListenAddress = "127.0.0.1:60123";
 static std::promise<void> kThreadStarted;
 static const int kNumberOfChecks = 2;
-- 
GitLab