From 8b2ee2cdd6a5d4c0aa4ebb5846ac823e0461261b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 19 Sep 2018 15:38:35 +0200
Subject: [PATCH] update nomad jobs, more debug out for windows

---
 common/cpp/src/system_io/system_io_linux.cpp  |  5 +-
 .../src/shared_event_list.cpp                 |  2 +
 .../src/single_folder_watch_windows.cpp       |  3 +
 receiver/src/receiver.cpp                     |  2 +-
 receiver/src/receiver.h                       |  1 +
 tests/automatic/CMakeLists.txt                |  4 +-
 tests/automatic/bug_fixes/CMakeLists.txt      |  5 ++
 .../CMakeLists.txt                            | 16 ++++
 .../check_linux.sh                            | 55 ++++++++++++++
 .../check_windows.bat                         | 76 +++++++++++++++++++
 .../producer_send_after_restart/test.json.in  | 12 +++
 .../receiver_cpu_usage/CMakeLists.txt         | 11 +++
 .../receiver_cpu_usage/check_linux.sh         | 56 ++++++++++++++
 .../bug_fixes/receiver_cpu_usage/test.json.in | 12 +++
 .../settings/discovery_fixed_settings.json    | 10 +++
 .../asapo-test_filegen_filemon.nomad          | 42 +++++-----
 ...apo-test_filemon_producer_toreceiver.nomad |  2 +-
 17 files changed, 289 insertions(+), 25 deletions(-)
 create mode 100644 tests/automatic/bug_fixes/CMakeLists.txt
 create mode 100644 tests/automatic/bug_fixes/producer_send_after_restart/CMakeLists.txt
 create mode 100644 tests/automatic/bug_fixes/producer_send_after_restart/check_linux.sh
 create mode 100644 tests/automatic/bug_fixes/producer_send_after_restart/check_windows.bat
 create mode 100644 tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
 create mode 100644 tests/automatic/bug_fixes/receiver_cpu_usage/CMakeLists.txt
 create mode 100644 tests/automatic/bug_fixes/receiver_cpu_usage/check_linux.sh
 create mode 100644 tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
 create mode 100644 tests/automatic/settings/discovery_fixed_settings.json

diff --git a/common/cpp/src/system_io/system_io_linux.cpp b/common/cpp/src/system_io/system_io_linux.cpp
index 76342ac4f..a1013b29b 100644
--- a/common/cpp/src/system_io/system_io_linux.cpp
+++ b/common/cpp/src/system_io/system_io_linux.cpp
@@ -1,4 +1,5 @@
 
+
 #include <cstring>
 
 #include <dirent.h>
@@ -236,11 +237,11 @@ SocketDescriptor SystemIO::_socket(int address_family, int socket_type, int sock
 }
 
 ssize_t SystemIO::_send(SocketDescriptor socket_fd, const void* buffer, size_t length) {
-    return ::send(socket_fd, buffer, length, MSG_DONTWAIT);
+    return ::send(socket_fd, buffer, length, MSG_NOSIGNAL);
 }
 
 ssize_t SystemIO::_recv(SocketDescriptor socket_fd, void* buffer, size_t length) {
-    return ::recv(socket_fd, buffer, length, MSG_DONTWAIT);
+    return ::recv(socket_fd, buffer, length, 0);
 }
 
 int SystemIO::_mkdir(const char* dirname) const {
diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp
index 8787ead85..286165cd7 100644
--- a/producer/event_monitor_producer/src/shared_event_list.cpp
+++ b/producer/event_monitor_producer/src/shared_event_list.cpp
@@ -1,4 +1,5 @@
 #include "shared_event_list.h"
+#include "eventmon_logger.h"
 
 #include <algorithm>
 
@@ -13,6 +14,7 @@ FilesToSend SharedEventList::GetAndClearEvents() {
         uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() -
                               it->time).count();
         if (!it->apply_delay || elapsed_ms > kFileDelayMs) {
+            GetDefaultEventMonLogger()->Debug("file considered closed or file moved: " + it->file_name);
             events.push_back(it->file_name);
             it = events_.erase(it);
         } else {
diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
index 8f0010fab..0453c4ea9 100644
--- a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
+++ b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
@@ -27,6 +27,7 @@ Error SingleFolderWatch::Init()  {
         this->log__->Error("cannot add folder watch for " + full_path + ": " + err->Explain());
         return err;
     }
+    GetDefaultEventMonLogger()->Debug("added folder to monitor: " + full_path);
     return nullptr;
 }
 
@@ -53,9 +54,11 @@ Error SingleFolderWatch::ProcessEvent(const WinEvent& event) {
     if (watch_io__->IsDirectory(root_folder_ + kPathSeparator + fname)) {
         return nullptr;
     }
+    GetDefaultEventMonLogger()->Debug("file modified event: " + fname);
     event_list_->AddEvent(fname, event.ShouldBeProcessedAfterDelay());
     return nullptr;
 }
+
 void SingleFolderWatch::ProcessEvents(DWORD bytes_to_read) {
     for (char* p = buffer_.get(); p < buffer_.get() + bytes_to_read; ) {
         WinEvent event{(FILE_NOTIFY_INFORMATION*) p};
diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp
index 9a94afe45..758817873 100644
--- a/receiver/src/receiver.cpp
+++ b/receiver/src/receiver.cpp
@@ -61,7 +61,7 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons
     });
 
     if (thread) {
-        thread->detach();
+        threads_.emplace_back(std::move(thread));
     }
     return;
 }
