From 6e2c3ea3feb7ed424583df553e691aee0fe73b83 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 14 May 2018 18:08:58 +0200
Subject: [PATCH] started with request pool

---
 CMakeLists.txt                                |   5 +
 .../dummy_data_producer.cpp                   |  20 +-
 producer/api/CMakeLists.txt                   |   1 +
 producer/api/include/producer/producer.h      |   3 +-
 .../api/include/producer/producer_error.h     |   8 +
 producer/api/src/producer.cpp                 |  18 +-
 producer/api/src/producer_impl.cpp            |   2 +-
 producer/api/src/producer_impl.h              |   3 +-
 producer/api/src/request.cpp                  |  15 +-
 producer/api/src/request.h                    |  13 +-
 producer/api/src/request_pool.cpp             |  80 +++-
 producer/api/src/request_pool.h               |  21 ++
 producer/api/unittests/test_producer.cpp      |  17 +-
 producer/api/unittests/test_producer_impl.cpp |   4 +-
 producer/api/unittests/test_request.cpp       |  44 ++-
 producer/api/unittests/test_request_pool.cpp  | 354 ++++++++++++++++++
 receiver/src/statistics.h                     |  17 +-
 17 files changed, 575 insertions(+), 50 deletions(-)
 create mode 100644 producer/api/unittests/test_request_pool.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 85845171a..4327c4822 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -16,6 +16,11 @@ ELSEIF(UNIX)
     SET_PROPERTY(GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES Threads::Threads)
 ENDIF(WIN32)
 
+if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+    add_definitions(-DUNIT_TESTS)
+endif (CMAKE_BUILD_TYPE STREQUAL "Debug")
+
+
 option(BUILD_TESTS "Uses googletest to build tests" OFF)
 option(BUILD_INTEGRATION_TESTS "Include integration tests (CMAKE >3.7 is needed)" OFF)
 option(BUILD_DOCS "Uses doxygen to build the documentaion" OFF)
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index d73fa38c6..38b1071ec 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -7,16 +7,16 @@
 
 using std::chrono::high_resolution_clock;
 
-typedef std::tuple<std::string, size_t, uint64_t> ArgumentTuple;
+typedef std::tuple<std::string, size_t, uint64_t, uint8_t> ArgumentTuple;
 ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
