Skip to content
Snippets Groups Projects
Commit 980d9712 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

journal write concern for receiver's mongo client, test to recover after mongo restart

parent 0f96af5d
No related branches found
No related tags found
No related merge requests found
Showing with 149 additions and 6 deletions
...@@ -11,6 +11,7 @@ function(prepare_asapo) ...@@ -11,6 +11,7 @@ function(prepare_asapo)
configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin receiver.json.tpl COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json.tpl.lin receiver.json.tpl COPYONLY)
endif() endif()
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/receiver.nmd.in receiver.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/mongo.nmd.in mongo.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/discovery.nmd.in discovery.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/authorizer.nmd.in authorizer.nmd @ONLY)
configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY) configure_file(${CMAKE_SOURCE_DIR}/config/nomad/broker.nmd.in broker.nmd @ONLY)
......
...@@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name, ...@@ -47,6 +47,11 @@ void MongoDBClient::InitializeCollection(const string& database_name,
const string& collection_name) { const string& collection_name) {
collection_ = mongoc_client_get_collection (client_, database_name.c_str(), collection_ = mongoc_client_get_collection (client_, database_name.c_str(),
collection_name.c_str()); collection_name.c_str());
write_concern_ = mongoc_write_concern_new ();
mongoc_write_concern_set_w (write_concern_, MONGOC_WRITE_CONCERN_W_DEFAULT);
mongoc_write_concern_set_journal (write_concern_, true);
mongoc_collection_set_write_concern (collection_, write_concern_);
} }
Error MongoDBClient::TryConnectDatabase() { Error MongoDBClient::TryConnectDatabase() {
...@@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const { ...@@ -82,6 +87,7 @@ string MongoDBClient::DBAddress(const string& address) const {
} }
void MongoDBClient::CleanUp() { void MongoDBClient::CleanUp() {
mongoc_write_concern_destroy(write_concern_);
mongoc_collection_destroy (collection_); mongoc_collection_destroy (collection_);
mongoc_client_destroy (client_); mongoc_client_destroy (client_);
} }
...@@ -105,7 +111,6 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl ...@@ -105,7 +111,6 @@ Error MongoDBClient::InsertBsonDocument(const bson_p& document, bool ignore_dupl
if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) { if (mongo_err.code == MONGOC_ERROR_DUPLICATE_KEY) {
return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID); return ignore_duplicates ? nullptr : TextError(DBError::kDuplicateID);
} }
return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message); return TextError(std::string(DBError::kInsertError) + " - " + mongo_err.message);
} }
......
...@@ -43,6 +43,7 @@ class MongoDBClient final : public Database { ...@@ -43,6 +43,7 @@ class MongoDBClient final : public Database {
private: private:
mongoc_client_t* client_{nullptr}; mongoc_client_t* client_{nullptr};
mongoc_collection_t* collection_{nullptr}; mongoc_collection_t* collection_{nullptr};
mongoc_write_concern_t* write_concern_;
bool connected_{false}; bool connected_{false};
void CleanUp(); void CleanUp();
std::string DBAddress(const std::string& address) const; std::string DBAddress(const std::string& address) const;
......
job "mongo" {
datacenters = ["dc1"]
group "mongo" {
count = 1
restart {
attempts = 2
interval = "3m"
delay = "15s"
mode = "delay"
}
task "mongo" {
driver = "docker"
config {
network_mode = "host"
image = "mongo:4.0.0"
volumes = ["/tmp/mongo:/data/db"]
args = ["--port","27016"]
}
resources {
cpu = 1500
memory = 1256
network {
port "mongo" {
static = 27016
}
}
}
service {
port = "mongo"
name = "mongo"
check {
type = "script"
name = "alive"
command = "mongo"
args = ["--eval","db.version()"]
interval = "10s"
timeout = "5s"
}
check_restart {
limit = 2
grace = "90s"
ignore_warnings = false
}
}
}
}
}
...@@ -28,6 +28,8 @@ job "asapo-mongo" { ...@@ -28,6 +28,8 @@ job "asapo-mongo" {
} }
resources { resources {
cpu = 1500
memory = 12560
network { network {
port "mongo" { port "mongo" {
static = 27017 static = 27017
......
...@@ -171,7 +171,8 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { ...@@ -171,7 +171,8 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) {
auto err = TrySendToReceiver(request); auto err = TrySendToReceiver(request);
if (ServerError(err)) { if (ServerError(err)) {
Disconnect(); Disconnect();
log__->Debug("cannot send data to " + receiver_uri + ": " + err->Explain()); log__->Debug("cannot send data id " + std::to_string(request->header.data_id) + " to " + receiver_uri + ": " +
err->Explain());
continue; continue;
} }
......
...@@ -18,9 +18,11 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const { ...@@ -18,9 +18,11 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const {
file_info.name = request->GetFileName(); file_info.name = request->GetFileName();
file_info.size = request->GetDataSize(); file_info.size = request->GetDataSize();
file_info.id = request->GetDataID(); file_info.id = request->GetDataID();
auto err = db_client__->Insert(file_info, false); // todo: create flag ignore dups, allow dups for attempts to resend data
auto err = db_client__->Insert(file_info, true);
if (!err) { if (!err) {
log__->Debug(std::string{"insert record to "} + kDBCollectionName + " in " + db_name_ + log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " +
db_name_ +
" at " + GetReceiverConfig()->broker_db_uri); " at " + GetReceiverConfig()->broker_db_uri);
} }
return err; return err;
......
...@@ -28,7 +28,7 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) { ...@@ -28,7 +28,7 @@ NetworkErrorCode GetNetworkCodeFromError(const Error& err) {
} }
Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept { Error RequestsDispatcher::ProcessRequest(const std::unique_ptr<Request>& request) const noexcept {
log__->Debug("processing request from " + producer_uri_ ); log__->Debug("processing request id " + std::to_string(request->GetDataID()) + " from " + producer_uri_ );
Error handle_err; Error handle_err;
handle_err = request->Handle(statistics__); handle_err = request->Handle(statistics__);
GenericNetworkResponse generic_response; GenericNetworkResponse generic_response;
......
...@@ -149,7 +149,7 @@ class RequestsDispatcherTests : public Test { ...@@ -149,7 +149,7 @@ class RequestsDispatcherTests : public Test {
} }
void MockHandleRequest(bool error, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) { void MockHandleRequest(bool error, Error err = asapo::IOErrorTemplates::kUnknownIOError.Generate() ) {
EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request from"), HasSubstr(connected_uri)))); EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("processing request"), HasSubstr(connected_uri))));
EXPECT_CALL(mock_request, Handle_t()).WillOnce( EXPECT_CALL(mock_request, Handle_t()).WillOnce(
Return(error ? err.release() : nullptr) Return(error ? err.release() : nullptr)
......
add_subdirectory(transfer_single_file) add_subdirectory(transfer_single_file)
if (UNIX) if (UNIX)
add_subdirectory(check_monitoring) add_subdirectory(check_monitoring)
add_subdirectory(check_mongo_restart)
endif() endif()
\ No newline at end of file
set(TARGET_NAME check-mongo-restart)
################################
# Testing
################################
prepare_asapo()
add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer>" nomem)
#!/usr/bin/env bash
set -e
trap Cleanup EXIT
function wait_mongo {
NEXT_WAIT_TIME=0
until mongo --port 27016 --eval "db.version()" | tail -1 | grep 4.0.0 || [ $NEXT_WAIT_TIME -eq 30 ]; do
echo "Wait for mongo"
NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ ))
sleep 1
done
if (( NEXT_WAIT_TIME == 30 )); then
echo "Timeout"
exit -1
fi
}
database_name=db_test
beamtime_id=asapo_test
beamline=test
Cleanup() {
echo cleanup
nomad stop receiver
nomad stop discovery
nomad stop authorizer
nomad stop nginx
echo "db.dropDatabase()" | mongo --port 27016 ${beamtime_id}
nomad stop mongo
}
nomad run mongo.nmd
wait_mongo
# create db before worker starts reading it. todo: git rid of it
echo "db.${beamtime_id}.insert({dummy:1})" | mongo --port 27016 ${beamtime_id}
sed -i 's/27017/27016/g' receiver.json.tpl
sed -i 's/"WriteToDisk":true/"WriteToDisk":false/g' receiver.json.tpl
nomad run authorizer.nmd
nomad run nginx.nmd
nomad run receiver.nmd
nomad run discovery.nmd
sleep 1
nfiles=1000
$1 localhost:8400 ${beamtime_id} 100 $nfiles 1 0 200 &
sleep 0.1
docker rm -f `docker ps | grep mongo | awk '{print $1;}'`
sleep 1
wait
echo "db.data.validate(true)" | mongo --port 27016 ${beamtime_id}
echo processed files:
echo "db.data.count()" | mongo --port 27016 ${beamtime_id}
echo "db.data.count()" | mongo --port 27016 ${beamtime_id} | grep $nfiles
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment