From 3e3714ce5059ee4ee53f09af0635e1606b0021d8 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Thu, 21 Feb 2019 17:38:52 +0100 Subject: [PATCH] working pool for dataserver --- common/cpp/include/request/request.h | 4 +++ common/cpp/include/request/request_pool.h | 14 +++++---- common/cpp/src/request/request_pool.cpp | 21 +++++++++---- .../unittests/request/test_request_pool.cpp | 17 +++++++++++ deploy/nomad_jobs/receiver.json.tpl | 1 + producer/api/CMakeLists.txt | 2 +- ...r_data_server_request_handler_factory.cpp} | 0 producer/api/src/receiver_discovery_service.h | 2 +- receiver/CMakeLists.txt | 8 ++--- receiver/src/main.cpp | 2 +- receiver/src/receiver_config.cpp | 1 + receiver/src/receiver_config.h | 1 + .../.receiver_data_server_logger.h.swp | Bin 1024 -> 0 bytes receiver/src/receiver_data_server/common.h | 13 -------- .../src/receiver_data_server/net_server.h | 4 +-- .../receiver_data_server.cpp | 7 +++-- .../receiver_data_server.h | 7 +++-- .../receiver_data_server_logger.h.save | 14 --------- .../receiver_data_server_request.cpp | 14 +++++++++ .../receiver_data_server_request.h | 25 ++++++++++++++++ ...er_data_server_request_handler_factory.cpp | 9 ++++++ ...iver_data_server_request_handler_factory.h | 20 +++++++++++++ receiver/src/receiver_data_server/request.cpp | 8 ----- receiver/src/receiver_data_server/request.h | 19 ------------ .../src/receiver_data_server/request_pool.cpp | 9 ------ .../src/receiver_data_server/request_pool.h | 17 ----------- .../src/receiver_data_server/tcp_server.cpp | 20 +++++++------ .../src/receiver_data_server/tcp_server.h | 8 ++--- receiver/src/request.cpp | 2 +- receiver/src/request.h | 4 +-- receiver/src/request_handler.h | 4 +-- receiver/src/request_handler_authorize.h | 2 +- receiver/src/request_handler_db_write.h | 2 +- receiver/src/request_handler_file_write.h | 2 +- receiver/unittests/mock_receiver_config.cpp | 6 ++-- .../receiver_dataserver_mocking.h | 28 ++++++++++++------ .../test_receiver_data_server.cpp | 21 ++++++------- .../receiver_data_server/test_tcp_server.cpp | 9 +++--- receiver/unittests/test_config.cpp | 6 ++-- receiver/unittests/test_request.cpp | 2 +- .../automatic/settings/receiver.json.tpl.lin | 1 + .../automatic/settings/receiver.json.tpl.win | 1 + .../receiver.json | 1 + .../receiver.json | 1 + 44 files changed, 204 insertions(+), 155 deletions(-) rename producer/api/src/{producer_request_handler_factory.cpp => receiver_data_server_request_handler_factory.cpp} (100%) delete mode 100644 receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp delete mode 100644 receiver/src/receiver_data_server/common.h delete mode 100644 receiver/src/receiver_data_server/receiver_data_server_logger.h.save create mode 100644 receiver/src/receiver_data_server/receiver_data_server_request.cpp create mode 100644 receiver/src/receiver_data_server/receiver_data_server_request.h create mode 100644 receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp create mode 100644 receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h delete mode 100644 receiver/src/receiver_data_server/request.cpp delete mode 100644 receiver/src/receiver_data_server/request.h delete mode 100644 receiver/src/receiver_data_server/request_pool.cpp delete mode 100644 receiver/src/receiver_data_server/request_pool.h diff --git a/common/cpp/include/request/request.h b/common/cpp/include/request/request.h index 42f4ea000..8bb82cb83 100644 --- a/common/cpp/include/request/request.h +++ b/common/cpp/include/request/request.h @@ -7,6 +7,7 @@ namespace asapo { + class GenericRequest { public: GenericRequest() = delete; @@ -15,6 +16,9 @@ class GenericRequest { virtual ~GenericRequest() = default; }; +using GenericRequestPtr = std::unique_ptr<GenericRequest>; +using GenericRequests = std::vector<GenericRequestPtr>; + } #endif //ASAPO_GENERIC_REQUEST_H diff --git a/common/cpp/include/request/request_pool.h b/common/cpp/include/request/request_pool.h index b1eb65157..fe38451ea 100644 --- a/common/cpp/include/request/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -21,23 +21,25 @@ class RequestPool { std::unique_lock<std::mutex> lock; }; public: - explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, AbstractLogger* log); - VIRTUAL Error AddRequest(std::unique_ptr<GenericRequest> request); + explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log); + VIRTUAL Error AddRequest(GenericRequestPtr request); + VIRTUAL Error AddRequests(GenericRequests requests); + ~RequestPool(); uint64_t NRequestsInQueue(); private: - AbstractLogger* log__; + const AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; void ThreadHandler(uint64_t id); bool quit_{false}; std::condition_variable condition_; std::mutex mutex_; - std::deque<std::unique_ptr<GenericRequest>> request_queue_; + std::deque<GenericRequestPtr> request_queue_; bool CanProcessRequest(const std::unique_ptr<RequestHandler>& request_handler); void ProcessRequest(const std::unique_ptr<RequestHandler>& request_handler, ThreadInformation* thread_info); - std::unique_ptr<GenericRequest> GetRequestFromQueue(); - void PutRequestBackToQueue(std::unique_ptr<GenericRequest>request); + GenericRequestPtr GetRequestFromQueue(); + void PutRequestBackToQueue(GenericRequestPtr request); uint64_t shared_counter_{0}; }; diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp index a7b78c3c9..cfcae5c84 100644 --- a/common/cpp/src/request/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -1,10 +1,10 @@ +#include <request/request_pool.h> #include "request/request_pool.h" - namespace asapo { RequestPool:: RequestPool(uint8_t n_threads, - RequestHandlerFactory* request_handler_factory, AbstractLogger* log): log__{log}, + RequestHandlerFactory* request_handler_factory, const AbstractLogger* log): log__{log}, request_handler_factory__{request_handler_factory}, threads_{n_threads} { for(size_t i = 0; i < n_threads; i++) { @@ -15,7 +15,7 @@ RequestPool:: RequestPool(uint8_t n_threads, } -Error RequestPool::AddRequest(std::unique_ptr<GenericRequest> request) { +Error RequestPool::AddRequest(GenericRequestPtr request) { std::unique_lock<std::mutex> lock(mutex_); request_queue_.emplace_back(std::move(request)); lock.unlock(); @@ -29,13 +29,13 @@ bool RequestPool::CanProcessRequest(const std::unique_ptr<RequestHandler>& reque return request_queue_.size() && request_handler->ReadyProcessRequest(); } -std::unique_ptr<GenericRequest> RequestPool::GetRequestFromQueue() { +GenericRequestPtr RequestPool::GetRequestFromQueue() { auto request = std::move(request_queue_.front()); request_queue_.pop_front(); return request; } -void RequestPool::PutRequestBackToQueue(std::unique_ptr<GenericRequest> request) { +void RequestPool::PutRequestBackToQueue(GenericRequestPtr request) { request_queue_.emplace_front(std::move(request)); } @@ -86,5 +86,16 @@ uint64_t RequestPool::NRequestsInQueue() { std::lock_guard<std::mutex> lock{mutex_}; return request_queue_.size(); } +Error RequestPool::AddRequests(GenericRequests requests) { + std::unique_lock<std::mutex> lock(mutex_); + for (auto& elem : requests) { + request_queue_.emplace_front(std::move(elem)); + } + lock.unlock(); +//todo: maybe notify_one is better here + condition_.notify_all(); + return nullptr; + +} } diff --git a/common/cpp/unittests/request/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp index 8fd7bca7f..98f54624b 100644 --- a/common/cpp/unittests/request/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -136,6 +136,23 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { } +TEST_F(RequestPoolTests, AddRequestsOk) { + + TestRequest* request2 = new TestRequest{GenericRequestHeader{}}; + + ExpectSend(mock_request_handler, 2); + + std::vector<std::unique_ptr<GenericRequest>> requests; + requests.push_back(std::move(request)); + requests.push_back(std::unique_ptr<GenericRequest> {request2}); + + auto err = pool.AddRequests(std::move(requests)); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(RequestPoolTests, FinishProcessingThreads) { EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads); diff --git a/deploy/nomad_jobs/receiver.json.tpl b/deploy/nomad_jobs/receiver.json.tpl index f4585aa7f..b5021a78c 100644 --- a/deploy/nomad_jobs/receiver.json.tpl +++ b/deploy/nomad_jobs/receiver.json.tpl @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 1c3b0b5a6..d521461b6 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -6,7 +6,7 @@ set(SOURCE_FILES src/request_handler_tcp.cpp src/request_handler_filesystem.cpp src/receiver_discovery_service.cpp - src/producer_request_handler_factory.cpp + src/receiver_data_server_request_handler_factory.cpp src/producer_request.cpp) diff --git a/producer/api/src/producer_request_handler_factory.cpp b/producer/api/src/receiver_data_server_request_handler_factory.cpp similarity index 100% rename from producer/api/src/producer_request_handler_factory.cpp rename to producer/api/src/receiver_data_server_request_handler_factory.cpp diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h index c1eab17ff..255240abd 100644 --- a/producer/api/src/receiver_discovery_service.h +++ b/producer/api/src/receiver_discovery_service.h @@ -26,7 +26,7 @@ class ReceiverDiscoveryService { VIRTUAL uint64_t UpdateFrequency(); public: std::unique_ptr<HttpClient> httpclient__; - AbstractLogger* log__; + const AbstractLogger* log__; private: static const std::string kServiceEndpointSuffix; void ThreadHandler(); diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt index 121d758ea..c0371b1f9 100644 --- a/receiver/CMakeLists.txt +++ b/receiver/CMakeLists.txt @@ -15,10 +15,10 @@ set(SOURCE_FILES src/receiver_data_server/receiver_data_server.cpp src/receiver_data_server/net_server.cpp src/receiver_data_server/tcp_server.cpp - src/receiver_data_server/request_pool.cpp - src/receiver_data_server/request.cpp + src/receiver_data_server/receiver_data_server_request.cpp src/receiver_data_server/receiver_data_server_logger.cpp - src/data_cache.cpp) + src/data_cache.cpp src/receiver_data_server/receiver_data_server_request_handler_factory.cpp +) ################################ @@ -28,7 +28,7 @@ set(SOURCE_FILES add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client> - $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger>) + $<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>) set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${CURL_INCLUDE_DIRS}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database) diff --git a/receiver/src/main.cpp b/receiver/src/main.cpp index 54bd0b9ea..a1e44c810 100644 --- a/receiver/src/main.cpp +++ b/receiver/src/main.cpp @@ -23,7 +23,7 @@ asapo::Error ReadConfigFile(int argc, char* argv[]) { std::thread StartDataServer(const asapo::ReceiverConfig* config, asapo::SharedCache cache) { static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port); return std::thread([config] { - asapo::ReceiverDataServer data_server{dataserver_address, config->log_level}; + asapo::ReceiverDataServer data_server{dataserver_address, config->log_level, config->dataserver_nthreads}; data_server.Run(); }); } diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index c8ddc42c2..9c3cbbf97 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -21,6 +21,7 @@ Error ReceiverConfigFactory::SetConfig(std::string file_name) { (err = parser.GetString("MonitorDbAddress", &config.monitor_db_uri)) || (err = parser.GetUInt64("ListenPort", &config.listen_port)) || (err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver_listen_port)) || + (err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver_nthreads)) || (err = parser.GetBool("WriteToDisk", &config.write_to_disk)) || (err = parser.GetBool("WriteToDb", &config.write_to_db)) || (err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) || diff --git a/receiver/src/receiver_config.h b/receiver/src/receiver_config.h index 19fe552be..cd6ee652d 100644 --- a/receiver/src/receiver_config.h +++ b/receiver/src/receiver_config.h @@ -21,6 +21,7 @@ struct ReceiverConfig { bool use_datacache = true; uint64_t datacache_size_gb = 0; uint64_t datacache_reserved_share = 0; + uint64_t dataserver_nthreads = 1; LogLevel log_level = LogLevel::Info; std::string tag; std::string source_host; diff --git a/receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp b/receiver/src/receiver_data_server/.receiver_data_server_logger.h.swp deleted file mode 100644 index d1055ba044bfcbff70ae3b4cdd0afbf72c39ac32..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1024 zcmYc?$V<%2S1{5u)iY*50*Q7E43&x5rAhf^NTS%8RhcCP$rVQC28L#ODXGPkdMT+` z73yc?=cejIP1P?b%FjwoE-BVeEKV%Q*Dp#<PR%S!Ez&P8N``adQxZ!O<BL;^KoT$& g$b#`X`RVDYMS2-nEgTgb4S~@RKn{UWWDbZ80EB!p00000 diff --git a/receiver/src/receiver_data_server/common.h b/receiver/src/receiver_data_server/common.h deleted file mode 100644 index 552b109b0..000000000 --- a/receiver/src/receiver_data_server/common.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef ASAPO_COMMON_H -#define ASAPO_COMMON_H - -#include <vector> - -#include "request.h" -namespace asapo { - -using Requests = std::vector<asapo::Request>; - -} - -#endif //ASAPO_COMMON_H diff --git a/receiver/src/receiver_data_server/net_server.h b/receiver/src/receiver_data_server/net_server.h index d0ebb83bd..6c3fee986 100644 --- a/receiver/src/receiver_data_server/net_server.h +++ b/receiver/src/receiver_data_server/net_server.h @@ -3,13 +3,13 @@ #include "common/error.h" -#include "common.h" +#include "request/request.h" namespace asapo { class NetServer { public: - virtual Requests GetNewRequests(Error* err) const noexcept = 0; + virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0; virtual ~NetServer() = default; }; diff --git a/receiver/src/receiver_data_server/receiver_data_server.cpp b/receiver/src/receiver_data_server/receiver_data_server.cpp index cbc55c3fb..d2a329812 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.cpp +++ b/receiver/src/receiver_data_server/receiver_data_server.cpp @@ -1,12 +1,15 @@ #include "receiver_data_server.h" #include "tcp_server.h" #include "receiver_data_server_logger.h" +#include "receiver_data_server_request_handler_factory.h" namespace asapo { -ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level) : request_pool__{new RequestPool}, net__{new TcpServer(address)}, +ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads) : net__{new TcpServer(address)}, log__{GetDefaultReceiverDataServerLogger()} { + request_handler_factory_.reset(new ReceiverDataServerRequestHandlerFactory()); GetDefaultReceiverDataServerLogger()->SetLogLevel(log_level); + request_pool__.reset(new RequestPool{n_threads, request_handler_factory_.get(), log__}); } void ReceiverDataServer::Run() { @@ -17,7 +20,7 @@ void ReceiverDataServer::Run() { continue; } if (!err) { - err = request_pool__->AddRequests(requests); + err = request_pool__->AddRequests(std::move(requests)); } if (err) { log__->Error(std::string("receiver data server stopped: ") + err->Explain()); diff --git a/receiver/src/receiver_data_server/receiver_data_server.h b/receiver/src/receiver_data_server/receiver_data_server.h index 8366bb048..26a11b19b 100644 --- a/receiver/src/receiver_data_server/receiver_data_server.h +++ b/receiver/src/receiver_data_server/receiver_data_server.h @@ -4,14 +4,17 @@ #include <memory> #include "net_server.h" -#include "request_pool.h" +#include "request/request_pool.h" #include "logger/logger.h" namespace asapo { class ReceiverDataServer { + private: + // important to create it before request_pool__ + std::unique_ptr<RequestHandlerFactory> request_handler_factory_; public: - explicit ReceiverDataServer(std::string address, LogLevel log_level); + explicit ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads); std::unique_ptr<RequestPool> request_pool__; std::unique_ptr<NetServer> net__; const AbstractLogger* log__; diff --git a/receiver/src/receiver_data_server/receiver_data_server_logger.h.save b/receiver/src/receiver_data_server/receiver_data_server_logger.h.save deleted file mode 100644 index c391dbe0d..000000000 --- a/receiver/src/receiver_data_server/receiver_data_server_logger.h.save +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef ASAPO_RECEIVER_LOGGER_H -#define ASAPO_RECEIVER_LOGGER_H - -#include "logger/logger.h" - -namespace asapo { - - -AbstractLogger* GetDefaultReceiverDataServerLogger(); - -} - - -#endif //ASAPO_RECEIVER_LOGGER_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.cpp b/receiver/src/receiver_data_server/receiver_data_server_request.cpp new file mode 100644 index 000000000..baabf2a88 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request.cpp @@ -0,0 +1,14 @@ +#include "receiver_data_server_request.h" +#include "receiver_data_server.h" + +namespace asapo { + +ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, + const NetServer* server) : + GenericRequest(std::move(header)), + net_id{net_id}, server{server} { +} + + + +} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request.h b/receiver/src/receiver_data_server/receiver_data_server_request.h new file mode 100644 index 000000000..c191325d4 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request.h @@ -0,0 +1,25 @@ +#ifndef ASAPO_REQUEST_H +#define ASAPO_REQUEST_H + +#include "common/networking.h" + +#include "request/request.h" + +namespace asapo { + +class NetServer; + +class ReceiverDataServerRequest : public GenericRequest { + public: + explicit ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, const NetServer* server); + const uint64_t net_id; + const NetServer* server; + ~ReceiverDataServerRequest() = default; + +}; + +using ReceiverDataServerRequestPtr = std::unique_ptr<ReceiverDataServerRequest>; + +} + +#endif //ASAPO_REQUEST_H diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp new file mode 100644 index 000000000..d28ea7471 --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.cpp @@ -0,0 +1,9 @@ +#include "receiver_data_server_request_handler_factory.h" + +namespace asapo { + +std::unique_ptr<RequestHandler> ReceiverDataServerRequestHandlerFactory::NewRequestHandler(uint64_t thread_id, + uint64_t* shared_counter) { + return std::unique_ptr<RequestHandler>(); +} +} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h new file mode 100644 index 000000000..1176dd57d --- /dev/null +++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler_factory.h @@ -0,0 +1,20 @@ +#ifndef ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H +#define ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H + +#include "request/request_handler_factory.h" +#include "request/request_handler.h" +#include "preprocessor/definitions.h" + + +namespace asapo { + +class ReceiverDataServerRequestHandlerFactory : public RequestHandlerFactory { + public: + VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override; + private: +}; + + +} + +#endif //ASAPO_RECEIVER_DATA_SERVER_REQUEST_HANDLER_FACTORY_H diff --git a/receiver/src/receiver_data_server/request.cpp b/receiver/src/receiver_data_server/request.cpp deleted file mode 100644 index fcc6b7fee..000000000 --- a/receiver/src/receiver_data_server/request.cpp +++ /dev/null @@ -1,8 +0,0 @@ -#include "request.h" - -namespace asapo { - -Request::Request(uint64_t net_id, const NetServer* server) : net_id{net_id}, server{server} { - -} -} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/request.h b/receiver/src/receiver_data_server/request.h deleted file mode 100644 index 01fff5946..000000000 --- a/receiver/src/receiver_data_server/request.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef ASAPO_REQUEST_H -#define ASAPO_REQUEST_H - -#include "common/networking.h" - -namespace asapo { - -class NetServer; - -struct Request { - explicit Request(uint64_t net_id, const NetServer* server); - GenericRequestHeader header; - const uint64_t net_id; - const NetServer* server; -}; - -} - -#endif //ASAPO_REQUEST_H diff --git a/receiver/src/receiver_data_server/request_pool.cpp b/receiver/src/receiver_data_server/request_pool.cpp deleted file mode 100644 index 837a62b2f..000000000 --- a/receiver/src/receiver_data_server/request_pool.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "request_pool.h" - -namespace asapo { - -Error RequestPool::AddRequests(const Requests& requests) noexcept { - return nullptr; -} - -} \ No newline at end of file diff --git a/receiver/src/receiver_data_server/request_pool.h b/receiver/src/receiver_data_server/request_pool.h deleted file mode 100644 index 42d46fa20..000000000 --- a/receiver/src/receiver_data_server/request_pool.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef ASAPO_REQUEST_POOL_H -#define ASAPO_REQUEST_POOL_H - -#include "preprocessor/definitions.h" -#include "common/error.h" -#include "common.h" - -namespace asapo { - -class RequestPool { - public: - VIRTUAL Error AddRequests(const Requests& requests) noexcept; -}; - -} - -#endif //ASAPO_REQUEST_POOL_H diff --git a/receiver/src/receiver_data_server/tcp_server.cpp b/receiver/src/receiver_data_server/tcp_server.cpp index 87d7f6e4a..ec8e7ad3e 100644 --- a/receiver/src/receiver_data_server/tcp_server.cpp +++ b/receiver/src/receiver_data_server/tcp_server.cpp @@ -38,22 +38,24 @@ void TcpServer::CloseSocket(SocketDescriptor socket) const noexcept { io__->CloseSocket(socket, nullptr); } -Request TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { - Request request{(uint64_t) socket, this}; - io__->Receive(socket, &request.header, +ReceiverDataServerRequestPtr TcpServer::ReadRequest(SocketDescriptor socket, Error* err) const noexcept { + GenericRequestHeader header; + io__->Receive(socket, &header, sizeof(GenericRequestHeader), err); if (*err == ErrorTemplates::kEndOfFile) { CloseSocket(socket); + return nullptr; } else if (*err) { log__->Error("error getting next request from " + io__->AddressFromSocket(socket) + ": " + (*err)-> Explain() ); + return nullptr; } - return request; + return ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{std::move(header), (uint64_t) socket, this}}; } -Requests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { - Requests requests; +GenericRequests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noexcept { + GenericRequests requests; for (auto client : sockets) { Error err; @@ -61,14 +63,14 @@ Requests TcpServer::ReadRequests(const ListSocketDescriptors& sockets) const noe if (err) { continue; } - log__->Debug("received request opcode: " + std::to_string(request.header.op_code) + " id: " + std::to_string( - request.header.data_id)); + log__->Debug("received request opcode: " + std::to_string(request->header.op_code) + " id: " + std::to_string( + request->header.data_id)); requests.emplace_back(std::move(request)); } return requests; } -Requests TcpServer::GetNewRequests(Error* err) const noexcept { +GenericRequests TcpServer::GetNewRequests(Error* err) const noexcept { if (*err = InitializeMasterSocketIfNeeded()) { return {}; } diff --git a/receiver/src/receiver_data_server/tcp_server.h b/receiver/src/receiver_data_server/tcp_server.h index 2248cf192..24eb78c58 100644 --- a/receiver/src/receiver_data_server/tcp_server.h +++ b/receiver/src/receiver_data_server/tcp_server.h @@ -4,7 +4,7 @@ #include "net_server.h" #include "io/io.h" #include "logger/logger.h" - +#include "receiver_data_server_request.h" namespace asapo { const int kMaxPendingConnections = 5; @@ -13,15 +13,15 @@ class TcpServer : public NetServer { public: TcpServer(std::string address); ~TcpServer(); - virtual Requests GetNewRequests(Error* err) const noexcept override ; + virtual GenericRequests GetNewRequests(Error* err) const noexcept override ; std::unique_ptr<IO> io__; const AbstractLogger* log__; private: void CloseSocket(SocketDescriptor socket) const noexcept; ListSocketDescriptors GetActiveSockets(Error* err) const noexcept; Error InitializeMasterSocketIfNeeded() const noexcept; - Request ReadRequest(SocketDescriptor socket, Error* err) const noexcept; - Requests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; + ReceiverDataServerRequestPtr ReadRequest(SocketDescriptor socket, Error* err) const noexcept; + GenericRequests ReadRequests(const ListSocketDescriptors& sockets) const noexcept; mutable SocketDescriptor master_socket_{kDisconnectedSocketDescriptor}; mutable ListSocketDescriptors sockets_to_listen_; std::string address_; diff --git a/receiver/src/request.cpp b/receiver/src/request.cpp index 89bc98f56..f226e5eb1 100644 --- a/receiver/src/request.cpp +++ b/receiver/src/request.cpp @@ -70,7 +70,7 @@ const RequestHandlerList& Request::GetListHandlers() const { } -void Request::AddHandler(const RequestHandler* handler) { +void Request::AddHandler(const ReceiverRequestHandler* handler) { handlers_.emplace_back(handler); } diff --git a/receiver/src/request.h b/receiver/src/request.h index 422a7744e..7d0a10367 100644 --- a/receiver/src/request.h +++ b/receiver/src/request.h @@ -17,7 +17,7 @@ #include "preprocessor/definitions.h" namespace asapo { -using RequestHandlerList = std::vector<const RequestHandler*>; +using RequestHandlerList = std::vector<const ReceiverRequestHandler*>; class Request { public: @@ -25,7 +25,7 @@ class Request { ~Request() = default; Request(const GenericRequestHeader& request_header, SocketDescriptor socket_fd, std::string origin_uri, DataCache* cache); - VIRTUAL void AddHandler(const RequestHandler*); + VIRTUAL void AddHandler(const ReceiverRequestHandler*); VIRTUAL const RequestHandlerList& GetListHandlers() const; VIRTUAL uint64_t GetDataSize() const; VIRTUAL uint64_t GetDataID() const; diff --git a/receiver/src/request_handler.h b/receiver/src/request_handler.h index b02cca661..604fb23f4 100644 --- a/receiver/src/request_handler.h +++ b/receiver/src/request_handler.h @@ -8,11 +8,11 @@ namespace asapo { class Request; -class RequestHandler { +class ReceiverRequestHandler { public: virtual Error ProcessRequest(Request* request) const = 0; virtual StatisticEntity GetStatisticEntity() const = 0; - virtual ~RequestHandler() = default; + virtual ~ReceiverRequestHandler() = default; private: }; diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h index 59069240c..8ecf7fee2 100644 --- a/receiver/src/request_handler_authorize.h +++ b/receiver/src/request_handler_authorize.h @@ -12,7 +12,7 @@ namespace asapo { -class RequestHandlerAuthorize final: public RequestHandler { +class RequestHandlerAuthorize final: public ReceiverRequestHandler { public: RequestHandlerAuthorize(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/src/request_handler_db_write.h b/receiver/src/request_handler_db_write.h index de3bcaa76..87b5496fc 100644 --- a/receiver/src/request_handler_db_write.h +++ b/receiver/src/request_handler_db_write.h @@ -9,7 +9,7 @@ namespace asapo { -class RequestHandlerDbWrite final: public RequestHandler { +class RequestHandlerDbWrite final: public ReceiverRequestHandler { public: RequestHandlerDbWrite(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/src/request_handler_file_write.h b/receiver/src/request_handler_file_write.h index a0d36b48a..2868e8fba 100644 --- a/receiver/src/request_handler_file_write.h +++ b/receiver/src/request_handler_file_write.h @@ -10,7 +10,7 @@ namespace asapo { const uint64_t kMaxFileSize = uint64_t(1024) * 1024 * 1024 * 2; //2GB -class RequestHandlerFileWrite final: public RequestHandler { +class RequestHandlerFileWrite final: public ReceiverRequestHandler { public: RequestHandlerFileWrite(); StatisticEntity GetStatisticEntity() const override; diff --git a/receiver/unittests/mock_receiver_config.cpp b/receiver/unittests/mock_receiver_config.cpp index 8e03129c2..ee0650003 100644 --- a/receiver/unittests/mock_receiver_config.cpp +++ b/receiver/unittests/mock_receiver_config.cpp @@ -45,8 +45,10 @@ Error SetReceiverConfig (const ReceiverConfig& config, std::string error_field) config_string += "," + Key("MonitorDbName", error_field) + "\"" + config.monitor_db_name + "\""; config_string += "," + Key("BrokerDbAddress", error_field) + "\"" + config.broker_db_uri + "\""; config_string += "," + Key("ListenPort", error_field) + std::to_string(config.listen_port); - config_string += "," + Key("DataServer", error_field) + "{" + Key("ListenPort", error_field) + std::to_string( - config.dataserver_listen_port) + "}"; + config_string += "," + Key("DataServer", error_field) + "{"; + config_string += Key("ListenPort", error_field) + std::to_string(config.dataserver_listen_port); + config_string += "," + Key("NThreads", error_field) + std::to_string(config.dataserver_nthreads); + config_string += "}"; config_string += "," + Key("DataCache", error_field) + "{"; config_string += Key("Use", error_field) + (config.use_datacache ? "true" : "false") ; config_string += "," + Key("SizeGB", error_field) + std::to_string(config.datacache_size_gb); diff --git a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h index 07ebfe140..390a628e7 100644 --- a/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h +++ b/receiver/unittests/receiver_data_server/receiver_dataserver_mocking.h @@ -5,32 +5,42 @@ #include <gmock/gmock.h> #include "../../src/receiver_data_server/net_server.h" -#include "../../src/receiver_data_server/request_pool.h" - +#include "request/request_pool.h" +#include "../../src/receiver_data_server/receiver_data_server_request.h" namespace asapo { class MockNetServer : public NetServer { public: - Requests GetNewRequests(Error* err) const noexcept override { + GenericRequests GetNewRequests(Error* err) const noexcept override { ErrorInterface* error = nullptr; auto reqs = GetNewRequests_t(&error); err->reset(error); - return reqs; + GenericRequests res; + for (const auto& preq : reqs) { + ReceiverDataServerRequestPtr ptr = ReceiverDataServerRequestPtr{new ReceiverDataServerRequest{preq.header, preq.net_id, preq.server}}; + res.push_back(std::move(ptr)); + } + return res; } - MOCK_CONST_METHOD1(GetNewRequests_t, Requests (ErrorInterface** - error)); + MOCK_CONST_METHOD1(GetNewRequests_t, std::vector<ReceiverDataServerRequest> (ErrorInterface** + error)); }; class MockPool : public RequestPool { public: - Error AddRequests(const Requests& requests) noexcept override { - return Error(AddRequests_t(requests)); + MockPool(): RequestPool(0, nullptr, nullptr) {}; + Error AddRequests(GenericRequests requests) noexcept override { + std::vector<GenericRequest> reqs; + for (const auto& preq : requests) { + reqs.push_back(GenericRequest{preq->header}); + } + return Error(AddRequests_t(std::move(reqs))); } - MOCK_METHOD1(AddRequests_t, ErrorInterface * (const Requests&)); + MOCK_METHOD1(AddRequests_t, ErrorInterface * (std::vector<GenericRequest>)); }; diff --git a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp index c3875026e..9a2e4c714 100644 --- a/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp +++ b/receiver/unittests/receiver_data_server/test_receiver_data_server.cpp @@ -5,7 +5,6 @@ #include "unittests/MockLogger.h" #include "../../src/receiver_data_server/receiver_data_server.h" #include "../../src/receiver_data_server/tcp_server.h" -#include "../../src/receiver_data_server/common.h" #include "receiver_dataserver_mocking.h" @@ -30,22 +29,24 @@ using ::testing::HasSubstr; using asapo::MockLogger; using asapo::ReceiverDataServer; using asapo::Error; +using asapo::GenericRequests; +using asapo::GenericRequest; +using asapo::ReceiverDataServerRequest; namespace { TEST(ReceiverDataServer, Constructor) { - ReceiverDataServer data_server{"", asapo::LogLevel::Debug}; + ReceiverDataServer data_server{"", asapo::LogLevel::Debug, 4}; ASSERT_THAT(dynamic_cast<const asapo::TcpServer*>(data_server.net__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(data_server.request_pool__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(data_server.log__), Ne(nullptr)); - } class ReceiverDataServerTests : public Test { public: std::string expected_address = "somehost:123"; - ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug}; + ReceiverDataServer data_server{expected_address, asapo::LogLevel::Debug, 0}; asapo::MockNetServer mock_net; asapo::MockPool mock_pool; NiceMock<asapo::MockLogger> mock_logger; @@ -63,11 +64,11 @@ class ReceiverDataServerTests : public Test { TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kTimeout.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -81,7 +82,7 @@ TEST_F(ReceiverDataServerTests, TimeoutGetNewRequests) { TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -95,7 +96,7 @@ TEST_F(ReceiverDataServerTests, ErrorGetNewRequests) { TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); @@ -113,11 +114,11 @@ TEST_F(ReceiverDataServerTests, ErrorAddingRequests) { TEST_F(ReceiverDataServerTests, Ok) { EXPECT_CALL(mock_net, GetNewRequests_t(_)).WillOnce( DoAll(SetArgPointee<0>(nullptr), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ).WillOnce( DoAll(SetArgPointee<0>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), - Return(asapo::Requests{}) + Return(std::vector<ReceiverDataServerRequest> {}) ) ); diff --git a/receiver/unittests/receiver_data_server/test_tcp_server.cpp b/receiver/unittests/receiver_data_server/test_tcp_server.cpp index b69fc6cc5..f7c35a80e 100644 --- a/receiver/unittests/receiver_data_server/test_tcp_server.cpp +++ b/receiver/unittests/receiver_data_server/test_tcp_server.cpp @@ -37,7 +37,6 @@ TEST(TCPServer, Constructor) { } -uint64_t expected_id = 124; std::string expected_address = "somehost:123"; class TCPServerTests : public Test { @@ -225,10 +224,10 @@ TEST_F(TCPServerTests, GetNewRequestsReadOk) { ASSERT_THAT(requests.size(), Eq(3)); int i = 0; for (auto conn : expected_client_sockets) { - ASSERT_THAT(requests[i].header.data_id, Eq(conn)); - ASSERT_THAT(requests[i].header.op_code, Eq(asapo::kOpcodeGetBufferData)); - ASSERT_THAT(requests[i].net_id, Eq(conn)); - ASSERT_THAT(requests[i].server, Eq(&tcp_server)); + ASSERT_THAT(requests[i]->header.data_id, Eq(conn)); + ASSERT_THAT(requests[i]->header.op_code, Eq(asapo::kOpcodeGetBufferData)); +// ASSERT_THAT(requests[i]->net_id, Eq(conn)); +// ASSERT_THAT(requests[i]->server, Eq(&tcp_server)); i++; } diff --git a/receiver/unittests/test_config.cpp b/receiver/unittests/test_config.cpp index 450ac82bf..2ea4e9bcc 100644 --- a/receiver/unittests/test_config.cpp +++ b/receiver/unittests/test_config.cpp @@ -60,7 +60,7 @@ class ConfigTests : public Test { test_config.datacache_reserved_share = 10; test_config.datacache_size_gb = 2; test_config.source_host = "host"; - + test_config.dataserver_nthreads = 5; } }; @@ -90,7 +90,7 @@ TEST_F(ConfigTests, ReadSettings) { ASSERT_THAT(config->datacache_reserved_share, Eq(10)); ASSERT_THAT(config->datacache_size_gb, Eq(2)); ASSERT_THAT(config->source_host, Eq("host")); - + ASSERT_THAT(config->dataserver_nthreads, Eq(5)); } @@ -101,7 +101,7 @@ TEST_F(ConfigTests, ErrorReadSettings) { std::vector<std::string>fields {"MonitorDbAddress", "ListenPort", "DataServer", "ListenPort", "WriteToDisk", "WriteToDb", "DataCache", "Use", "SizeGB", "ReservedShare", "BrokerDbAddress", "Tag", "AuthorizationServer", "AuthorizationInterval", "RootFolder", "MonitorDbName", "LogLevel", - "SourceHost"}; + "SourceHost","NThreads"}; for (const auto& field : fields) { auto err = asapo::SetReceiverConfig(test_config, field); ASSERT_THAT(err, Ne(nullptr)); diff --git a/receiver/unittests/test_request.cpp b/receiver/unittests/test_request.cpp index 9ae52adf9..f9181ef55 100644 --- a/receiver/unittests/test_request.cpp +++ b/receiver/unittests/test_request.cpp @@ -46,7 +46,7 @@ using asapo::RequestFactory; namespace { -class MockReqestHandler : public asapo::RequestHandler { +class MockReqestHandler : public asapo::ReceiverRequestHandler { public: Error ProcessRequest(Request* request) const override { return Error{ProcessRequest_t(*request)}; diff --git a/tests/automatic/settings/receiver.json.tpl.lin b/tests/automatic/settings/receiver.json.tpl.lin index 4374a04d2..b8a63465a 100644 --- a/tests/automatic/settings/receiver.json.tpl.lin +++ b/tests/automatic/settings/receiver.json.tpl.lin @@ -3,6 +3,7 @@ "MonitorDbName": "db_test", "BrokerDbAddress":"localhost:27017", "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/automatic/settings/receiver.json.tpl.win b/tests/automatic/settings/receiver.json.tpl.win index dfca9c68c..a8fd5eec6 100644 --- a/tests/automatic/settings/receiver.json.tpl.win +++ b/tests/automatic/settings/receiver.json.tpl.win @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort": {{ env "NOMAD_PORT_recv" }}, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index e06c14e60..b9bcf2a58 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort":4200, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index e06c14e60..b9bcf2a58 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -6,6 +6,7 @@ "AuthorizationInterval": 10000, "ListenPort":4200, "DataServer": { + "NThreads": 2, "ListenPort": 23123 }, "DataCache": { -- GitLab