diff --git a/common/cpp/include/request/request.h b/common/cpp/include/request/request.h index 42f4ea0001373f5c4588ed7a68be3fba46d94bd6..8bb82cb83171d1b8b5db98dee5812a7f7f46edae 100644 --- a/common/cpp/include/request/request.h +++ b/common/cpp/include/request/request.h @@ -7,6 +7,7 @@ namespace asapo { + class GenericRequest { public: GenericRequest() = delete; @@ -15,6 +16,9 @@ class GenericRequest { virtual ~GenericRequest() = default; }; +using GenericRequestPtr = std::unique_ptr<GenericRequest>; +using GenericRequests = std::vector<GenericRequestPtr>; + } #endif //ASAPO_GENERIC_REQUEST_H diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index b1eb651570b4dbdb4dafb5a71eaf7db1d154977a..fe38451ea706f1b3d6c0185f6bf70d9a27667079 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -21,23 +21,25 @@ class RequestPool { std::unique_lock<std::mutex> lock; }; public: - explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, AbstractLogger* log); - VIRTUAL Error AddRequest(std::unique_ptr<GenericRequest> request); + explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log); + VIRTUAL Error AddRequest(GenericRequestPtr request); + VIRTUAL Error AddRequests(GenericRequests requests); + ~RequestPool(); uint64_t NRequestsInQueue(); private: - AbstractLogger* log__; + const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; void ThreadHandler(uint64_t id); bool quit_{false}; std::condition_variable condition_; std::mutex mutex_; - std::deque<std::unique_ptr<GenericRequest>> request_queue_; + std::deque<GenericRequestPtr> request_queue_; bool CanProcessRequest(const std::unique_ptr<RequestHandler>& request_handler); void ProcessRequest(const std::unique_ptr<RequestHandler>& request_handler, ThreadInformation* thread_info); - std::unique_ptr<GenericRequest> GetRequestFromQueue(); - void PutRequestBackToQueue(std::unique_ptr<GenericRequest>request); + GenericRequestPtr GetRequestFromQueue(); + void PutRequestBackToQueue(GenericRequestPtr request); uint64_t shared_counter_{0}; }; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index a7b78c3c9a0cc1f705b50c0b5450f2a49d6c5fd8..cfcae5c841b7dcb92ae416ef9497ce4fa2f96283 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -1,10 +1,10 @@ +#include <request/request_pool.h> #include "request/request_pool.h" - namespace asapo { RequestPool:: RequestPool(uint8_t n_threads, - RequestHandlerFactory* request_handler_factory, AbstractLogger* log): log__{log}, + RequestHandlerFactory* request_handler_factory, const AbstractLogger* log): log__{log}, request_handler_factory__{request_handler_factory}, threads_{n_threads} { for(size_t i = 0; i < n_threads; i++) { @@ -15,7 +15,7 @@ RequestPool:: RequestPool(uint8_t n_threads, } -Error RequestPool::AddRequest(std::unique_ptr<GenericRequest> request) { +Error RequestPool::AddRequest(GenericRequestPtr request) { std::unique_lock<std::mutex> lock(mutex_); request_queue_.emplace_back(std::move(request)); lock.unlock(); @@ -29,13 +29,13 @@ bool RequestPool::CanProcessRequest(const std::unique_ptr<RequestHandler>& reque return request_queue_.size() && request_handler->ReadyProcessRequest(); } -std::unique_ptr<GenericRequest> RequestPool::GetRequestFromQueue() { +GenericRequestPtr RequestPool::GetRequestFromQueue() { auto request = std::move(request_queue_.front()); request_queue_.pop_front(); return request; } -void RequestPool::PutRequestBackToQueue(std::unique_ptr<GenericRequest> request) { +void RequestPool::PutRequestBackToQueue(GenericRequestPtr request) { request_queue_.emplace_front(std::move(request)); } @@ -86,5 +86,16 @@ uint64_t RequestPool::NRequestsInQueue() { std::lock_guard<std::mutex> lock{mutex_}; return request_queue_.size(); } +Error RequestPool::AddRequests(GenericRequests requests) { + std::unique_lock<std::mutex> lock(mutex_); + for (auto& elem : requests) { + request_queue_.emplace_front(std::move(elem)); + } + lock.unlock(); +//todo: maybe notify_one is better here + condition_.notify_all(); + return nullptr; + +} } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 8fd7bca7fdf3944b44aa1286f819508f182931ca..98f54624b8153d3074cdaa5681a93fb1033e4af9 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -136,6 +136,23 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { } +TEST_F(RequestPoolTests, AddRequestsOk) { + + TestRequest* request2 = new TestRequest{GenericRequestHeader{}}; + + ExpectSend(mock_request_handler, 2); + + std::vector<std::unique_ptr<GenericRequest>> requests; + requests.push_back(std::move(request)); + requests.push_back(std::unique_ptr<GenericRequest> {request2}); + + auto err = pool.AddRequests(std::move(requests)); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(RequestPoolTests, FinishProcessingThreads) { EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); diff --git a/deploy/nomad_jobs/receiver.json.tpl b/deploy/nomad_jobs/receiver.json.tpl index f4585aa7fff08a24c9269f08fe71c3ebd4ec1e69..b5021a78c248963135e1f5bbbc751cc8c3ad684a 100644 --- a/deploy/nomad_jobs/receiver.json.tpl +++ b/deploy/nomad_jobs/receiver.json.tpl @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 1c3b0b5a6a276c6565a281ceef1db630dfc631ca..d521461b6ed18dbe8eaef09c1d24a0a24d0a400f 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -6,7 +6,7 @@ set(SOURCE_FILES src/request_handler_tcp.cpp src/request_handler_filesystem.cpp src/receiver_discovery_service.cpp - src/producer_request_handler_factory.cpp + src/receiver_data_server_request_handler_factory.cpp src/producer_request.cpp) diff --git a/producer/api/src/producer_request_handler_factory.cpp b/producer/api/src/receiver_data_server_request_handler_factory.cpp similarity index 100% rename from producer/api/src/producer_request_handler_factory.cpp rename to producer/api/src/receiver_data_server_request_handler_factory.cpp diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h index c1eab17ff3a8a73fdf4b231d03b2667a6af493f0..255240abd7f990b84b4cfa1059a77a290c4d20ed 100644 --- a/producer/api/src/receiver_discovery_service.h +++ b/producer/api/src/receiver_discovery_service.h @@ -26,7 +26,7 @@ class ReceiverDiscoveryService { VIRTUAL uint64_t UpdateFrequency(); public: std::unique_ptr<HttpClient> httpclient__; - AbstractLogger* log__; + const AbstractLogger* log__; private: static const std::string kServiceEndpointSuffix; void ThreadHandler(); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 121d758eaec3424f8d6f6a7b43e7ab24bb536838..c0371b1f9da58afd586962fbb67f0a6da26d10a7 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -15,10 +15,10 @@ set(SOURCE_FILES src/receiver_data_server/receiver_data_server.cpp src/receiver_data_server/net_server.cpp src/receiver_data_server/tcp_server.cpp - src/receiver_data_server/request_pool.cpp - src/receiver_data_server/request.cpp + src/receiver_data_server/receiver_data_server_request.cpp src/receiver_data_server/receiver_data_server_logger.cpp - src/data_cache.cpp) + src/data_cache.cpp src/receiver_data_server/receiver_data_server_request_handler_factory.cpp +) ################################ @@ -28,7 +28,7 @@ set(SOURCE_FILES add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${CURL_INCLUDE_DIRS}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database) diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 54bd0b9ea97cab9979f5180c7dc9cb66ec41f3e5..a1e44c8109f92e1c6dac293124f378353fe83d44 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -23,7 +23,7 @@ asapo::Error ReadConfigFile(int argc, char* argv[]) { std::thread StartDataServer(const asapo::ReceiverConfig* config, asapo::SharedCache cache) { static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port); return std::thread([config] { - asapo::ReceiverDataServer data_server{dataserver_address, config->log_level}; + asapo::ReceiverDataServer data_server{dataserver_address, config->log_level, config->dataserver_nthreads}; data_server.Run(); }); } diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index c8ddc42c2b80f938bbb9c25f7a5fab40424e7940..9c3cbbf97165107489dae0eb87f3555ac450187c 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -21,6 +21,7 @@ Error ReceiverConfigFactory::SetConfig(std::string file_name) { (err = parser.GetString("MonitorDbAddress", &config.monitor_db_uri)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver_listen_port)) || + (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver_nthreads)) || (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) || (err = parser.GetBool("WriteToDb", &config.write_to_db)) || (err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) || diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 19fe552be587d14bb88e3269ddcf2c7590899b83..cd6ee652d1a07cbfa7746471801cd9e5a04cd0d6 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -21,6 +21,7 @@ struct ReceiverConfig { bool use_datacache = true; uint64_t datacache_size_gb = 0; uint64_t datacache_reserved_share = 0; + uint64_t dataserver_nthreads = 1; LogLevel log_level = LogLevel::Info; std::string tag; std::string source_host; diff --git a/receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp b/receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp deleted file mode 100644 index d1055ba044bfcbff70ae3b4cdd0afbf72c39ac32..0000000000000000000000000000000000000000 Binary files a/receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp and /dev/null differ diff --git a/receiver/src/receiver_data_server/common.h b/receiver/src/receiver_data_server/common.h deleted file mode 100644 index 552b109b034248db40eda40222c02db875bb0e6d..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/common.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef ASAPO_COMMON_H -#define ASAPO_COMMON_H - -#include <vector> - -#include "request.h" -namespace asapo { - -using Requests = std::vector<asapo::Request>; - -} - -#endif //ASAPO_COMMON_H diff --git a/receiver/src/receiver_data_server/net_server.h b/receiver/src/receiver_data_server/net_server.h index d0ebb83bd9cc2c9564cc26eeee8e4ed3af378b58..6c3fee9868e9b2a8518592251eeecebf25459f2c 100644 --- a/receiver/src/receiver_data_server/net_server.h +++ b/receiver/src/receiver_data_server/net_server.h @@ -3,13 +3,13 @@ #include "common/error.h" -#include "common.h" +#include "request/request.h" namespace asapo { class NetServer { public: - virtual Requests GetNewRequests(Error* err) const noexcept = 0; + virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0; virtual ~NetServer() = default; }; diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index cbc55c3fb2cc3507b59ecc73094ba34983256111..d2a329812f9072c520601575be249e12df2092ab 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -1,12 +1,15 @@ #include "receiver_data_server.h" #include "tcp_server.h" #include "receiver_data_server_logger.h" +#include "receiver_data_server_request_handler_factory.h" namespace asapo { -ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level) : request_pool__{new RequestPool}, net__{new TcpServer(address)}, +ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads) : net__{new TcpServer(address)}, log__{GetDefaultReceiverDataServerLogger()} { + request_handler_factory_.reset(new ReceiverDataServerRequestHandlerFactory()); GetDefaultReceiverDataServerLogger()->SetLogLevel(log_level); + request_pool__.reset(new RequestPool{n_threads, request_handler_factory_.get(), log__}); } void ReceiverDataServer::Run() { @@ -17,7 +20,7 @@ void ReceiverDataServer::Run() { continue; } if (!err) { - err = request_pool__->AddRequests(requests); + err = request_pool__->AddRequests(std::move(requests)); } if (err) { log__->Error(std::string("receiver data server stopped: ") + err->Explain()); diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h index 8366bb048f1c65dc88c35613f5154d1f5505f46a..26a11b19b2a0f51b6a7e6e2d58a657347ed7f17e 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.h +++ b/receiver/src/receiver_data_server/receiver_data_server.h @@ -4,14 +4,17 @@ #include <memory> #include "net_server.h" -#include "request_pool.h" +#include "request/request_pool.h" #include "logger/logger.h" namespace asapo { class ReceiverDataServer { + private: + // important to create it before request_pool__ + std::unique_ptr<RequestHandlerFactory> request_handler_factory_; public: - explicit ReceiverDataServer(std::string address, LogLevel log_level); + explicit ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads); std::unique_ptr<RequestPool> request_pool__; std::unique_ptr<NetServer> net__; const AbstractLogger* log__; diff --git a/receiver/src/receiver_data_server/receiver_data_server_logger.h.save b/receiver/src/receiver_data_server/receiver_data_server_logger.h.save deleted file mode 100644 index c391dbe0d827627bf27edb7eb904235a16b62733..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/receiver_data_server_logger.h.save +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef ASAPO_RECEIVER_LOGGER_H -#define ASAPO_RECEIVER_LOGGER_H - -#include "logger/logger.h" - -namespace asapo { - - -AbstractLogger* GetDefaultReceiverDataServerLogger(); - -} - - -#endif //ASAPO_RECEIVER_LOGGER_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.cpp b/receiver/src/receiver_data_server/receiver_data_server_request.cpp new file mode 100644 index 0000000000000000000000000000000000000000..baabf2a8875a51983322a8e6c8f38fbed366f4c6 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request.cpp @@ -0,0 +1,14 @@ +#include "receiver_data_server_request.h" +#include "receiver_data_server.h" + +namespace asapo { + +ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, + const NetServer* server) : + GenericRequest(std::move(header)), + net_id{net_id}, server{server} { +} + + + +} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.h b/receiver/src/receiver_data_server/receiver_data_server_request.h new file mode 100644 index 0000000000000000000000000000000000000000..c191325d40a1150b47b0fd7be05b334d4b236073 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request.h @@ -0,0 +1,25 @@ +#ifndef ASAPO_REQUEST_H +#define ASAPO_REQUEST_H + +#include "common/networking.h" + +#include "request/request.h" + +namespace asapo { + +class NetServer; + +class ReceiverDataServerRequest : public GenericRequest { + public: + explicit ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, const NetServer* server); + const uint64_t net_id; + const NetServer* server; + ~ReceiverDataServerRequest() = default; + +}; + +using ReceiverDataServerRequestPtr = std::unique_ptr<ReceiverDataServerRequest>; + +} + +#endif //ASAPO_REQUEST_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d28ea747160f915e7f4261c8ec6e7fa7581fbd88 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp @@ -0,0 +1,9 @@ +#include "receiver_data_server_request_handler_factory.h" + +namespace asapo { + +std::unique_ptr<RequestHandler> ReceiverDataServerRequestHandlerFactory::NewRequestHandler(uint64_t thread_id, + uint64_t* shared_counter) { + return std::unique_ptr<RequestHandler>(); +} +} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h new file mode 100644 index 0000000000000000000000000000000000000000..1176dd57deb32d07946f69d0c5df2a20d55939cc --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h @@ -0,0 +1,20 @@ +#ifndef ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H +#define ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H + +#include "request/request_handler_factory.h" +#include "request/request_handler.h" +#include "preprocessor/definitions.h" + + +namespace asapo { + +class ReceiverDataServerRequestHandlerFactory : public RequestHandlerFactory { + public: + VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override; + private: +}; + + +} + +#endif //ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H diff --git a/receiver/src/receiver_data_server/request.cpp b/receiver/src/receiver_data_server/request.cpp deleted file mode 100644 index fcc6b7fee0cbe6f02bab9969c722acd406b7edde..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/request.cpp +++ /dev/null @@ -1,8 +0,0 @@ -#include "request.h" - -namespace asapo { - -Request::Request(uint64_t net_id, const NetServer* server) : net_id{net_id}, server{server} { - -} -} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/request.h b/receiver/src/receiver_data_server/request.h deleted file mode 100644 index 01fff594673765fd8fb8c2952dd10f2d2a883502..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/request.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef ASAPO_REQUEST_H -#define ASAPO_REQUEST_H - -#include "common/networking.h" - -namespace asapo { - -class NetServer; - -struct Request { - explicit Request(uint64_t net_id, const NetServer* server); - GenericRequestHeader header; - const uint64_t net_id; - const NetServer* server; -}; - -} - -#endif //ASAPO_REQUEST_H diff --git a/receiver/src/receiver_data_server/request_pool.cpp b/receiver/src/receiver_data_server/request_pool.cpp deleted file mode 100644 index 837a62b2f219b5b2381e56fb78aa907136250082..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/request_pool.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "request_pool.h" - -namespace asapo { - -Error RequestPool::AddRequests(const Requests& requests) noexcept { - return nullptr; -} - -} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/request_pool.h b/receiver/src/receiver_data_server/request_pool.h deleted file mode 100644 index 42d46fa20da2474ca89affac6d33e91131d8fbbf..0000000000000000000000000000000000000000 --- a/receiver/src/receiver_data_server/request_pool.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef ASAPO_REQUEST_POOL_H -#define ASAPO_REQUEST_POOL_H - -#include "preprocessor/definitions.h" -#include "common/error.h" -#include "common.h" - -namespace asapo { - -class RequestPool { - public: - VIRTUAL Error AddRequests(const Requests& requests) noexcept; -}; - -} - -#endif //ASAPO_REQUEST_POOL_H diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index 87d7f6e4ace269d8be2945904b0cba575e03b57b..ec8e7ad3e89c33189f223cc4405471e5db1cdb38 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -38,22 +38,24 @@ void TcpServer::CloseSocket(SocketDescriptor socket) const noexcept { io__->CloseSocket(socket, nullptr); } -Request TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { - Request request{(uint64_t) socket, this}; - io__->Receive(socket, &request.header, +ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { + GenericRequestHeader header; + io__->Receive(socket, &header, sizeof(GenericRequestHeader), err); if (*err == ErrorTemplates::kEndOfFile) { CloseSocket(socket); + return nullptr; } else if (*err) { log__->Error("error getting next request from " + io__->AddressFromSocket(socket) + ": " + (*err)-> Explain() ); + return nullptr; } - return request; + return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket, this}}; } -Requests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { - Requests requests; +GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { + GenericRequests requests; for (auto client : sockets) { Error err; @@ -61,14 +63,14 @@ Requests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noe if (err) { continue; } - log__->Debug("received request opcode: " + std::to_string(request.header.op_code) + " id: " + std::to_string( - request.header.data_id)); + log__->Debug("received request opcode: " + std::to_string(request->header.op_code) + " id: " + std::to_string( + request->header.data_id)); requests.emplace_back(std::move(request)); } return requests; } -Requests TcpServer::GetNewRequests(Error* err) const noexcept { +GenericRequests TcpServer::GetNewRequests(Error* err) const noexcept { if (*err = InitializeMasterSocketIfNeeded()) { return {}; } diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index 2248cf19256baf6e1921b97bd497c2534060da9e..24eb78c58e0076a652b55dea8e2e65a6c1bb072c 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -4,7 +4,7 @@ #include "net_server.h" #include "io/io.h" #include "logger/logger.h" - +#include "receiver_data_server_request.h" namespace asapo { const int kMaxPendingConnections = 5; @@ -13,15 +13,15 @@ class TcpServer : public NetServer { public: TcpServer(std::string address); ~TcpServer(); - virtual Requests GetNewRequests(Error* err) const noexcept override ; + virtual GenericRequests GetNewRequests(Error* err) const noexcept override ; std::unique_ptr<IO> io__; const AbstractLogger* log__; private: void CloseSocket(SocketDescriptor socket) const noexcept; ListSocketDescriptors GetActiveSockets(Error* err) const noexcept; Error InitializeMasterSocketIfNeeded() const noexcept; - Request ReadRequest(SocketDescriptor socket, Error* err) const noexcept; - Requests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; + ReceiverDataServerRequestPtr ReadRequest(SocketDescriptor socket, Error* err) const noexcept; + GenericRequests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; mutable ListSocketDescriptors sockets_to_listen_; std::string address_; diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 89bc98f56758f70e37a734c41140ff56d24ea97d..f226e5eb1fa0d7f773bf171c6c4c78e1457864ec 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -70,7 +70,7 @@ const RequestHandlerList& Request::GetListHandlers() const { } -void Request::AddHandler(const RequestHandler* handler) { +void Request::AddHandler(const ReceiverRequestHandler* handler) { handlers_.emplace_back(handler); } diff --git a/receiver/src/request.h b/receiver/src/request.h index 422a7744ecdbdd2f35348a43b3b787393a98091c..7d0a1036745bd1271c3056c4e1d88792badf9a2a 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -17,7 +17,7 @@ #include "preprocessor/definitions.h" namespace asapo { -using RequestHandlerList = std::vector<const RequestHandler*>; +using RequestHandlerList = std::vector<const ReceiverRequestHandler*>; class Request { public: @@ -25,7 +25,7 @@ class Request { ~Request() = default; Request(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, DataCache* cache); - VIRTUAL void AddHandler(const RequestHandler*); + VIRTUAL void AddHandler(const ReceiverRequestHandler*); VIRTUAL const RequestHandlerList& GetListHandlers() const; VIRTUAL uint64_t GetDataSize() const; VIRTUAL uint64_t GetDataID() const; diff --git a/receiver/src/request_handler.h b/receiver/src/request_handler.h index b02cca66163b67d9d2c652a35d65c23d6addcb8a..604fb23f4dd1bb467c789a7fd99575a6d4be8791 100644 --- a/receiver/src/request_handler.h +++ b/receiver/src/request_handler.h @@ -8,11 +8,11 @@ namespace asapo { class Request; -class RequestHandler { +class ReceiverRequestHandler { public: virtual Error ProcessRequest(Request* request) const = 0; virtual StatisticEntity GetStatisticEntity() const = 0; - virtual ~RequestHandler() = default; + virtual ~ReceiverRequestHandler() = default; private: }; diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h index 59069240cbe2988e48bed21c7d78672f75aa62f2..8ecf7fee234801951189743c9c826f868fbce143 100644 --- a/receiver/src/request_handler_authorize.h +++ b/receiver/src/request_handler_authorize.h @@ -12,7 +12,7 @@ namespace asapo { -class RequestHandlerAuthorize final: public RequestHandler { +class RequestHandlerAuthorize final: public ReceiverRequestHandler { public: RequestHandlerAuthorize(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/src/request_handler_db_write.h b/receiver/src/request_handler_db_write.h index de3bcaa76bcde633888edbcbcee85eae6b475f74..87b5496fc79f5138d8cd8a3258a9d35c99663244 100644 --- a/receiver/src/request_handler_db_write.h +++ b/receiver/src/request_handler_db_write.h @@ -9,7 +9,7 @@ namespace asapo { -class RequestHandlerDbWrite final: public RequestHandler { +class RequestHandlerDbWrite final: public ReceiverRequestHandler { public: RequestHandlerDbWrite(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/src/request_handler_file_write.h b/receiver/src/request_handler_file_write.h index a0d36b48a2aa8dad0a7e6c5a357668746754b2db..2868e8fba9e2844cae4e0c729ec9d79f9e70666b 100644 --- a/receiver/src/request_handler_file_write.h +++ b/receiver/src/request_handler_file_write.h @@ -10,7 +10,7 @@ namespace asapo { const uint64_t kMaxFileSize = uint64_t(1024) * 1024 * 1024 * 2; //2GB -class RequestHandlerFileWrite final: public RequestHandler { +class RequestHandlerFileWrite final: public ReceiverRequestHandler { public: RequestHandlerFileWrite(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 8e03129c2a4a13a6c959175e110701004acfdbe2..ee06500035cd640441a76ff89fe7f39debcd772f 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -45,8 +45,10 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("MonitorDbName", error_field) + "\"" + config.monitor_db_name + "\""; config_string += "," + Key("BrokerDbAddress", error_field) + "\"" + config.broker_db_uri + "\""; config_string += "," + Key("ListenPort", error_field) + std::to_string(config.listen_port); - config_string += "," + Key("DataServer", error_field) + "{" + Key("ListenPort", error_field) + std::to_string( - config.dataserver_listen_port) + "}"; + config_string += "," + Key("DataServer", error_field) + "{"; + config_string += Key("ListenPort", error_field) + std::to_string(config.dataserver_listen_port); + config_string += "," + Key("NThreads", error_field) + std::to_string(config.dataserver_nthreads); + config_string += "}"; config_string += "," + Key("DataCache", error_field) + "{"; config_string += Key("Use", error_field) + (config.use_datacache ? "true" : "false") ; config_string += "," + Key("SizeGB", error_field) + std::to_string(config.datacache_size_gb); diff --git a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h index 07ebfe1409d1fca06d708cfe6599f2c7b4ed7594..390a628e731e17add3c4153129d24d51aa0cccde 100644 --- a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h +++ b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h @@ -5,32 +5,42 @@ #include <gmock/gmock.h> #include "../../src/receiver_data_server/net_server.h" -#include "../../src/receiver_data_server/request_pool.h" - +#include "request/request_pool.h" +#include "../../src/receiver_data_server/receiver_data_server_request.h" namespace asapo { class MockNetServer : public NetServer { public: - Requests GetNewRequests(Error* err) const noexcept override { + GenericRequests GetNewRequests(Error* err) const noexcept override { ErrorInterface* error = nullptr; auto reqs = GetNewRequests_t(&error); err->reset(error); - return reqs; + GenericRequests res; + for (const auto& preq : reqs) { + ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.net_id, preq.server}}; + res.push_back(std::move(ptr)); + } + return res; } - MOCK_CONST_METHOD1(GetNewRequests_t, Requests (ErrorInterface** - error)); + MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** + error)); }; class MockPool : public RequestPool { public: - Error AddRequests(const Requests& requests) noexcept override { - return Error(AddRequests_t(requests)); + MockPool(): RequestPool(0, nullptr, nullptr) {}; + Error AddRequests(GenericRequests requests) noexcept override { + std::vector<GenericRequest> reqs; + for (const auto& preq : requests) { + reqs.push_back(GenericRequest{preq->header}); + } + return Error(AddRequests_t(std::move(reqs))); } - MOCK_METHOD1(AddRequests_t, ErrorInterface * (const Requests&)); + MOCK_METHOD1(AddRequests_t, ErrorInterface * (std::vector<GenericRequest>)); }; diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index c3875026eb9ee33d4674566c72a3069ca52173bf..9a2e4c7146de86b1cc60d4f08245fe0c5bde34c6 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -5,7 +5,6 @@ #include "unittests/MockLogger.h" #include "../../src/receiver_data_server/receiver_data_server.h" #include "../../src/receiver_data_server/tcp_server.h" -#include "../../src/receiver_data_server/common.h" #include "receiver_dataserver_mocking.h" @@ -30,22 +29,24 @@ using ::testing::HasSubstr; using asapo::MockLogger; using asapo::ReceiverDataServer; using asapo::Error; +using asapo::GenericRequests; +using asapo::GenericRequest; +using asapo::ReceiverDataServerRequest; namespace { TEST(ReceiverDataServer, Constructor) { - ReceiverDataServer data_server{"", asapo::LogLevel::Debug}; + ReceiverDataServer data_server{"", asapo::LogLevel::Debug, 4}; ASSERT_THAT(dynamic_cast<const asapo::TcpServer*>(data_server.net__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(data_server.request_pool__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(data_server.log__), Ne(nullptr)); - } class ReceiverDataServerTests : public Test { public: std::string expected_address = "somehost:123"; - ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug}; + ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug, 0}; asapo::MockNetServer mock_net; asapo::MockPool mock_pool; NiceMock<asapo::MockLogger> mock_logger; @@ -63,11 +64,11 @@ class ReceiverDataServerTests : public Test { TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kTimeout.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -81,7 +82,7 @@ TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -95,7 +96,7 @@ TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -113,11 +114,11 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { TEST_F(ReceiverDataServerTests, Ok) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp index b69fc6cc5af4a99774afcd6650236c4a00ef7bd6..f7c35a80e734244f5af7aeb3cb5003cec224d927 100644 --- a/receiver/unittests/receiver_data_server/test_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -37,7 +37,6 @@ TEST(TCPServer, Constructor) { } -uint64_t expected_id = 124; std::string expected_address = "somehost:123"; class TCPServerTests : public Test { @@ -225,10 +224,10 @@ TEST_F(TCPServerTests, GetNewRequestsReadOk) { ASSERT_THAT(requests.size(), Eq(3)); int i = 0; for (auto conn : expected_client_sockets) { - ASSERT_THAT(requests[i].header.data_id, Eq(conn)); - ASSERT_THAT(requests[i].header.op_code, Eq(asapo::kOpcodeGetBufferData)); - ASSERT_THAT(requests[i].net_id, Eq(conn)); - ASSERT_THAT(requests[i].server, Eq(&tcp_server)); + ASSERT_THAT(requests[i]->header.data_id, Eq(conn)); + ASSERT_THAT(requests[i]->header.op_code, Eq(asapo::kOpcodeGetBufferData)); +// ASSERT_THAT(requests[i]->net_id, Eq(conn)); +// ASSERT_THAT(requests[i]->server, Eq(&tcp_server)); i++; } diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 450ac82bfe4805e481607dbaad98aeb99f94e81d..2ea4e9bcc66cc5c1883c6a8739ac01ba818d65e3 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -60,7 +60,7 @@ class ConfigTests : public Test { test_config.datacache_reserved_share = 10; test_config.datacache_size_gb = 2; test_config.source_host = "host"; - + test_config.dataserver_nthreads = 5; } }; @@ -90,7 +90,7 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->datacache_reserved_share, Eq(10)); ASSERT_THAT(config->datacache_size_gb, Eq(2)); ASSERT_THAT(config->source_host, Eq("host")); - + ASSERT_THAT(config->dataserver_nthreads, Eq(5)); } @@ -101,7 +101,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { std::vector<std::string>fields {"MonitorDbAddress", "ListenPort", "DataServer", "ListenPort", "WriteToDisk", "WriteToDb", "DataCache", "Use", "SizeGB", "ReservedShare", "BrokerDbAddress", "Tag", "AuthorizationServer", "AuthorizationInterval", "RootFolder", "MonitorDbName", "LogLevel", - "SourceHost"}; + "SourceHost","NThreads"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 9ae52adf93915174741bc620fc01aff06dba35f1..f9181ef555bacaba24db520f1ba35584ff9a06fa 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -46,7 +46,7 @@ using asapo::RequestFactory; namespace { -class MockReqestHandler : public asapo::RequestHandler { +class MockReqestHandler : public asapo::ReceiverRequestHandler { public: Error ProcessRequest(Request* request) const override { return Error{ProcessRequest_t(*request)}; diff --git a/tests/automatic/settings/receiver.json.tpl.lin b/tests/automatic/settings/receiver.json.tpl.lin index 4374a04d24e2dc49c793f6ea155a887e4e2fbe1f..b8a63465a70376d1fa3c7beee7bbbe30f5d580eb 100644 --- a/tests/automatic/settings/receiver.json.tpl.lin +++ b/tests/automatic/settings/receiver.json.tpl.lin @@ -3,6 +3,7 @@ "MonitorDbName": "db_test", "BrokerDbAddress":"localhost:27017", "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/automatic/settings/receiver.json.tpl.win b/tests/automatic/settings/receiver.json.tpl.win index dfca9c68c8da9733c6014ab83852e6e250f7a90e..a8fd5eec65934a148d91f96e837a556582317e49 100644 --- a/tests/automatic/settings/receiver.json.tpl.win +++ b/tests/automatic/settings/receiver.json.tpl.win @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index e06c14e608de1176bc095164235054bd3cb0e6ab..b9bcf2a5817b4a61bd75645750e2c24c6d14a90d 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort":4200, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index e06c14e608de1176bc095164235054bd3cb0e6ab..b9bcf2a5817b4a61bd75645750e2c24c6d14a90d 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort":4200, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": {