Skip to content
Snippets Groups Projects
Commit 3e3714ce authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

working pool for dataserver

parent f4ec8b7b
No related branches found
No related tags found
No related merge requests found
Showing
with 106 additions and 51 deletions
......@@ -7,6 +7,7 @@
namespace asapo {
class GenericRequest {
public:
GenericRequest() = delete;
......@@ -15,6 +16,9 @@ class GenericRequest {
virtual ~GenericRequest() = default;
};
using GenericRequestPtr = std::unique_ptr<GenericRequest>;
using GenericRequests = std::vector<GenericRequestPtr>;
}
#endif //ASAPO_GENERIC_REQUEST_H
......@@ -21,23 +21,25 @@ class RequestPool {
std::unique_lock<std::mutex> lock;
};
public:
explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, AbstractLogger* log);
VIRTUAL Error AddRequest(std::unique_ptr<GenericRequest> request);
explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory, const AbstractLogger* log);
VIRTUAL Error AddRequest(GenericRequestPtr request);
VIRTUAL Error AddRequests(GenericRequests requests);
~RequestPool();
uint64_t NRequestsInQueue();
private:
AbstractLogger* log__;
const AbstractLogger* log__;
RequestHandlerFactory* request_handler_factory__;
std::vector<std::thread> threads_;
void ThreadHandler(uint64_t id);
bool quit_{false};
std::condition_variable condition_;
std::mutex mutex_;
std::deque<std::unique_ptr<GenericRequest>> request_queue_;
std::deque<GenericRequestPtr> request_queue_;
bool CanProcessRequest(const std::unique_ptr<RequestHandler>& request_handler);
void ProcessRequest(const std::unique_ptr<RequestHandler>& request_handler, ThreadInformation* thread_info);
std::unique_ptr<GenericRequest> GetRequestFromQueue();
void PutRequestBackToQueue(std::unique_ptr<GenericRequest>request);
GenericRequestPtr GetRequestFromQueue();
void PutRequestBackToQueue(GenericRequestPtr request);
uint64_t shared_counter_{0};
};
......
#include <request/request_pool.h>
#include "request/request_pool.h"
namespace asapo {
RequestPool:: RequestPool(uint8_t n_threads,
RequestHandlerFactory* request_handler_factory, AbstractLogger* log): log__{log},
RequestHandlerFactory* request_handler_factory, const AbstractLogger* log): log__{log},
request_handler_factory__{request_handler_factory},
threads_{n_threads} {
for(size_t i = 0; i < n_threads; i++) {
......@@ -15,7 +15,7 @@ RequestPool:: RequestPool(uint8_t n_threads,
}
Error RequestPool::AddRequest(std::unique_ptr<GenericRequest> request) {
Error RequestPool::AddRequest(GenericRequestPtr request) {
std::unique_lock<std::mutex> lock(mutex_);
request_queue_.emplace_back(std::move(request));
lock.unlock();
......@@ -29,13 +29,13 @@ bool RequestPool::CanProcessRequest(const std::unique_ptr<RequestHandler>& reque
return request_queue_.size() && request_handler->ReadyProcessRequest();
}
std::unique_ptr<GenericRequest> RequestPool::GetRequestFromQueue() {
GenericRequestPtr RequestPool::GetRequestFromQueue() {
auto request = std::move(request_queue_.front());
request_queue_.pop_front();
return request;
}
void RequestPool::PutRequestBackToQueue(std::unique_ptr<GenericRequest> request) {
void RequestPool::PutRequestBackToQueue(GenericRequestPtr request) {
request_queue_.emplace_front(std::move(request));
}
......@@ -86,5 +86,16 @@ uint64_t RequestPool::NRequestsInQueue() {
std::lock_guard<std::mutex> lock{mutex_};
return request_queue_.size();
}
Error RequestPool::AddRequests(GenericRequests requests) {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& elem : requests) {
request_queue_.emplace_front(std::move(elem));
}
lock.unlock();
//todo: maybe notify_one is better here
condition_.notify_all();
return nullptr;
}
}
......@@ -136,6 +136,23 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) {
}
TEST_F(RequestPoolTests, AddRequestsOk) {
TestRequest* request2 = new TestRequest{GenericRequestHeader{}};
ExpectSend(mock_request_handler, 2);
std::vector<std::unique_ptr<GenericRequest>> requests;
requests.push_back(std::move(request));
requests.push_back(std::unique_ptr<GenericRequest> {request2});
auto err = pool.AddRequests(std::move(requests));
std::this_thread::sleep_for(std::chrono::milliseconds(30));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(RequestPoolTests, FinishProcessingThreads) {
EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
......
......@@ -6,6 +6,7 @@
"AuthorizationInterval": 10000,
"ListenPort": {{ env "NOMAD_PORT_recv" }},
"DataServer": {
"NThreads": 2,
"ListenPort": 23123
},
"DataCache": {
......
......@@ -6,7 +6,7 @@ set(SOURCE_FILES
src/request_handler_tcp.cpp
src/request_handler_filesystem.cpp
src/receiver_discovery_service.cpp
src/producer_request_handler_factory.cpp
src/receiver_data_server_request_handler_factory.cpp
src/producer_request.cpp)
......
......@@ -26,7 +26,7 @@ class ReceiverDiscoveryService {
VIRTUAL uint64_t UpdateFrequency();
public:
std::unique_ptr<HttpClient> httpclient__;
AbstractLogger* log__;
const AbstractLogger* log__;
private:
static const std::string kServiceEndpointSuffix;
void ThreadHandler();
......
......@@ -15,10 +15,10 @@ set(SOURCE_FILES
src/receiver_data_server/receiver_data_server.cpp
src/receiver_data_server/net_server.cpp
src/receiver_data_server/tcp_server.cpp
src/receiver_data_server/request_pool.cpp
src/receiver_data_server/request.cpp
src/receiver_data_server/receiver_data_server_request.cpp
src/receiver_data_server/receiver_data_server_logger.cpp
src/data_cache.cpp)
src/data_cache.cpp src/receiver_data_server/receiver_data_server_request_handler_factory.cpp
)
################################
......@@ -28,7 +28,7 @@ set(SOURCE_FILES
add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:curl_http_client>
$<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger>)
$<TARGET_OBJECTS:json_parser> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:request_pool>)
set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX)
target_include_directories(${TARGET_NAME} PUBLIC ${ASAPO_CXX_COMMON_INCLUDE_DIR} ${CURL_INCLUDE_DIRS})
target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} database)
......
......@@ -23,7 +23,7 @@ asapo::Error ReadConfigFile(int argc, char* argv[]) {
std::thread StartDataServer(const asapo::ReceiverConfig* config, asapo::SharedCache cache) {
static const std::string dataserver_address = "0.0.0.0:" + std::to_string(config->dataserver_listen_port);
return std::thread([config] {
asapo::ReceiverDataServer data_server{dataserver_address, config->log_level};
asapo::ReceiverDataServer data_server{dataserver_address, config->log_level, config->dataserver_nthreads};
data_server.Run();
});
}
......
......@@ -21,6 +21,7 @@ Error ReceiverConfigFactory::SetConfig(std::string file_name) {
(err = parser.GetString("MonitorDbAddress", &config.monitor_db_uri)) ||
(err = parser.GetUInt64("ListenPort", &config.listen_port)) ||
(err = parser.Embedded("DataServer").GetUInt64("ListenPort", &config.dataserver_listen_port)) ||
(err = parser.Embedded("DataServer").GetUInt64("NThreads", &config.dataserver_nthreads)) ||
(err = parser.GetBool("WriteToDisk", &config.write_to_disk)) ||
(err = parser.GetBool("WriteToDb", &config.write_to_db)) ||
(err = parser.Embedded("DataCache").GetBool("Use", &config.use_datacache)) ||
......
......@@ -21,6 +21,7 @@ struct ReceiverConfig {
bool use_datacache = true;
uint64_t datacache_size_gb = 0;
uint64_t datacache_reserved_share = 0;
uint64_t dataserver_nthreads = 1;
LogLevel log_level = LogLevel::Info;
std::string tag;
std::string source_host;
......
File deleted
#ifndef ASAPO_COMMON_H
#define ASAPO_COMMON_H
#include <vector>
#include "request.h"
namespace asapo {
using Requests = std::vector<asapo::Request>;
}
#endif //ASAPO_COMMON_H
......@@ -3,13 +3,13 @@
#include "common/error.h"
#include "common.h"
#include "request/request.h"
namespace asapo {
class NetServer {
public:
virtual Requests GetNewRequests(Error* err) const noexcept = 0;
virtual GenericRequests GetNewRequests(Error* err) const noexcept = 0;
virtual ~NetServer() = default;
};
......
#include "receiver_data_server.h"
#include "tcp_server.h"
#include "receiver_data_server_logger.h"
#include "receiver_data_server_request_handler_factory.h"
namespace asapo {
ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level) : request_pool__{new RequestPool}, net__{new TcpServer(address)},
ReceiverDataServer::ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads) : net__{new TcpServer(address)},
log__{GetDefaultReceiverDataServerLogger()} {
request_handler_factory_.reset(new ReceiverDataServerRequestHandlerFactory());
GetDefaultReceiverDataServerLogger()->SetLogLevel(log_level);
request_pool__.reset(new RequestPool{n_threads, request_handler_factory_.get(), log__});
}
void ReceiverDataServer::Run() {
......@@ -17,7 +20,7 @@ void ReceiverDataServer::Run() {
continue;
}
if (!err) {
err = request_pool__->AddRequests(requests);
err = request_pool__->AddRequests(std::move(requests));
}
if (err) {
log__->Error(std::string("receiver data server stopped: ") + err->Explain());
......
......@@ -4,14 +4,17 @@
#include <memory>
#include "net_server.h"
#include "request_pool.h"
#include "request/request_pool.h"
#include "logger/logger.h"
namespace asapo {
class ReceiverDataServer {
private:
// important to create it before request_pool__
std::unique_ptr<RequestHandlerFactory> request_handler_factory_;
public:
explicit ReceiverDataServer(std::string address, LogLevel log_level);
explicit ReceiverDataServer(std::string address, LogLevel log_level, uint8_t n_threads);
std::unique_ptr<RequestPool> request_pool__;
std::unique_ptr<NetServer> net__;
const AbstractLogger* log__;
......
#ifndef ASAPO_RECEIVER_LOGGER_H
#define ASAPO_RECEIVER_LOGGER_H
#include "logger/logger.h"
namespace asapo {
AbstractLogger* GetDefaultReceiverDataServerLogger();
}
#endif //ASAPO_RECEIVER_LOGGER_H
#include "receiver_data_server_request.h"
#include "receiver_data_server.h"
namespace asapo {
ReceiverDataServerRequest::ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id,
const NetServer* server) :
GenericRequest(std::move(header)),
net_id{net_id}, server{server} {
}
}
\ No newline at end of file
......@@ -3,17 +3,23 @@
#include "common/networking.h"
#include "request/request.h"
namespace asapo {
class NetServer;
struct Request {
explicit Request(uint64_t net_id, const NetServer* server);
GenericRequestHeader header;
class ReceiverDataServerRequest : public GenericRequest {
public:
explicit ReceiverDataServerRequest(GenericRequestHeader header, uint64_t net_id, const NetServer* server);
const uint64_t net_id;
const NetServer* server;
~ReceiverDataServerRequest() = default;
};
using ReceiverDataServerRequestPtr = std::unique_ptr<ReceiverDataServerRequest>;
}
#endif //ASAPO_REQUEST_H
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment