diff --git a/CMakeModules/prepare_asapo.cmake b/CMakeModules/prepare_asapo.cmake index 17681e5bd0645f497429addf73575c4728c531d6..449796fbd199f9c456aec90073a9762cf0b3e7cb 100644 --- a/CMakeModules/prepare_asapo.cmake +++ b/CMakeModules/prepare_asapo.cmake @@ -11,6 +11,7 @@ function(prepare_asapo) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin receiver.json.tpl COPYONLY) endif() configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) + configure_file(${CMAKE_SOURCE_DIR}/config/nomad/mongo.nmd.in mongo.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp index b5a29568e08e051547edc7765d1f7a944d5837bf..6bd22a3f7501f2c9340e40f09787007e8444db63 100644 --- a/common/cpp/src/database/mongodb_client.cpp +++ b/common/cpp/src/database/mongodb_client.cpp @@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name, const string& collection_name) { collection_ = mongoc_client_get_collection (client_, database_name.c_str(), collection_name.c_str()); + + write_concern_ = mongoc_write_concern_new (); + mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT); + mongoc_write_concern_set_journal (write_concern_, true); + mongoc_collection_set_write_concern (collection_, write_concern_); } Error MongoDBClient::TryConnectDatabase() { @@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const { } void MongoDBClient::CleanUp() { + mongoc_write_concern_destroy(write_concern_); mongoc_collection_destroy (collection_); mongoc_client_destroy (client_); } @@ -105,7 +111,6 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl if 
(mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID); } - return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message); } diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h index 7d9d38c08bbd10e84d4b5b6c1649fb95bfb4f017..8ce6ec965cdc63e03c47cc4b14f594ee905e981d 100644 --- a/common/cpp/src/database/mongodb_client.h +++ b/common/cpp/src/database/mongodb_client.h @@ -43,6 +43,7 @@ class MongoDBClient final : public Database { private: mongoc_client_t* client_{nullptr}; mongoc_collection_t* collection_{nullptr}; + mongoc_write_concern_t* write_concern_; bool connected_{false}; void CleanUp(); std::string DBAddress(const std::string& address) const; diff --git a/config/nomad/mongo.nmd.in b/config/nomad/mongo.nmd.in new file mode 100644 index 0000000000000000000000000000000000000000..cdac5fe4a0c48b0e08eb1a36c3fee276d3a799b1 --- /dev/null +++ b/config/nomad/mongo.nmd.in @@ -0,0 +1,53 @@ +job "mongo" { + datacenters = ["dc1"] + + group "mongo" { + count = 1 + + restart { + attempts = 2 + interval = "3m" + delay = "15s" + mode = "delay" + } + + task "mongo" { + driver = "docker" + + config { + network_mode = "host" + image = "mongo:4.0.0" + volumes = ["/tmp/mongo:/data/db"] + args = ["--port","27016"] + } + + resources { + cpu = 1500 + memory = 1256 + network { + port "mongo" { + static = 27016 + } + } + } + + service { + port = "mongo" + name = "mongo" + check { + type = "script" + name = "alive" + command = "mongo" + args = ["--eval","db.version()"] + interval = "10s" + timeout = "5s" + } + check_restart { + limit = 2 + grace = "90s" + ignore_warnings = false + } + } + } + } +} diff --git a/deploy/nomad_jobs/asapo-mongo.nmd.in b/deploy/nomad_jobs/asapo-mongo.nmd.in index 87d25a01820d7d8f5a8e36245e65f2dc09b3a7c6..be05168e3d880c9d15c607985853409f78cb1e11 100644 --- a/deploy/nomad_jobs/asapo-mongo.nmd.in +++ 
b/deploy/nomad_jobs/asapo-mongo.nmd.in @@ -28,6 +28,8 @@ job "asapo-mongo" { } resources { + cpu = 1500 + memory = 12560 network { port "mongo" { static = 27017 diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 043b1881a1cc1d5b4b1b4ad741d4d00c11da3dd0..ecac295c84b34621a1a07171bd485fcfebfb3157 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -171,7 +171,8 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { auto err = TrySendToReceiver(request); if (ServerError(err)) { Disconnect(); - log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain()); + log__->Debug("cannot send data id " + std::to_string(request->header.data_id) + " to " + receiver_uri + ": " + + err->Explain()); continue; } diff --git a/receiver/src/request_handler_db_write.cpp b/receiver/src/request_handler_db_write.cpp index 7da85f40d12bd794abe320df6e4b52a7738fbf50..96d6b48555f811dc47f3ceb9487b10ba5fb77ec6 100644 --- a/receiver/src/request_handler_db_write.cpp +++ b/receiver/src/request_handler_db_write.cpp @@ -18,9 +18,11 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const { file_info.name = request->GetFileName(); file_info.size = request->GetDataSize(); file_info.id = request->GetDataID(); - auto err = db_client__->Insert(file_info, false); + // todo: create flag ignore dups, allow dups for attempts to resend data + auto err = db_client__->Insert(file_info, true); if (!err) { - log__->Debug(std::string{"insert record to "} + kDBCollectionName + " in " + db_name_ + + log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " + + db_name_ + " at " + GetReceiverConfig()->broker_db_uri); } return err; diff --git a/receiver/src/requests_dispatcher.cpp b/receiver/src/requests_dispatcher.cpp index ddd2ad76b415b40b65622e4c4fca241fb33798c3..a9abf4c3b517bdf0262f34b4ea6ac671982fd279 100644 
--- a/receiver/src/requests_dispatcher.cpp +++ b/receiver/src/requests_dispatcher.cpp @@ -28,7 +28,7 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) { } Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { - log__->Debug("processing request from " + producer_uri_ ); + log__->Debug("processing request id " + std::to_string(request->GetDataID()) + " from " + producer_uri_ ); Error handle_err; handle_err = request->Handle(statistics__); GenericNetworkResponse generic_response; diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index 561bed160e101ac87224256c608a0574fb489efe..ab08460aed608245475260810f0505b610c406d2 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -149,7 +149,7 @@ class RequestsDispatcherTests : public Test { } void MockHandleRequest(bool error, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) { - EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request from"), HasSubstr(connected_uri)))); + EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri)))); EXPECT_CALL(mock_request, Handle_t()).WillOnce( Return(error ? 
err.release() : nullptr) diff --git a/tests/automatic/producer_receiver/CMakeLists.txt b/tests/automatic/producer_receiver/CMakeLists.txt index ad36394912da0e110b2707f73e0270452030d41f..70f089ea89556bb9f0c722f58a3fa0276018e836 100644 --- a/tests/automatic/producer_receiver/CMakeLists.txt +++ b/tests/automatic/producer_receiver/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(transfer_single_file) if (UNIX) add_subdirectory(check_monitoring) + add_subdirectory(check_mongo_restart) endif() \ No newline at end of file diff --git a/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e76f25ff44af36e70e25e5440bf17658446fb218 --- /dev/null +++ b/tests/automatic/producer_receiver/check_mongo_restart/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET_NAME check-mongo-restart) + +################################ +# Testing +################################ +prepare_asapo() +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem) diff --git a/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh new file mode 100644 index 0000000000000000000000000000000000000000..926b363891f0fd636bd5ddb0bfce3a14e155e0c1 --- /dev/null +++ b/tests/automatic/producer_receiver/check_mongo_restart/check_linux.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +function wait_mongo { +NEXT_WAIT_TIME=0 +until mongo --port 27016 --eval "db.version()" | tail -1 | grep 4.0.0 || [ $NEXT_WAIT_TIME -eq 30 ]; do + echo "Wait for mongo" + NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME + 1 )) + sleep 1 +done +if (( NEXT_WAIT_TIME == 30 )); then + echo "Timeout" + exit -1 +fi +} + + +database_name=db_test +beamtime_id=asapo_test +beamline=test + +Cleanup() { + echo cleanup + nomad stop receiver + nomad stop discovery + nomad stop authorizer + nomad 
stop nginx + echo "db.dropDatabase()" | mongo --port 27016 ${beamtime_id} + nomad stop mongo +} + +nomad run mongo.nmd +wait_mongo +# create db before worker starts reading it. todo: get rid of it +echo "db.${beamtime_id}.insert({dummy:1})" | mongo --port 27016 ${beamtime_id} + +sed -i 's/27017/27016/g' receiver.json.tpl +sed -i 's/"WriteToDisk":true/"WriteToDisk":false/g' receiver.json.tpl + + +nomad run authorizer.nmd +nomad run nginx.nmd +nomad run receiver.nmd +nomad run discovery.nmd + +sleep 1 + +nfiles=1000 + +$1 localhost:8400 ${beamtime_id} 100 $nfiles 1 0 200 & + +sleep 0.1 + +docker rm -f `docker ps | grep mongo | awk '{print $1;}'` +sleep 1 + +wait + +echo "db.data.validate(true)" | mongo --port 27016 ${beamtime_id} + +echo processed files: +echo "db.data.count()" | mongo --port 27016 ${beamtime_id} + + +echo "db.data.count()" | mongo --port 27016 ${beamtime_id} | grep $nfiles +