From 980d971219e3d74371a57b8a37057b7250cc1047 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 15 Aug 2018 12:06:40 +0200
Subject: [PATCH] journal write concern for receiver's mongo client, test to
 recover after mongo restart

---
 CMakeModules/prepare_asapo.cmake              |  1 +
 common/cpp/src/database/mongodb_client.cpp    |  7 +-
 common/cpp/src/database/mongodb_client.h      |  1 +
 config/nomad/mongo.nmd.in                     | 53 ++++++++++++++
 deploy/nomad_jobs/asapo-mongo.nmd.in          |  2 +
 producer/api/src/request_handler_tcp.cpp      |  3 +-
 receiver/src/request_handler_db_write.cpp     |  6 +-
 receiver/src/requests_dispatcher.cpp          |  2 +-
 .../unittests/test_requests_dispatcher.cpp    |  2 +-
 .../producer_receiver/CMakeLists.txt          |  1 +
 .../check_mongo_restart/CMakeLists.txt        |  7 ++
 .../check_mongo_restart/check_linux.sh        | 70 +++++++++++++++++++
 12 files changed, 149 insertions(+), 6 deletions(-)
 create mode 100644 config/nomad/mongo.nmd.in
 create mode 100644 tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt
 create mode 100644 tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh

diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake
index 17681e5bd..449796fbd 100644
--- a/CMakeModules/prepare_asapo.cmake
+++ b/CMakeModules/prepare_asapo.cmake
@@ -11,6 +11,7 @@ function(prepare_asapo)
         configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin receiver.json.tpl COPYONLY)
     endif()
     configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in  receiver.nmd @ONLY)
+    configure_file(${CMAKE_SOURCE_DIR}/config/nomad/mongo.nmd.in  mongo.nmd @ONLY)
     configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in  discovery.nmd @ONLY)
     configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in  authorizer.nmd @ONLY)
     configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in  broker.nmd @ONLY)
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index b5a29568e..6bd22a3f7 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name,
                                          const string& collection_name) {
     collection_ = mongoc_client_get_collection (client_, database_name.c_str(),
                                                 collection_name.c_str());
+
+    write_concern_ = mongoc_write_concern_new ();
+    mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT);
+    mongoc_write_concern_set_journal (write_concern_, true);
+    mongoc_collection_set_write_concern (collection_, write_concern_);
 }
 
 Error MongoDBClient::TryConnectDatabase() {
@@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const {
 }
 
 void MongoDBClient::CleanUp() {
+    mongoc_write_concern_destroy(write_concern_);
     mongoc_collection_destroy (collection_);
     mongoc_client_destroy (client_);
 }
@@ -105,7 +111,6 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl
         if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
             return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID);
         }