-    if (argc != 4) {
+    if (argc != 5) {
         std::cout <<
-                  "Usage: " << argv[0] << " <receiver_address> <number_of_byte> <iterations>"
+                  "Usage: " << argv[0] << " <receiver_address> <number_of_byte> <iterations> <nthreads>"
                   << std::endl;
         exit(EXIT_FAILURE);
     }
     try {
-        return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]));
+        return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]));
     } catch(std::exception& e) {
         std::cerr << "Fail to parse arguments" << std::endl;
         std::cerr << e.what() << std::endl;
@@ -47,15 +47,21 @@ int main (int argc, char* argv[]) {
     std::string receiver_address;
     size_t number_of_kbytes;
     uint64_t iterations;
-    std::tie(receiver_address, number_of_kbytes, iterations) = ProcessCommandArguments(argc, argv);
+    uint8_t nthreads;
+    std::tie(receiver_address, number_of_kbytes, iterations, nthreads) = ProcessCommandArguments(argc, argv);
 
     std::cout << "receiver_address: " << receiver_address << std::endl
               << "Package size: " << number_of_kbytes << "k" << std::endl
               << "iterations: " << iterations << std::endl
               << std::endl;
 
-    auto producer = asapo::Producer::Create();
-    auto err = producer->ConnectToReceiver(receiver_address);
+    asapo::Error err;
+    auto producer = asapo::Producer::Create(nthreads, &err);
+    if(err) {
+        std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
+        return EXIT_FAILURE;
+    }
+    err = producer->ConnectToReceiver(receiver_address);
     if(err) {
         std::cerr << "Failed to connect to receiver. ProducerError: " << err << std::endl;
         return EXIT_FAILURE;
diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt
index 818132ff3..e462956c2 100644
--- a/producer/api/CMakeLists.txt
+++ b/producer/api/CMakeLists.txt
@@ -22,6 +22,7 @@ set(TEST_SOURCE_FILES
         unittests/test_producer_impl.cpp
         unittests/test_producer.cpp
         unittests/test_request.cpp
+        unittests/test_request_pool.cpp
         )
 set(TEST_LIBRARIES "${TARGET_NAME}")
 
diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h
index 7408e3f78..0b1c6a209 100644
--- a/producer/api/include/producer/producer.h
+++ b/producer/api/include/producer/producer.h
@@ -16,6 +16,7 @@ enum class ProducerStatus {
     kConnected,
 };
 
+const uint8_t kMaxProcessingThreads = 32;
 
 using RequestCallback =  std::function<void(GenericNetworkRequestHeader, Error)>;
 
@@ -26,7 +27,7 @@ class Producer {
     /*!
      * @return A unique_ptr to a new producer instance
      */
-    static std::unique_ptr<Producer> Create();
+    static std::unique_ptr<Producer> Create(uint8_t n_processing_threads, Error* err);
 
     virtual ~Producer() = default;
 
diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h
index fccba0c10..13d43f255 100644
--- a/producer/api/include/producer/producer_error.h
+++ b/producer/api/include/producer/producer_error.h
@@ -12,6 +12,7 @@ enum class ProducerErrorType {
     kFileIdAlreadyInUse,
     kUnknownServerError,
     kCannotSendDataToReceivers,
+    kRequestPoolIsFull
 };
 
 //TODO Make a marco to create error class and error template class
@@ -76,6 +77,13 @@ auto const kCannotSendDataToReceivers = ProducerErrorTemplate {
     "Cannot connect/send data to receivers", ProducerErrorType::kCannotSendDataToReceivers
 };
 
+auto const kRequestPoolIsFull = ProducerErrorTemplate {
+    "Cannot add request to poll - hit pool size limit", ProducerErrorType::kRequestPoolIsFull
+};
+
+
+
+
 };
 }
 
diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp
index c69f21e14..125c6cb60 100644
--- a/producer/api/src/producer.cpp
+++ b/producer/api/src/producer.cpp
@@ -1,6 +1,20 @@
 #include "producer/producer.h"
 #include "producer_impl.h"
 
-std::unique_ptr<asapo::Producer> asapo::Producer::Create() {
-    return std::unique_ptr<asapo::Producer>(new ProducerImpl());
+std::unique_ptr<asapo::Producer> asapo::Producer::Create(uint8_t n_processing_threads, Error* err) {
+    if (n_processing_threads > kMaxProcessingThreads) {
+        *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads));
+        return nullptr;
+    }
+
+    try {
+        *err = nullptr;
+        return std::unique_ptr<asapo::Producer>(new ProducerImpl(n_processing_threads));
+    } catch (const std::exception& ex) {
+        *err = TextError(ex.what());
+        return nullptr;
+    } catch (...) {
+        *err = TextError("Unknown exception in producer_api ");
+        return nullptr;
+    }
 }
diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp
index 6763e92ff..7e03097c7 100644
--- a/producer/api/src/producer_impl.cpp
+++ b/producer/api/src/producer_impl.cpp
@@ -10,7 +10,7 @@ namespace  asapo {
 const uint32_t ProducerImpl::kVersion = 1;
 const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte
 
-ProducerImpl::ProducerImpl(): io__{GenerateDefaultIO()},log__{GetDefaultProducerLogger()} {
+ProducerImpl::ProducerImpl(uint8_t n_processing_threads): io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()} {
 }
 
 uint64_t ProducerImpl::GetVersion() const {
diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h
index 95d9af218..31ad897aa 100644
--- a/producer/api/src/producer_impl.h
+++ b/producer/api/src/producer_impl.h
@@ -8,6 +8,7 @@
 #include "logger/logger.h"
 
 namespace asapo {
+
 class ProducerImpl : public Producer {
   private:
     static const uint32_t kVersion;
@@ -26,7 +27,7 @@ class ProducerImpl : public Producer {
   public:
     static const size_t kMaxChunkSize;
 
-    ProducerImpl();
+    explicit ProducerImpl(uint8_t n_processing_threads);
     ProducerImpl(const ProducerImpl&) = delete;
     ProducerImpl& operator=(const ProducerImpl&) = delete;
 
diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp
index bde4fcf67..c4a4035a7 100644
--- a/producer/api/src/request.cpp
+++ b/producer/api/src/request.cpp
@@ -6,7 +6,7 @@ namespace asapo {
 
 Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
                  RequestCallback callback):
-    io__{io}, log__{GetDefaultProducerLogger()},header_(header), data_{data}, callback_{std::move(callback)} {
+    io__{io}, log__{GetDefaultProducerLogger()}, header_(header), data_{data}, callback_{std::move(callback)} {
 
 }
 
@@ -21,7 +21,7 @@ Error Request::ConnectToReceiver(SocketDescriptor* sd, const std::string& receiv
     return nullptr;
 }
 
-Error Request::SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address) {
+Error Request::SendHeaderAndData(SocketDescriptor sd, const std::string& receiver_address) {
     Error io_error;
     io__->Send(sd, &header_, sizeof(header_), &io_error);
     if(io_error) {
@@ -40,7 +40,7 @@ Error Request::SendHeaderAndData(SocketDescriptor sd,const std::string& receiver
 }
 
 
-Error Request::ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address) {
+Error Request::ReceiveResponse(SocketDescriptor sd, const std::string& receiver_address) {
     Error err;
     SendDataResponse sendDataResponse;
     io__->Receive(sd, &sendDataResponse, sizeof(sendDataResponse), &err);
@@ -59,7 +59,7 @@ Error Request::ReceiveResponse(SocketDescriptor sd, const std::string &receiver_
 }
 
 Error Request::TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address) {
-    auto err = SendHeaderAndData(sd,receiver_address);
+    auto err = SendHeaderAndData(sd, receiver_address);
     if (err)  {
         return err;
     }
@@ -82,7 +82,7 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
             if (err != nullptr ) continue;
         }
 
-        auto err = TrySendToReceiver(*sd,receiver_uri);
+        auto err = TrySendToReceiver(*sd, receiver_uri);
         if (err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse)  {
             io__->CloseSocket(*sd, nullptr);
             *sd = kDisconnectedSocketDescriptor;
@@ -96,4 +96,9 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list) {
     return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate();
 }
 
+uint64_t Request::GetMemoryRequitements() {
+    return header_.data_size + sizeof(Request);
+}
+
+
 }
diff --git a/producer/api/src/request.h b/producer/api/src/request.h
index f12627176..b245e7134 100644
--- a/producer/api/src/request.h
+++ b/producer/api/src/request.h
@@ -8,19 +8,26 @@
 
 #include "producer/producer.h"
 
+#ifdef UNIT_TESTS
+#define VIRTUAL virtual
+#endif
+
+
 namespace asapo {
 
 class Request {
   public:
     explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data,
                      RequestCallback callback);
-    Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list);
+    VIRTUAL Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list);
+    VIRTUAL ~Request()=default;
     const IO* io__;
     const AbstractLogger* log__;
+    uint64_t GetMemoryRequitements();
   private:
     Error ConnectToReceiver(SocketDescriptor* sd, const std::string& receiver_address);
-    Error SendHeaderAndData(SocketDescriptor sd,const std::string& receiver_address);
-    Error ReceiveResponse(SocketDescriptor sd, const std::string &receiver_address);
+    Error SendHeaderAndData(SocketDescriptor sd, const std::string& receiver_address);
+    Error ReceiveResponse(SocketDescriptor sd, const std::string& receiver_address);
     Error TrySendToReceiver(SocketDescriptor sd, const std::string& receiver_address);
     GenericNetworkRequestHeader header_;
     const void* data_;
diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp
index d4135e6f8..ac919a860 100644
--- a/producer/api/src/request_pool.cpp
+++ b/producer/api/src/request_pool.cpp
@@ -1,7 +1,85 @@
-#include <vector>
 #include "request_pool.h"
+#include "producer_logger.h"
 
 namespace asapo {
 
+RequestPool:: RequestPool(uint8_t n_threads, uint64_t max_pool_volume): log__{GetDefaultProducerLogger()}, threads_{n_threads},
+    max_pool_volume_{max_pool_volume} {
+    for(size_t i = 0; i < threads_.size(); i++) {
+        log__->Debug("starting thread " + std::to_string(i));
+        threads_[i] = std::thread(
+                          std::bind(&RequestPool::ThreadHandler, this));
+    }
+
+}
+
+bool RequestPool::RequestWouldFit(const std::unique_ptr<Request>& request) {
+    return request->GetMemoryRequitements() + current_pool_volume_ < max_pool_volume_;
+}
+
+Error RequestPool::AddRequest(std::unique_ptr<Request> request) {
+    if (!RequestWouldFit(request)) {
+        return ProducerErrorTemplates::kRequestPoolIsFull.Generate();
+    }
+
+    std::unique_lock<std::mutex> lock(mutex_);
+    request_queue_.emplace_back(std::move(request));
+    lock.unlock();
+//todo: maybe notify_one is better here
+    condition_.notify_all();
+
+
+    return nullptr;
+}
+
+
+void RequestPool::ThreadHandler(void) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    SocketDescriptor thread_sd = kDisconnectedSocketDescriptor;
+    ReceiversList thread_receivers;
+    do {
+        condition_.wait(lock, [this] {
+            return (request_queue_.size() || quit_);
+        });
+        //after wait, we own the lock
+
+        if (request_queue_.size() && !quit_) {
+
+            if (thread_sd == kDisconnectedSocketDescriptor) {
+                thread_receivers = ReceiversList{"test"};
+            }
+
+            auto request = std::move(request_queue_.front());
+            request_queue_.pop_front();
+
+            //unlock now that we're done messing with the queue
+            lock.unlock();
+
+            auto err = request->Send(&thread_sd,thread_receivers);
+            // we should lock again for the next wait since we did unlock
+            lock.lock();
+            if (err) {
+                // could not send from this thread - place request back in the queue
+                request_queue_.emplace_front(std::move(request));
+            }
+        };
+    } while (!quit_);
+}
+
+RequestPool::~RequestPool() {
+    mutex_.lock();
+    quit_ = true;
+    mutex_.unlock();
+    condition_.notify_all();
+
+    for(size_t i = 0; i < threads_.size(); i++) {
+        if(threads_[i].joinable()) {
+            log__->Debug("finishing thread " + std::to_string(i));
+            threads_[i].join();
+        }
+    }
+}
+
+
 
 }
diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h
index df6a1316b..e898be05a 100644
--- a/producer/api/src/request_pool.h
+++ b/producer/api/src/request_pool.h
@@ -4,12 +4,33 @@
 #include <string>
 #include <vector>
 #include <mutex>
+#include <thread>
+#include <atomic>
+#include <condition_variable>
+#include <queue>
+
+#include "logger/logger.h"
+#include "request.h"
+
 
 namespace asapo {
 
 class RequestPool {
   public:
+    explicit RequestPool(uint8_t n_threads, uint64_t max_pool_volume);
+    Error AddRequest(std::unique_ptr<Request> request);
+    ~RequestPool();
+    AbstractLogger* log__;
   private:
+    std::vector<std::thread> threads_;
+    void ThreadHandler();
+    bool quit_{false};
+    std::condition_variable condition_;
+    std::mutex mutex_;
+    std::deque<std::unique_ptr<Request>> request_queue_;
+    uint64_t max_pool_volume_;
+    uint64_t current_pool_volume_{0};
+    bool RequestWouldFit(const std::unique_ptr<Request>& request);
 };
 
 }
diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp
index 80072d7c1..4d3b36f76 100644
--- a/producer/api/unittests/test_producer.cpp
+++ b/producer/api/unittests/test_producer.cpp
@@ -4,13 +4,26 @@
 #include "producer/producer.h"
 #include "../src/producer_impl.h"
 using ::testing::Ne;
+using ::testing::Eq;
 
 namespace {
 
 TEST(CreateProducer, PointerIsNotNullptr) {
-    std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create();
+    asapo::Error err;
+    std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(4, &err);
     ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr));
-    ASSERT_THAT(producer.get(), Ne(nullptr));
+    ASSERT_THAT(err, Eq(nullptr));
+
+}
+
+TEST(CreateProducer, TooManyThreads) {
+    asapo::Error err;
+    std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(asapo::kMaxProcessingThreads + 1, &err);
+    ASSERT_THAT(producer, Eq(nullptr));
+    ASSERT_THAT(err, Ne(nullptr));
 }
 
+
+
+
 }
diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp
index 6eb4b2743..bcf40f051 100644
--- a/producer/api/unittests/test_producer_impl.cpp
+++ b/producer/api/unittests/test_producer_impl.cpp
@@ -22,13 +22,13 @@ using ::testing::InSequence;
 using ::testing::HasSubstr;
 
 TEST(get_version, VersionAboveZero) {
-    asapo::ProducerImpl producer;
+    asapo::ProducerImpl producer{4};
     EXPECT_GE(producer.GetVersion(), 0);
 }
 
 
 TEST(Producer, Logger) {
-    asapo::ProducerImpl producer;
+    asapo::ProducerImpl producer{4};
     ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr));
 }
 
diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp
index 8e65926f7..183b5de4b 100644
--- a/producer/api/unittests/test_request.cpp
+++ b/producer/api/unittests/test_request.cpp
@@ -30,7 +30,7 @@ using ::testing::InSequence;
 using ::testing::HasSubstr;
 
 
-TEST(Producer, Constructor) {
+TEST(Request, Constructor) {
     auto  io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()};
     asapo::GenericNetworkRequestHeader header;
     asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}};
@@ -122,10 +122,10 @@ void RequestTests::ExpectFailSendHeader(bool only_once) {
                 Return(-1)
             ));
         EXPECT_CALL(mock_logger, Debug(AllOf(
-            HasSubstr("cannot send header"),
-            HasSubstr(receivers_list[i])
+                                           HasSubstr("cannot send header"),
+                                           HasSubstr(receivers_list[i])
                                        )
-        ));
+                                      ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
         i++;