diff --git a/receiver/src/receiver.h b/receiver/src/receiver.h
index c7435ce68..bc90ac678 100644
--- a/receiver/src/receiver.h
+++ b/receiver/src/receiver.h
@@ -17,6 +17,7 @@ class Receiver {
     Error PrepareListener(std::string listener_address);
     void StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address);
     void ProcessConnections(Error* err);
+    std::vector<std::unique_ptr<std::thread>> threads_;
   public:
     static const int kMaxUnacceptedConnectionsBacklog;//TODO: Read from config
     Receiver(const Receiver&) = delete;
diff --git a/tests/automatic/CMakeLists.txt b/tests/automatic/CMakeLists.txt
index b75349614..ea37ce600 100644
--- a/tests/automatic/CMakeLists.txt
+++ b/tests/automatic/CMakeLists.txt
@@ -31,4 +31,6 @@ add_subdirectory(producer)
 
 if (UNIX)
     add_subdirectory(high_avail)
-endif()
\ No newline at end of file
+endif()
+
+add_subdirectory(bug_fixes)
diff --git a/tests/automatic/bug_fixes/CMakeLists.txt b/tests/automatic/bug_fixes/CMakeLists.txt
new file mode 100644
index 000000000..91bd32692
--- /dev/null
+++ b/tests/automatic/bug_fixes/CMakeLists.txt
@@ -0,0 +1,5 @@
+if (UNIX)
+    add_subdirectory(receiver_cpu_usage)
+endif()
+
+add_subdirectory(producer_send_after_restart)
\ No newline at end of file
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/CMakeLists.txt b/tests/automatic/bug_fixes/producer_send_after_restart/CMakeLists.txt
new file mode 100644
index 000000000..56a0067a3
--- /dev/null
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/CMakeLists.txt
@@ -0,0 +1,16 @@
+set(TARGET_NAME producer_send_after_restart)
+
+################################
+# Testing
+################################
+prepare_asapo()
+
+if (UNIX)
+    set (ROOT_PATH "/tmp/asapo/")
+else()
+    set (ROOT_PATH "c:\\\\tmp\\\\asapo\\\\")
+endif()
+
+configure_file(test.json.in test.json @ONLY)
+
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem)
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/check_linux.sh b/tests/automatic/bug_fixes/producer_send_after_restart/check_linux.sh
new file mode 100644
index 000000000..250584350
--- /dev/null
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/check_linux.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+
+set -e
+
+trap Cleanup EXIT
+
+beamtime_id=asapo_test
+
+monitor_database_name=db_test
+proxy_address=127.0.0.1:8400
+
+beamline=test
+receiver_root_folder=/tmp/asapo/receiver/files
+receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id}
+
+mkdir -p /tmp/asapo/test_in/test1/
+
+Cleanup() {
+    echo cleanup
+    rm -rf ${receiver_root_folder}
+    nomad stop nginx
+    nomad stop receiver
+    nomad stop discovery
+    nomad stop authorizer
+    echo "db.dropDatabase()" | mongo ${beamtime_id}
+}
+
+nomad run nginx.nmd
+nomad run authorizer.nmd
+nomad run receiver.nmd
+nomad run discovery.nmd
+
+sleep 1
+
+#producer
+mkdir -p ${receiver_folder}
+$1 test.json &> output &
+producerid=`echo $!`
+
+sleep 1
+
+echo hello > /tmp/asapo/test_in/test1/file1
+sleep 1
+nomad stop receiver
+sleep 1
+nomad run receiver.nmd
+
+echo hello > /tmp/asapo/test_in/test1/file1
+sleep 1
+
+kill -s INT $producerid
+sleep 0.5
+cat output
+cat output | grep "Processed 2"
+
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/check_windows.bat b/tests/automatic/bug_fixes/producer_send_after_restart/check_windows.bat
new file mode 100644
index 000000000..162097f75
--- /dev/null
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/check_windows.bat
@@ -0,0 +1,76 @@
+
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+SET beamtime_id=asapo_test
+SET beamline=test
+SET receiver_root_folder=c:\tmp\asapo\receiver\files
+SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%"
+
+set producer_short_name="%~nx1"
+
+
+"%3" token -secret broker_secret.key %beamtime_id% > token
+set /P token=< token
+
+set proxy_address="127.0.0.1:8400"
+
+echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id%
+
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+REM producer
+mkdir %receiver_folder%
+mkdir  c:\tmp\asapo\test_in\test1
+mkdir  c:\tmp\asapo\test_in\test2
+start /B "" "%1" test.json
+
+ping 1.0.0.0 -n 3 -w 100 > nul
+
+echo hello > c:\tmp\asapo\test_in\test1\file1
+echo hello > c:\tmp\asapo\test_in\test1\file2
+
+ping 1.0.0.0 -n 3 -w 100 > nul
+
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad run receiver.nmd
+
+ping 1.0.0.0 -n 3 -w 100 > nul
+
+
+echo hello > c:\tmp\asapo\test_in\test1\file3
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+
+REM worker
+"%2" %proxy_address% %beamtime_id% 2 %token% 1000 | findstr /c:"Processed 3 file(s)"  || goto :error
+
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop authorizer
+c:\opt\consul\nomad stop nginx
+rmdir /S /Q %receiver_root_folder%
+rmdir /S /Q c:\tmp\asapo\test_in\test1
+rmdir /S /Q c:\tmp\asapo\test_in\test2
+Taskkill /IM "%producer_short_name%" /F
+
+del /f token
+echo db.dropDatabase() | %mongo_exe% %beamtime_id%
+
+
+
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
new file mode 100644
index 000000000..22b699c17
--- /dev/null
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
@@ -0,0 +1,12 @@
+{
+ "AsapoEndpoint":"localhost:8400",
+ "Tag":"test_tag",
+ "BeamtimeID":"asapo_test",
+ "Mode":"tcp",
+ "NThreads":1,
+ "LogLevel":"debug",
+ "RootMonitoredFolder":"@ROOT_PATH@test_in",
+ "MonitoredSubFolders":["test1"],
+ "IgnoreExtentions":["tmp"],
+ "RemoveAfterSend":true
+}
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/CMakeLists.txt b/tests/automatic/bug_fixes/receiver_cpu_usage/CMakeLists.txt
new file mode 100644
index 000000000..f6bf9f8cf
--- /dev/null
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/CMakeLists.txt
@@ -0,0 +1,11 @@
+set(TARGET_NAME receiver_cpu_usage)
+
+################################
+# Testing
+################################
+prepare_asapo()
+set (ROOT_PATH "/tmp/asapo/")
+
+configure_file(test.json.in test.json @ONLY)
+
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem)
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/check_linux.sh b/tests/automatic/bug_fixes/receiver_cpu_usage/check_linux.sh
new file mode 100644
index 000000000..52228deb2
--- /dev/null
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/check_linux.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+
+set -e
+
+trap Cleanup EXIT
+
+beamtime_id=asapo_test
+
+monitor_database_name=db_test
+proxy_address=127.0.0.1:8400
+
+beamline=test
+receiver_root_folder=/tmp/asapo/receiver/files
+receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id}
+
+mkdir -p /tmp/asapo/test_in/test1/
+
+Cleanup() {
+    echo cleanup
+    kill $producerid
+    rm -rf /tmp/asapo/test_in/test1
+    rm -rf ${receiver_root_folder}
+    nomad stop nginx
+    nomad stop receiver
+    nomad stop discovery
+    nomad stop authorizer
+    echo "db.dropDatabase()" | mongo ${beamtime_id}
+}
+
+nomad run nginx.nmd
+nomad run authorizer.nmd
+nomad run receiver.nmd
+nomad run discovery.nmd
+
+sleep 1
+
+#producer
+mkdir -p ${receiver_folder}
+$1 test.json &
+producerid=`echo $!`
+
+sleep 1
+
+echo hello > /tmp/asapo/test_in/test1/file1
+
+sleep 5
+
+usage=`top -b -n 1 | grep receiver | awk  '{print int($9)'}`
+echo CPU usage: $usage
+if [ -z "$usage" ]; then
+exit 1
+fi
+
+if (( $usage > 50 )); then
+    exit 1
+fi
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
new file mode 100644
index 000000000..22b699c17
--- /dev/null
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
@@ -0,0 +1,12 @@
+{
+ "AsapoEndpoint":"localhost:8400",
+ "Tag":"test_tag",
+ "BeamtimeID":"asapo_test",
+ "Mode":"tcp",
+ "NThreads":1,
+ "LogLevel":"debug",
+ "RootMonitoredFolder":"@ROOT_PATH@test_in",
+ "MonitoredSubFolders":["test1"],
+ "IgnoreExtentions":["tmp"],
+ "RemoveAfterSend":true
+}
diff --git a/tests/automatic/settings/discovery_fixed_settings.json b/tests/automatic/settings/discovery_fixed_settings.json
new file mode 100644
index 000000000..25cba6782
--- /dev/null
+++ b/tests/automatic/settings/discovery_fixed_settings.json
@@ -0,0 +1,10 @@
+{
+  "Mode": "consul",
+  "Receiver": {
+    "MaxConnections": 32
+  },
+  "Port": {{ env "NOMAD_PORT_discovery" }},
+  "LogLevel":"debug"
+}
+
+
diff --git a/tests/manual/tests_via_nomad/asapo-test_filegen_filemon.nomad b/tests/manual/tests_via_nomad/asapo-test_filegen_filemon.nomad
index c5181c44b..6ea2313dc 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filegen_filemon.nomad
+++ b/tests/manual/tests_via_nomad/asapo-test_filegen_filemon.nomad
@@ -20,9 +20,10 @@ job "asapo-test" {
         command = "local/filegen_win.exe"
         args = [
           "1",
-          "1M",
-          "10",
-          "c:/tmp/asapo/test_in/test_folder/file_win"]
+          "10M",
+          "10000",
+          "120",
+          "u:/asapo/test_folder/file_win"]
       }
 
       artifact {
@@ -57,9 +58,10 @@ job "asapo-test" {
         command = "local/filegen_linux"
         args = [
           "1",
-          "1M",
-          "10",
-          "/tmp/asapo/test_in/test_folder/file_lin_"]
+          "10M",
+          "10000",
+          "120",
+          "/tmp/asapo/test_in/test_folder/file_lin"]
       }
 
       artifact {
@@ -100,13 +102,13 @@ job "asapo-test" {
           "30000"]
       }
 
