From fefe1d19598858b861fa0342662ae920501cf301 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 23 May 2018 13:29:43 +0200
Subject: [PATCH] start converting integration tests into nomad

---
 common/cpp/src/json_parser/rapid_json.cpp     |  4 +-
 common/cpp/src/json_parser/rapid_json.h       |  1 +
 config/nomad/receiver.nmd.in                  | 46 +++++++++++++++++++
 .../api/include/producer/producer_error.h     |  6 +--
 .../api/src/receiver_discovery_service.cpp    |  2 +-
 producer/api/src/request_handler_tcp.cpp      |  4 +-
 producer/api/src/request_pool.cpp             |  6 ++-
 producer/api/unittests/test_request_pool.cpp  |  2 +-
 .../check_monitoring/CMakeLists.txt           |  9 ++--
 .../check_monitoring/check_linux.sh           | 14 +++---
 .../settings/discovery_settings.json          |  3 +-
 tests/automatic/settings/receiver.json.tpl    | 10 ++++
 12 files changed, 84 insertions(+), 23 deletions(-)
 create mode 100644 config/nomad/receiver.nmd.in
 create mode 100644 tests/automatic/settings/receiver.json.tpl

diff --git a/common/cpp/src/json_parser/rapid_json.cpp b/common/cpp/src/json_parser/rapid_json.cpp
index cf47a597c..31fcd5842 100644
--- a/common/cpp/src/json_parser/rapid_json.cpp
+++ b/common/cpp/src/json_parser/rapid_json.cpp
@@ -36,7 +36,7 @@ Error RapidJson::LazyInitialize()const noexcept {
     return nullptr;
 }
 
-asapo::Error CheckValueType(const std::string& name, ValueType type, const Value* val) {
+asapo::Error RapidJson::CheckValueType(const std::string& name, ValueType type, const Value* val) const {
     bool res = false;
     switch (type) {
     case ValueType::kObject:
@@ -56,7 +56,7 @@ asapo::Error CheckValueType(const std::string& name, ValueType type, const Value
         break;
     }
     if (!res) {
-        return TextError("wrong type: " + name);
+        return TextError("wrong type: " + name + " in: " + json_);
     }
 
     return nullptr;
diff --git a/common/cpp/src/json_parser/rapid_json.h b/common/cpp/src/json_parser/rapid_json.h
index bb23bd730..ca0c0b054 100644
--- a/common/cpp/src/json_parser/rapid_json.h
+++ b/common/cpp/src/json_parser/rapid_json.h
@@ -33,6 +33,7 @@ class RapidJson {
     std::string json_;
     mutable bool initialized_ = false;
     Error LazyInitialize() const noexcept;
+    Error CheckValueType(const std::string& name, ValueType type, const rapidjson::Value* val) const;
     Error embedded_error_ = nullptr;
 
     asapo::Error GetValuePointer(const std::string& name, ValueType type, rapidjson::Value** val)const noexcept;
diff --git a/config/nomad/receiver.nmd.in b/config/nomad/receiver.nmd.in
new file mode 100644
index 000000000..1d9c1675e
--- /dev/null
+++ b/config/nomad/receiver.nmd.in
@@ -0,0 +1,46 @@
+job "receiver" {
+  datacenters = ["dc1"]
+
+  type = "service"
+
+  group "group" {
+    count = 1
+
+    task "service" {
+      driver = "raw_exec"
+
+      config {
+        command = "/bin/bash",
+        args =  ["-c", "mkdir files && exec ${RECEIVER_DIR}/${RECEIVER_NAME} receiver.json && rm -rf files"]
+      }
+
+      resources {
+        cpu    = 500 # 500 MHz
+        memory = 256 # 256MB
+        network {
+          port "recv" {}
+        }
+      }
+
+      service {
+        name = "receiver"
+        port = "recv"
+        check {
+          name     = "alive"
+          type     = "tcp"
+          interval = "10s"
+          timeout  = "2s"
+          initial_status =   "passing"
+        }
+      }
+
+      template {
+         source        = "${CMAKE_CURRENT_BINARY_DIR}/receiver.json.tpl"
+         destination   = "receiver.json"
+         change_mode   = "signal"
+         change_signal = "SIGHUP"
+      }
+
+    }
+  }
+}
diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h
index a7d93435c..5a0641b7d 100644
--- a/producer/api/include/producer/producer_error.h
+++ b/producer/api/include/producer/producer_error.h
@@ -10,7 +10,7 @@ enum class ProducerErrorType {
     kConnectionNotReady,
     kFileTooLarge,
     kFileIdAlreadyInUse,
-    kUnknownServerError,
+    kInternalServerError,
     kCannotSendDataToReceivers,
     kRequestPoolIsFull
 };
@@ -74,8 +74,8 @@ auto const kFileIdAlreadyInUse = ProducerErrorTemplate {
     "File already in use", ProducerErrorType::kFileIdAlreadyInUse
 };
 
-auto const kUnknownServerError = ProducerErrorTemplate {
-    "Unknown server error", ProducerErrorType::kUnknownServerError
+auto const kInternalServerError = ProducerErrorTemplate {
+    "Internal server error", ProducerErrorType::kInternalServerError
 };
 
 auto const kCannotSendDataToReceivers = ProducerErrorTemplate {
diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp
index 564313444..62e6a7a35 100644
--- a/producer/api/src/receiver_discovery_service.cpp
+++ b/producer/api/src/receiver_discovery_service.cpp
@@ -42,7 +42,6 @@ Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t
     if (code != HttpCode::OK) {
         return TextError(responce);
     }
-
     return ParseResponse(responce, list, max_connections);
 
 }
@@ -59,6 +58,7 @@ void ReceiverDiscoveryService::ThreadHandler() {
             lock.lock();
             continue;
         }
+        log__->Debug("got receivers from " + endpoint_ );
         lock.lock();
         max_connections_ = max_connections;
         uri_list_ = uris;
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index fbdd30225..b382e0010 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -56,7 +56,7 @@ Error RequestHandlerTcp::ReceiveResponse(const std::string& receiver_address) {
         if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) {
             return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate();
         }
-        return ProducerErrorTemplates::kUnknownServerError.Generate();
+        return ProducerErrorTemplates::kInternalServerError.Generate();
     }
     return nullptr;
 }
@@ -69,6 +69,7 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st
 
     err = ReceiveResponse(receiver_address);
     if (err)  {
+        log__->Debug("cannot send data to " + receiver_address + ": " + err->Explain());
         return err;
     }
 
@@ -121,6 +122,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
         if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse)  {
             io__->CloseSocket(sd_, nullptr);
             sd_ = kDisconnectedSocketDescriptor;
+            log__->Debug("disconnected from  " + receiver_uri);
             continue;
         }
 
diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp
index 1825a0a1f..a5598feed 100644
--- a/producer/api/src/request_pool.cpp
+++ b/producer/api/src/request_pool.cpp
@@ -49,7 +49,9 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
     thread_info->lock.lock();
     request_handler->TearDownProcessingRequestLocked(err);
     if (err) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
         PutRequestBackToQueue(std::move(request));
+        condition_.notify_all();
     }
 }
 