@@ -144,10 +144,10 @@ void RequestTests::ExpectFailSendData(bool only_once) {
                 Return(-1)
             ));
         EXPECT_CALL(mock_logger, Debug(AllOf(
-            HasSubstr("cannot send data"),
-            HasSubstr(receivers_list[i])
+                                           HasSubstr("cannot send data"),
+                                           HasSubstr(receivers_list[i])
                                        )
-        ));
+                                      ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
         i++;
@@ -167,10 +167,10 @@ void RequestTests::ExpectFailReceive(bool only_once) {
                 testing::Return(-1)
             ));
         EXPECT_CALL(mock_logger, Debug(AllOf(
-            HasSubstr("cannot receive"),
-            HasSubstr(receivers_list[i])
-                                      )
-        ));
+                                           HasSubstr("cannot receive"),
+                                           HasSubstr(receivers_list[i])
+                                       )
+                                      ));
         EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
         if (only_once) break;
         i++;
@@ -221,10 +221,10 @@ void RequestTests::ExpectOKConnect(bool only_once) {
                 Return(expected_sds[i])
             ));
         EXPECT_CALL(mock_logger, Info(AllOf(
-            HasSubstr("connected"),
-            HasSubstr(expected_address)
-          )
-        ));
+                                          HasSubstr("connected"),
+                                          HasSubstr(expected_address)
+                                      )
+                                     ));
         if (only_once) break;
         i++;
     }
@@ -242,16 +242,24 @@ void RequestTests::ExpectOKReceive(bool only_once) {
                 testing::ReturnArg<2>()
             ));
         EXPECT_CALL(mock_logger, Debug(AllOf(
-            HasSubstr("sent data"),
-            HasSubstr(receivers_list[i])
+                                           HasSubstr("sent data"),
+                                           HasSubstr(receivers_list[i])
                                        )
-        ));
+                                      ));
         if (only_once) break;
         i++;
     }
 }
 
 
+TEST_F(RequestTests, MemoryRequirements) {
+
+    auto size = request.GetMemoryRequitements();
+
+    ASSERT_THAT(size, Eq(sizeof(asapo::Request) + expected_file_size));
+}
+
+
 TEST_F(RequestTests, TriesConnectWhenNotConnected) {
     ExpectFailConnect();
 
diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp
new file mode 100644
index 000000000..ae2bd666c
--- /dev/null
+++ b/producer/api/unittests/test_request_pool.cpp
@@ -0,0 +1,354 @@
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+#include <chrono>
+
+#include "unittests/MockLogger.h"
+#include "common/error.h"
+
+#include "../src/request.h"
+#include "../src/request_pool.h"
+#include "io/io_factory.h"
+
+namespace {
+
+using ::testing::Return;
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::SetArgReferee;
+using ::testing::Gt;
+using ::testing::Eq;
+using ::testing::Ne;
+using ::testing::Mock;
+using ::testing::AllOf;
+
+
+using ::testing::InSequence;
+using ::testing::HasSubstr;
+
+using asapo::Request;
+using asapo::RequestPool;
+using asapo::Error;
+
+
+TEST(RequestPool, Constructor) {
+    asapo::RequestPool pool{4, 4};
+    ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr));
+}
+
+
+class MockRequest : public Request {
+  public:
+    MockRequest() : Request(asapo::GenerateDefaultIO(), asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {};
+    Error Send(asapo::SocketDescriptor* sd, const asapo::ReceiversList& receivers_list) override {
+        return Error {Send_t(sd, receivers_list)};
+    }
+
+    MOCK_METHOD2(Send_t, asapo::SimpleError*(asapo::SocketDescriptor*, const asapo::ReceiversList&));
+};
+
+class RequestPoolTests : public testing::Test {
+  public:
+    testing::NiceMock<asapo::MockLogger> mock_logger;
+    const uint8_t nthreads = 4;
+    const uint64_t max_size = 1024 * 1024 * 1024;
+    asapo::RequestPool pool {nthreads, max_size};
+    std::unique_ptr<Request> request;
+    MockRequest* mock_request = new MockRequest;
+    void SetUp() override {
+        pool.log__ = &mock_logger;
+        request.reset(mock_request);
+    }
+    void TearDown() override {
+    }
+};
+
+
+
+TEST(RequestPool, AddRequestFailsDueToSize) {
+    RequestPool pool{4, 0};
+
+    auto  io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()};
+    asapo::GenericNetworkRequestHeader header;
+    std::unique_ptr<Request> request{new Request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}};
+    auto err = pool.AddRequest(std::move(request));
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull));
+
+}
+
+TEST_F(RequestPoolTests, AddRequestCallsSend) {
+    EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("test"))).
+        WillOnce(
+            Return(nullptr)
+        );
+
+
+    auto err = pool.AddRequest(std::move(request));
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+TEST_F(RequestPoolTests, AddRequestCallsSendTwice) {
+    asapo::SimpleError* send_error = new asapo::SimpleError("www");
+    EXPECT_CALL(*mock_request, Send_t(testing::Pointee(asapo::kDisconnectedSocketDescriptor), testing::ElementsAre("test")))
+        .Times(2)
+        .WillOnce(Return(send_error))
+        .WillOnce(Return(nullptr));
+
+    auto err = pool.AddRequest(std::move(request));
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+    ASSERT_THAT(err, Eq(nullptr));
+
+
+}
+
+
+TEST_F(RequestPoolTests, FinishProcessingThreads) {
+    EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(nthreads);
+}
+
+
+/*
+
+void RequestTests::ExpectFailSendHeader(bool only_once) {
+    int i = 0;
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id,
+                                    expected_file_size),
+                                    sizeof(asapo::GenericNetworkRequestHeader), _))
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
+                Return(-1)
+            ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot send header"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
+        EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
+        if (only_once) break;
+        i++;
+    }
+
+}
+
+void RequestTests::ExpectFailSendData(bool only_once) {
+    int i = 0;
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _))
+        .Times(1)
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
+                Return(-1)
+            ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot send data"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
+        EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
+        if (only_once) break;
+        i++;
+    }
+
+}
+
+
+void RequestTests::ExpectFailReceive(bool only_once) {
+    int i = 0;
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
+        .Times(1)
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()),
+                testing::Return(-1)
+            ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("cannot receive"),
+            HasSubstr(receivers_list[i])
+                                      )
+        ));
+        EXPECT_CALL(mock_io, CloseSocket_t(expected_sd, _));
+        if (only_once) break;
+        i++;
+    }
+
+}
+
+
+void RequestTests::ExpectOKSendData(bool only_once) {
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _))
+        .Times(1)
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(nullptr),
+                Return(expected_file_size)
+            ));
+        if (only_once) break;
+    }
+
+}
+
+
+
+void RequestTests::ExpectOKSendHeader(bool only_once) {
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Send_t(expected_sd, M_CheckSendDataRequest(expected_file_id,
+                                    expected_file_size),
+                                    sizeof(asapo::GenericNetworkRequestHeader), _))
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(nullptr),
+                Return(sizeof(asapo::GenericNetworkRequestHeader))
+            ));
+        if (only_once) break;
+    }
+
+}
+
+
+void RequestTests::ExpectOKConnect(bool only_once) {
+    int i = 0;
+    for (auto expected_address : receivers_list) {
+        EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _))
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<1>(nullptr),
+                Return(expected_sds[i])
+            ));
+        EXPECT_CALL(mock_logger, Info(AllOf(
+            HasSubstr("connected"),
+            HasSubstr(expected_address)
+          )
+        ));
+        if (only_once) break;
+        i++;
+    }
+}
+
+
+void RequestTests::ExpectOKReceive(bool only_once) {
+    int i = 0;
+    for (auto expected_sd : expected_sds) {
+        EXPECT_CALL(mock_io, Receive_t(expected_sd, _, sizeof(asapo::SendDataResponse), _))
+        .WillOnce(
+            DoAll(
+                testing::SetArgPointee<3>(nullptr),
+                A_WriteSendDataResponse(asapo::kNetErrorNoError),
+                testing::ReturnArg<2>()
+            ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+            HasSubstr("sent data"),
+            HasSubstr(receivers_list[i])
+                                       )
+        ));
+        if (only_once) break;
+        i++;
+    }
+}
+
+
+TEST_F(RequestTests, TriesConnectWhenNotConnected) {
+    ExpectFailConnect();
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers));
+}
+
+TEST_F(RequestTests, DoesNotTryConnectWhenConnected) {
+    sd = expected_sds[0];
+    EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _))
+    .Times(0);
+    ExpectFailSendHeader(true);
+
+
+    auto err = request.Send(&sd, asapo::ReceiversList{expected_address1});
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers));
+}
+
+
+
+TEST_F(RequestTests, ErrorWhenCannotSendHeader) {
+    ExpectOKConnect();
+    ExpectFailSendHeader();
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers));
+}
+
+TEST_F(RequestTests, ErrorWhenCannotSendData) {
+    ExpectOKConnect();
+    ExpectOKSendHeader();
+    ExpectFailSendData();
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers));
+}
+
+TEST_F(RequestTests, ErrorWhenCannotReceiveData) {
+    ExpectOKConnect();
+    ExpectOKSendHeader();
+    ExpectOKSendData();
+
+    ExpectFailReceive();
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers));
+}
+
+
+
+
+TEST_F(RequestTests, ImmediatelyCalBackErrorIfFileAlreadyInUse) {
+    ExpectOKConnect(true);
+    ExpectOKSendHeader(true);
+    ExpectOKSendData(true);
+
+    EXPECT_CALL(mock_io, Receive_t(expected_sds[0], _, sizeof(asapo::SendDataResponse), _))
+    .WillOnce(
+        DoAll(
+            testing::SetArgPointee<3>(nullptr),
+            A_WriteSendDataResponse(asapo::kNetErrorFileIdAlreadyInUse),
+            testing::ReturnArg<2>()
+        ));
+
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse));
+    ASSERT_THAT(called, Eq(true));
+    ASSERT_THAT(err, Eq(nullptr));
+}
+
+
+TEST_F(RequestTests, SendOK) {
+    ExpectOKConnect(true);
+    ExpectOKSendHeader(true);
+    ExpectOKSendData(true);
+    ExpectOKReceive();
+
+    auto err = request.Send(&sd, receivers_list);
+
+    ASSERT_THAT(err, Eq(nullptr));
+    ASSERT_THAT(sd, Eq(expected_sds[0]));
+    ASSERT_THAT(callback_err, Eq(nullptr));
+    ASSERT_THAT(called, Eq(true));
+    ASSERT_THAT(callback_header.data_size, Eq(header.data_size));
+    ASSERT_THAT(callback_header.op_code, Eq(header.op_code));
+    ASSERT_THAT(callback_header.data_id, Eq(header.data_id));
+
+}
+
+
+*/
+}
diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h
index 1f7875446..35a3ef9b5 100644
--- a/receiver/src/statistics.h
+++ b/receiver/src/statistics.h
@@ -22,16 +22,19 @@ struct StatisticsToSend {
     uint64_t n_requests;
 };
 
+#ifdef UNIT_TESTS
+#define VIRTUAL virtual
+#endif
+
 class Statistics {
   public:
-// virtual needed for unittests, could be replaced with #define VIRTUAL ... in case of performance issues
-    virtual void SendIfNeeded() noexcept;
-    virtual void Send() noexcept;
+    VIRTUAL void SendIfNeeded() noexcept;
+    VIRTUAL void Send() noexcept;
     explicit Statistics(unsigned int write_interval = kDefaultStatisticWriteIntervalMs);
-    virtual void IncreaseRequestCounter() noexcept;
-    virtual void StartTimer(const StatisticEntity& entity) noexcept;
-    virtual void IncreaseRequestDataVolume(uint64_t transferred_data_volume) noexcept;
-    virtual void StopTimer() noexcept;
+    VIRTUAL void IncreaseRequestCounter() noexcept;
+    VIRTUAL void StartTimer(const StatisticEntity& entity) noexcept;
+    VIRTUAL void IncreaseRequestDataVolume(uint64_t transferred_data_volume) noexcept;
+    VIRTUAL void StopTimer() noexcept;
 
     void SetWriteInterval(uint64_t interval_ms);
     std::unique_ptr<StatisticsSender> statistics_sender__;
-- 
GitLab