-
         return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message);
     }
 
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index 7d9d38c08..8ce6ec965 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -43,6 +43,7 @@ class MongoDBClient final : public Database {
   private:
     mongoc_client_t* client_{nullptr};
     mongoc_collection_t* collection_{nullptr};
+    mongoc_write_concern_t* write_concern_{nullptr};
     bool connected_{false};
     void CleanUp();
     std::string DBAddress(const std::string& address) const;
diff --git a/config/nomad/mongo.nmd.in b/config/nomad/mongo.nmd.in
new file mode 100644
index 000000000..cdac5fe4a
--- /dev/null
+++ b/config/nomad/mongo.nmd.in
@@ -0,0 +1,53 @@
+job "mongo" {
+  datacenters = ["dc1"]
+
+  group "mongo" {
+    count = 1
+
+    restart {
+      attempts = 2
+      interval = "3m"
+      delay = "15s"
+      mode = "delay"
+    }
+
+    task "mongo" {
+      driver = "docker"
+
+      config {
+        network_mode = "host"
+        image = "mongo:4.0.0"
+        volumes = ["/tmp/mongo:/data/db"]
+        args = ["--port","27016"]
+      }
+
+      resources {
+        cpu    = 1500
+        memory = 1256
+        network {
+          port "mongo" {
+          static = 27016
+          }
+        }
+      }
+
+      service {
+        port = "mongo"
+        name = "mongo"
+        check {
+          type     = "script"
+          name     = "alive"
+          command  = "mongo"
+          args     = ["--eval","db.version()"]
+          interval = "10s"
+          timeout  = "5s"
+        }
+        check_restart {
+          limit = 2
+          grace = "90s"
+          ignore_warnings = false
+        }
+      }
+    }
+  }
+}
diff --git a/deploy/nomad_jobs/asapo-mongo.nmd.in b/deploy/nomad_jobs/asapo-mongo.nmd.in
index 87d25a018..be05168e3 100644
--- a/deploy/nomad_jobs/asapo-mongo.nmd.in
+++ b/deploy/nomad_jobs/asapo-mongo.nmd.in
@@ -28,6 +28,8 @@ job "asapo-mongo" {
       }
 
       resources {
+        cpu    = 1500
+        memory = 12560
         network {
           port "mongo" {
           static = 27017
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index 043b1881a..ecac295c8 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -171,7 +171,8 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
         auto err = TrySendToReceiver(request);
         if (ServerError(err))  {
             Disconnect();
-            log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain());
+            log__->Debug("cannot send data id " + std::to_string(request->header.data_id) + " to " + receiver_uri + ": " +
+                         err->Explain());
             continue;
         }
 
diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp
index 7da85f40d..96d6b4855 100644
--- a/receiver/src/request_handler_db_write.cpp
+++ b/receiver/src/request_handler_db_write.cpp
@@ -18,9 +18,11 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const {
     file_info.name = request->GetFileName();
     file_info.size = request->GetDataSize();
     file_info.id = request->GetDataID();
-    auto err =  db_client__->Insert(file_info, false);
+    // todo: create flag ignore dups, allow dups for attempts to resend data
+    auto err =  db_client__->Insert(file_info, true);
     if (!err) {
-        log__->Debug(std::string{"insert record to "} + kDBCollectionName + " in " + db_name_ +
+        log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " +
+                     db_name_ +
                      " at " + GetReceiverConfig()->broker_db_uri);
     }
     return err;
diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp
index ddd2ad76b..a9abf4c3b 100644
--- a/receiver/src/requests_dispatcher.cpp
+++ b/receiver/src/requests_dispatcher.cpp
@@ -28,7 +28,7 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) {
 }
 
 Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept {
-    log__->Debug("processing request from " + producer_uri_ );
+    log__->Debug("processing request id " + std::to_string(request->GetDataID()) + " from " + producer_uri_ );
     Error handle_err;
     handle_err = request->Handle(statistics__);
     GenericNetworkResponse generic_response;
diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp
index 561bed160..ab08460ae 100644
--- a/receiver/unittests/test_requests_dispatcher.cpp
+++ b/receiver/unittests/test_requests_dispatcher.cpp
@@ -149,7 +149,7 @@ class RequestsDispatcherTests : public Test {
 
     }
     void MockHandleRequest(bool error, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) {
-        EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request from"), HasSubstr(connected_uri))));
+        EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri))));
 
         EXPECT_CALL(mock_request, Handle_t()).WillOnce(
             Return(error ? err.release() : nullptr)
diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt
index ad3639491..70f089ea8 100644
--- a/tests/automatic/producer_receiver/CMakeLists.txt
+++ b/tests/automatic/producer_receiver/CMakeLists.txt
@@ -1,4 +1,5 @@
 add_subdirectory(transfer_single_file)
 if (UNIX)
     add_subdirectory(check_monitoring)
+    add_subdirectory(check_mongo_restart)
 endif()
\ No newline at end of file
diff --git a/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt
new file mode 100644
index 000000000..e76f25ff4
--- /dev/null
+++ b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(TARGET_NAME check-mongo-restart)
+
+################################
+# Testing
+################################
+prepare_asapo()
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem)
diff --git a/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh
new file mode 100644
index 000000000..926b36389
--- /dev/null
+++ b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh
@@ -0,0 +1,70 @@
+#!/usr/bin/env bash
+
+set -e
+
+trap Cleanup EXIT
+
+function wait_mongo {
+NEXT_WAIT_TIME=0
+until mongo --port 27016 --eval "db.version()" | tail -1 | grep 4.0.0 || [ $NEXT_WAIT_TIME -eq 30 ]; do
+  echo "Wait for mongo"
+  NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME + 1 ))
+  sleep 1
+done
+if (( NEXT_WAIT_TIME == 30 )); then
+    echo "Timeout"
+    exit 1
+fi
+}
+
+
+database_name=db_test
+beamtime_id=asapo_test
+beamline=test
+
+Cleanup() {
+	echo cleanup
+    nomad stop receiver
+    nomad stop discovery
+    nomad stop authorizer
+    nomad stop nginx
+    echo "db.dropDatabase()" | mongo --port 27016 ${beamtime_id}
+    nomad stop mongo
+}
+
+nomad run mongo.nmd
+wait_mongo
+# create db before worker starts reading it. todo: get rid of it
+echo "db.${beamtime_id}.insert({dummy:1})" | mongo --port 27016 ${beamtime_id}
+
+sed -i 's/27017/27016/g' receiver.json.tpl
+sed -i 's/"WriteToDisk":true/"WriteToDisk":false/g' receiver.json.tpl
+
+
+nomad run authorizer.nmd
+nomad run nginx.nmd
+nomad run receiver.nmd
+nomad run discovery.nmd
+
+sleep 1
+
+nfiles=1000
+
+$1 localhost:8400 ${beamtime_id} 100 $nfiles 1  0 200 &
+
+sleep 0.1
+
+docker rm -f  `docker ps | grep mongo | awk '{print $1;}'`
+sleep 1
+
+wait
+
+echo "db.data.validate(true)" | mongo --port 27016 ${beamtime_id}
+
+echo processed files:
+echo "db.data.count()" | mongo --port 27016 ${beamtime_id}
+
+
+echo "db.data.count()" | mongo --port 27016 ${beamtime_id} | grep $nfiles
+
+
-- 
GitLab