Skip to content
Snippets Groups Projects
Commit b97353d6 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

prepare to change receiver logs

parent 5f1e68ab
No related branches found
No related tags found
No related merge requests found
Showing
with 128 additions and 38 deletions
......@@ -22,6 +22,9 @@ std::chrono::system_clock::time_point TimePointfromNanosec(uint64_t nanoseconds_
std::string IsoDateFromEpochNanosecs(uint64_t time_from_epoch_nanosec);
uint64_t NanosecsEpochFromISODate(std::string date_time);
std::string HostFromUri(const std::string& uri);
bool TimeFromJson(const JsonStringParser& parser, const std::string& name, std::chrono::system_clock::time_point* val);
class MessageMeta {
......
......@@ -18,7 +18,6 @@ enum class NetworkConnectionType : uint32_t {
kFabric, // Fabric connection (Primarily used for InfiniBand verbs)
};
// do not forget to add new codes to the end!
enum Opcode : uint8_t {
kOpcodeUnknownOp = 1,
......@@ -34,6 +33,30 @@ enum Opcode : uint8_t {
kOpcodeCount,
};
inline std::string OpcodeToString(uint8_t code) {
switch (code) {
case kOpcodeTransferData:
return "transfer data";
case kOpcodeTransferDatasetData:
return "transfer dataset data";
case kOpcodeStreamInfo:
return "stream info";
case kOpcodeLastStream:
return "last stream";
case kOpcodeGetBufferData:
return "get buffer data";
case kOpcodeAuthorize:
return "authorize";
case kOpcodeTransferMetaData:
return "transfer metadata";
case kOpcodeDeleteStream:
return "delete stream";
case kOpcodeGetMeta:
return "get meta";
default:
return "unknown op";
}
}
enum NetworkErrorCode : uint16_t {
kNetErrorNoError,
......@@ -69,40 +92,38 @@ struct GenericRequestHeader {
/* Keep in mind that the message here is just strncpy'ed, you can change the message later */
GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0,
uint64_t i_data_size = 0, uint64_t i_meta_size = 0, const std::string& i_message = "",
const std::string& i_stream = ""):
const std::string& i_stream = "") :
op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size}, meta_size{i_meta_size} {
strncpy(message, i_message.c_str(), kMaxMessageSize);
strncpy(stream, i_stream.c_str(), kMaxMessageSize);
strncpy(api_version, "v0.0", kMaxVersionSize);
}
Opcode op_code;
uint64_t data_id;
uint64_t data_size;
uint64_t meta_size;
CustomRequestData custom_data;
char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails) */
char stream[kMaxMessageSize]; /* Must be a string (strcpy is used) */
char api_version[kMaxVersionSize]; /* Must be a string (strcpy is used) */
Opcode op_code;
uint64_t data_id;
uint64_t data_size;
uint64_t meta_size;
CustomRequestData custom_data;
char message[kMaxMessageSize]; /* Can also be a binary message (e.g. MemoryRegionDetails) */
char stream[kMaxMessageSize]; /* Must be a string (strcpy is used) */
char api_version[kMaxVersionSize]; /* Must be a string (strcpy is used) */
std::string Json() {
std::string s = "{\"id\":" + std::to_string(data_id) + ","
"\"buffer\":\"" + std::string(message) + "\"" + ","
"\"stream\":\"" + std::string(stream) + "\""
"\"stream\":\""
+ std::string(stream) + "\""
+ "}";
return s;
};
};
struct GenericNetworkResponse {
Opcode op_code;
NetworkErrorCode error_code;
char message[kMaxMessageSize];
Opcode op_code;
NetworkErrorCode error_code;
char message[kMaxMessageSize];
};
struct SendResponse : GenericNetworkResponse {
struct SendResponse : GenericNetworkResponse {
};
}
......
......@@ -20,6 +20,7 @@ class LogMessageWithFields {
public:
LogMessageWithFields(std::string key, uint64_t val);
LogMessageWithFields(std::string key, double val, int precision);
LogMessageWithFields(std::string val);
LogMessageWithFields(std::string key, std::string val);
LogMessageWithFields& Append(std::string key, uint64_t val);
LogMessageWithFields& Append(std::string key, double val, int precision);
......
......@@ -234,4 +234,9 @@ uint64_t NanosecsEpochFromISODate(std::string date_time) {
return ns > 0 ? ns : 1;
}
std::string HostFromUri(const std::string& uri) {
return uri.substr(0, uri.find(':'));
}
}
#include <asapo/logger/logger.h>
#include "spd_logger.h"
namespace asapo {
......@@ -57,8 +60,12 @@ LogMessageWithFields::LogMessageWithFields(std::string key, double val, int prec
log_string_ = EncloseQuotes(key) + ":" + string_format("%." + std::to_string(precision) + "f", val);
}
LogMessageWithFields::LogMessageWithFields(std::string val) {
log_string_ = EncloseQuotes("message") + ":" + EncloseQuotes(escape_json(val));
}
LogMessageWithFields::LogMessageWithFields(std::string key, std::string val) {
log_string_ = EncloseQuotes(key) + ":" + EncloseQuotes(val);
log_string_ = EncloseQuotes(key) + ":" + EncloseQuotes(escape_json(val));
}
LogMessageWithFields& LogMessageWithFields::Append(std::string key, uint64_t val) {
......@@ -72,11 +79,12 @@ LogMessageWithFields& LogMessageWithFields::Append(std::string key, double val,
}
LogMessageWithFields& LogMessageWithFields::Append(std::string key, std::string val) {
log_string_ += "," + EncloseQuotes(key) + ":" + EncloseQuotes(val);
log_string_ += "," + EncloseQuotes(key) + ":" + EncloseQuotes(escape_json(val));
return *this;
}
std::string LogMessageWithFields::LogString() const {
return log_string_;
}
}
......@@ -34,6 +34,7 @@ class SpdLogger : public AbstractLogger {
};
std::string EncloseMsg(std::string msg);
std::string escape_json(const std::string& s);
}
......
......@@ -305,4 +305,23 @@ TEST(DeletaStreamOpt, EncodeDecode) {
}
struct TestUri {
std::string uri;
std::string host;
};
auto testsUri = std::vector<TestUri> {
TestUri{"123.234.24.13:123", "123.234.24.13"},
TestUri{"1.1.1.1", "1.1.1.1"},
};
TEST(HostFromUri, HostFromUri) {
for (auto test : testsUri) {
auto res = asapo::HostFromUri(test.uri);
ASSERT_THAT(res, Eq(test.host));
}
}
}
......@@ -25,9 +25,9 @@ set(RECEIVER_CORE_FILES
src/request_handler/request_handler_db_get_meta.cpp
src/request_handler/request_factory.cpp
src/request_handler/request_handler_db.cpp
src/file_processors/write_file_processor.cpp
src/file_processors/file_processor.cpp
src/file_processors/receive_file_processor.cpp
src/request_handler/file_processors/write_file_processor.cpp
src/request_handler/file_processors/file_processor.cpp
src/request_handler/file_processors/receive_file_processor.cpp
src/metrics/receiver_prometheus_metrics.cpp
src/metrics/receiver_mongoose_server.cpp
)
......@@ -109,9 +109,9 @@ set(TEST_SOURCE_FILES
unittests/mock_receiver_config.cpp
unittests/request_handler/test_requests_dispatcher.cpp
unittests/test_datacache.cpp
unittests/file_processors/test_write_file_processor.cpp
unittests/file_processors/test_file_processor.cpp
unittests/file_processors/test_receive_file_processor.cpp
unittests/request_handler/file_processors/test_write_file_processor.cpp
unittests/request_handler/file_processors/test_file_processor.cpp
unittests/request_handler/file_processors/test_receive_file_processor.cpp
)
#
set(TEST_LIBRARIES "${TARGET_NAME};system_io")
......
......@@ -44,7 +44,7 @@ void Connection::Listen() const noexcept {
}
io__->CloseSocket(socket_fd_, nullptr);
statistics__->SendIfNeeded(true);
log__->Info("disconnected from " + address_);
log__->Info(LogMessageWithFields("disconnected from ").Append("origin", HostFromUri(address_)));
}
......
......@@ -13,7 +13,7 @@ DataCache::DataCache(uint64_t cache_size, float keepunlocked_ratio) : cache_size
try {
cache_.reset(new uint8_t[cache_size]);
} catch (std::exception& e) {
std::cout << "Cannot allocate data cache: " << e.what() << std::endl;
std::cout << "cannot allocate data cache: " << e.what() << std::endl;
exit(1);
}
......
......@@ -32,10 +32,11 @@ static void fn(struct mg_connection* c, int ev, void* ev_data, void* fn_data) {
void asapo::ReceiverMongooseServer::ListenAndServe(std::string port,
std::unique_ptr<ReceiverMetricsProvider> provider) {
struct mg_mgr mgr; // Event manager
mg_log_set(0);
mg_mgr_init(&mgr); // Initialise event manager
auto uri = "0.0.0.0:" + port;
if (mg_http_listen(&mgr, uri.c_str(), fn, (void*) provider.get()) == NULL) {
log__->Error("cannot listen on port " + port);
log__->Error("metrics server: cannot listen on port " + port);
mg_mgr_free(&mgr);
return;
}
......
......@@ -54,7 +54,7 @@ void Receiver::ProcessConnections(Error* err) {
}
void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) {
log__->Info("new connection from " + address);
log__->Info(LogMessageWithFields("new connection").Append("origin", HostFromUri(address)));
auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd),
[connection_socket_fd, address, this] {
auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag));
......
#include "receiver_logger.h"
namespace asapo {
#include "request.h"
namespace asapo {
AbstractLogger* GetDefaultReceiverLogger() {
static Logger logger = asapo::CreateDefaultLoggerBin("receiver");
return logger.get();
}
LogMessageWithFields RequestLog(std::string message, const Request* request) {
LogMessageWithFields msg{std::move(message)};
msg.Append("beamtime", request->GetBeamtimeId())
.Append("dataSource", request->GetDataSource())
.Append("stream", request->GetStream())
.Append("origin", request->GetOriginHost())
.Append("operation", OpcodeToString(request->GetOpCode()));
switch (request->GetOpCode()) {
case Opcode::kOpcodeTransferData:
msg.Append("id", request->GetDataID());
break;
case Opcode::kOpcodeTransferDatasetData:
msg.Append("id", request->GetDataID());
msg.Append("substream", request->GetCustomData()[1]);
break;
default:
break;
}
return msg;
}
}
......@@ -5,8 +5,10 @@
namespace asapo {
class Request;
AbstractLogger* GetDefaultReceiverLogger();
LogMessageWithFields RequestLog(std::string message, const Request* request, std::string origin);
}
......
......@@ -10,6 +10,7 @@ Request::Request(const GenericRequestHeader& header,
cache__{cache}, request_header_(header),
socket_fd_{socket_fd}, origin_uri_{std::move(origin_uri)},
check_duplicate_request_handler_{db_check_handler} {
origin_host_ = HostFromUri(origin_uri_);
}
Error Request::PrepareDataBufferAndLockIfNeeded() {
......@@ -205,4 +206,8 @@ SourceType Request::GetSourceType() const {
return source_type_;
}
const std::string& Request::GetOriginHost() const {
return origin_host_;
}
}
......@@ -19,7 +19,6 @@
#include "data_cache.h"
#include "asapo/preprocessor/definitions.h"
#include "file_processors/file_processor.h"
namespace asapo {
......@@ -50,6 +49,7 @@ class Request {
VIRTUAL const char* GetMessage() const;
const std::string& GetOriginUri() const;
const std::string& GetOriginHost() const;
VIRTUAL const std::string& GetMetaData() const;
VIRTUAL const std::string& GetBeamtimeId() const;
VIRTUAL void SetBeamtimeId(std::string beamtime_id);
......@@ -88,6 +88,7 @@ class Request {
void* data_ptr;
RequestHandlerList handlers_;
std::string origin_uri_;
std::string origin_host_;
std::string beamtime_id_;
std::string data_source_;
std::string beamline_;
......
#include "file_processor.h"
#include "asapo/io/io_factory.h"
#include "../receiver_logger.h"
#include "../receiver_config.h"
#include "../request.h"
#include "../../receiver_logger.h"
#include "../../request.h"
namespace asapo {
......
#include "receive_file_processor.h"
#include "asapo/io/io_factory.h"
#include "../receiver_error.h"
#include "asapo/preprocessor/definitions.h"
#include "../request.h"
#include "../receiver_config.h"
#include "../../receiver_error.h"
#include "../../request.h"
#include "../../receiver_config.h"
namespace asapo {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment