From 8ae4208c94376f72382fa9fd3d6762668d485cee Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 22 Sep 2021 10:37:18 +0200 Subject: [PATCH] fix possible race condition and log format --- broker/src/asapo_broker/database/mongodb.go | 14 +++++---- common/cpp/include/asapo/common/io_error.h | 8 +++-- common/cpp/src/logger/spd_logger.cpp | 30 ++++++++++++++++++- .../cpp/src/system_io/system_io_linux_mac.cpp | 2 ++ common/cpp/unittests/logger/test_logger.cpp | 4 +-- 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index d248f016f..37eb03008 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -75,6 +75,7 @@ const stream_filter_finished = "finished" const stream_filter_unfinished = "unfinished" var dbSessionLock sync.Mutex +var dbClientLock sync.RWMutex type SizeRecord struct { Size int `bson:"size" json:"size"` @@ -101,8 +102,8 @@ func (db *Mongodb) Ping() (err error) { } func (db *Mongodb) Connect(address string) (err error) { - dbSessionLock.Lock() - defer dbSessionLock.Unlock() + dbClientLock.Lock() + defer dbClientLock.Unlock() if db.client != nil { return &DBError{utils.StatusServiceUnavailable, already_connected_msg} @@ -127,8 +128,8 @@ func (db *Mongodb) Connect(address string) (err error) { } func (db *Mongodb) Close() { - dbSessionLock.Lock() - defer dbSessionLock.Unlock() + dbClientLock.Lock() + defer dbClientLock.Unlock() if db.client != nil { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -672,7 +673,7 @@ func (db *Mongodb) resetCounter(request Request) ([]byte, error) { c := db.client.Database(request.DbName).Collection(inprocess_collection_name_prefix + request.Stream + "_" + request.GroupId) _, err_del := c.DeleteMany(context.Background(), bson.M{"_id": bson.M{"$gte": id}}) if err_del != nil { - return nil, &DBError{utils.StatusWrongInput, err.Error()} + return nil, &DBError{utils.StatusWrongInput, err_del.Error()} } return []byte(""), nil @@ -1021,6 +1022,9 @@ func (db *Mongodb) getStreams(request Request) ([]byte, error) { } func (db *Mongodb) ProcessRequest(request Request) (answer []byte, err error) { + dbClientLock.RLock() + defer dbClientLock.RUnlock() + if err := db.checkDatabaseOperationPrerequisites(request); err != nil { return nil, err } diff --git a/common/cpp/include/asapo/common/io_error.h b/common/cpp/include/asapo/common/io_error.h index 7375ec548..e11399e00 100644 --- a/common/cpp/include/asapo/common/io_error.h +++ b/common/cpp/include/asapo/common/io_error.h @@ -27,8 +27,8 @@ enum class IOErrorType { kSocketOperationUnknownAtLevel, kSocketOperationValueOutOfBound, kAddressNotValid, - kBrokenPipe - + kBrokenPipe, + kNotConnected }; using IOError = ServiceError<IOErrorType, ErrorType::kIOError>; @@ -67,6 +67,10 @@ auto const kAddressAlreadyInUse = IOErrorTemplate { auto const kConnectionRefused = IOErrorTemplate { "Connection refused", IOErrorType::kConnectionRefused }; +auto const kNotConnected = IOErrorTemplate { + "Not connected", IOErrorType::kNotConnected +}; + auto const kConnectionResetByPeer = IOErrorTemplate { "kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer }; diff --git a/common/cpp/src/logger/spd_logger.cpp b/common/cpp/src/logger/spd_logger.cpp index 9228b8f61..01a7606ea 100644 --- a/common/cpp/src/logger/spd_logger.cpp +++ b/common/cpp/src/logger/spd_logger.cpp @@ -2,6 +2,10 @@ #include "fluentd_sink.h" +#include <sstream> +#include <iomanip> + + namespace asapo { void SpdLogger::SetLogLevel(LogLevel level) { @@ -25,9 +29,33 @@ void SpdLogger::SetLogLevel(LogLevel level) { } } } + +std::string escape_json(const std::string &s) { + std::ostringstream o; + for (auto c = s.cbegin(); c != s.cend(); c++) { + switch (*c) { + case '"': o << "\\\""; break; + case '\\': o << "\\\\"; break; + case '\b': o << "\\b"; break; + case '\f': o << "\\f"; break; + case '\n': o << "\\n"; break; + case '\r': o << "\\r"; break; + case '\t': o << "\\t"; break; + default: + if ('\x00' <= *c && *c <= '\x1f') { + o << "\\u" + << std::hex << std::setw(4) << std::setfill('0') << (int)*c; + } else { + o << *c; + } + } + } + return o.str(); +} + std::string EncloseMsg(std::string msg) { if (msg.find("\"") != 0) { - return std::string(R"("message":")") + msg + "\""; + return std::string(R"("message":")") + escape_json(msg) + "\""; } else { return msg; } diff --git a/common/cpp/src/system_io/system_io_linux_mac.cpp b/common/cpp/src/system_io/system_io_linux_mac.cpp index b933a506e..f5fc7368b 100644 --- a/common/cpp/src/system_io/system_io_linux_mac.cpp +++ b/common/cpp/src/system_io/system_io_linux_mac.cpp @@ -50,6 +50,8 @@ Error GetLastErrorFromErrno() { return IOErrorTemplates::kFileAlreadyExists.Generate(); case ENOSPC: return IOErrorTemplates::kNoSpaceLeft.Generate(); + case ENOTCONN: + return IOErrorTemplates::kNotConnected.Generate(); case ECONNREFUSED: return IOErrorTemplates::kConnectionRefused.Generate(); case EADDRINUSE: diff --git a/common/cpp/unittests/logger/test_logger.cpp b/common/cpp/unittests/logger/test_logger.cpp index 6dc03c841..ed67359e1 100644 --- a/common/cpp/unittests/logger/test_logger.cpp +++ b/common/cpp/unittests/logger/test_logger.cpp @@ -71,11 +71,11 @@ class LoggerTests : public Test { spdlog::details::log_msg msg; spdlog::details::log_msg msg_json; - std::string test_string{"Hello"}; + std::string test_string{"Hello\""}; std::string test_string_json{R"("Hello":"test","int":1,"double":123.234)"}; void SetUp() override { - msg.raw << R"("message":"Hello")"; + msg.raw << R"("message":"Hello\"")"; msg_json.raw << R"("Hello":"test","int":1,"double":123.234)"; log.reset(new spdlog::logger("mylogger", mock_sink)); logger.log__ = std::move(log); -- GitLab