@@ -58,11 +60,11 @@ void RequestPool::ThreadHandler(uint64_t id) {
     thread_info.lock =  std::unique_lock<std::mutex>(mutex_);
     auto request_handler = request_handler_factory__->NewRequestHandler(id, &shared_counter_);
     do {
-        condition_.wait(thread_info.lock, [this, &request_handler] {
+        auto do_work = condition_.wait_for(thread_info.lock, std::chrono::milliseconds(100), [this, &request_handler] {
             return (CanProcessRequest(request_handler) || quit_);
         });
         //after wait, we own the lock
-        if (!quit_) {
+        if (!quit_ && do_work) {
             ProcessRequest(request_handler, &thread_info);
         };
     } while (!quit_);
diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp
index 5b2c8f3bb..64a0b70e6 100644
--- a/producer/api/unittests/test_request_pool.cpp
+++ b/producer/api/unittests/test_request_pool.cpp
@@ -80,7 +80,7 @@ TEST(RequestPool, Constructor) {
     ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr));
 }
 
-TEST_F(RequestPoolTests, AddRequestDoesGoFurtherWhenNotReady) {
+TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) {
 
     EXPECT_CALL(*mock_request_handler, ReadyProcessRequest()).Times(AtLeast(1)).WillRepeatedly(Return(false));
     EXPECT_CALL(*mock_request_handler, PrepareProcessingRequestLocked()).Times(0);
diff --git a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
index c7be857c5..5b43f59df 100644
--- a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
+++ b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt
@@ -3,7 +3,10 @@ set(TARGET_NAME receiver)
 ################################
 # Testing
 ################################
-configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json receiver.json COPYONLY)
+get_target_property(RECEIVER_DIR receiver-bin BINARY_DIR)
+get_target_property(RECEIVER_NAME receiver-bin OUTPUT_NAME)
+
+configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in  receiver.nmd)
+configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl receiver.json.tpl COPYONLY)
 configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json discovery.json COPYONLY)
-add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem
-        )
+add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem)
diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
index 16cc1bdc9..39597e47a 100644
--- a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
+++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh
@@ -10,23 +10,21 @@ trap Cleanup EXIT
 Cleanup() {
 	echo cleanup
 	influx -execute "drop database ${database_name}"
-    kill $receiverid
+#    kill $receiverid
+    nomad stop receiver
     kill $discoveryid
-	rm -rf files
     echo "db.dropDatabase()" | mongo ${mongo_database_name}
 }
 
 influx -execute "create database ${database_name}"
 
-nohup $3 -config discovery.json &>/dev/null &
-discoveryid=`echo $!`
+nomad run receiver.nmd
 sleep 0.3
 
-nohup $2 receiver.json &>/dev/null &
-sleep 0.3
-receiverid=`echo $!`
+nohup $3 -config discovery.json &>/dev/null &
+discoveryid=`echo $!`
 
-mkdir files
+sleep 1
 
 $1 localhost:5006 100 112 4  0
 
diff --git a/tests/automatic/settings/discovery_settings.json b/tests/automatic/settings/discovery_settings.json
index 379b5bfa0..bcd04b3e0 100644
--- a/tests/automatic/settings/discovery_settings.json
+++ b/tests/automatic/settings/discovery_settings.json
@@ -1,7 +1,6 @@
 {
   "MaxConnections": 32,
-  "Endpoints": ["localhost:4200"],
-  "Mode": "static",
+  "Mode": "consul",
   "Port":5006,
   "LogLevel":"debug"
 }
\ No newline at end of file
diff --git a/tests/automatic/settings/receiver.json.tpl b/tests/automatic/settings/receiver.json.tpl
new file mode 100644
index 000000000..0d0abd37e
--- /dev/null
+++ b/tests/automatic/settings/receiver.json.tpl
@@ -0,0 +1,10 @@
+{
+  "MonitorDbAddress":"localhost:8086",
+  "MonitorDbName": "db_test",
+  "BrokerDbAddress":"localhost:27017",
+  "BrokerDbName": "test_run",
+  "ListenPort": {{ env "NOMAD_PORT_recv" }},
+  "WriteToDisk":true,
+  "WriteToDb":true,
+  "LogLevel" : "info"
+}
-- 
GitLab