-      resources {
-        cpu = 5000
-        memory = 128
-        network {
-          mbits = 10000
-        }
-      }
+#      resources {
+#        cpu = 5000
+#        memory = 128
+#        network {
+#          mbits = 10000
+#        }
+#      }
 
       artifact {
         source = "http://nims.desy.de/extra/asapo/getnext_broker"
@@ -144,13 +146,13 @@ job "asapo-test" {
           "yzgAcLmijSLWIm8dBiGNCbc0i42u5HSm-zR6FRqo__Y=",
           "30000"]
       }
-      resources {
-        cpu = 5000
-        memory = 128
-        network {
-          mbits = 10000
-        }
-      }
+#      resources {
+#        cpu = 5000
+#        memory = 128
+#        network {
+#          mbits = 10000
+#        }
+#      }
 
       artifact {
         source = "http://nims.desy.de/extra/asapo/getnext_broker"
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad
index eb22e67a0..fcde0f344 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad
@@ -36,7 +36,7 @@ job "asapo-filemon-producer" {
  "Mode":"tcp",
  "NThreads":8,
  "LogLevel":"debug",
- "RootMonitoredFolder":"c:\\tmp\\asapo\\test_in",
+ "RootMonitoredFolder":"u:\\asapo",
  "MonitoredSubFolders":["test_folder"],
  "IgnoreExtentions":["tmp"],
  "RemoveAfterSend":true
-- 
GitLab