From e5ce50dfcee082dc68d746d153446b9cb66c2881 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Wed, 16 May 2018 18:42:02 +0200 Subject: [PATCH] update dummy producer --- .../dummy_data_producer.cpp | 40 ++- producer/api/CMakeLists.txt | 2 +- producer/api/include/producer/producer.h | 20 +- .../api/include/producer/producer_error.h | 5 + producer/api/src/producer.cpp | 5 +- producer/api/src/producer_impl.cpp | 91 +---- producer/api/src/producer_impl.h | 34 +- .../api/src/receiver_discovery_service.cpp | 6 +- producer/api/src/receiver_discovery_service.h | 2 +- producer/api/src/request.cpp | 9 +- producer/api/src/request.h | 4 +- producer/api/src/request_pool.cpp | 4 +- producer/api/src/request_pool.h | 8 +- producer/api/unittests/test_producer.cpp | 13 +- producer/api/unittests/test_producer_impl.cpp | 316 +++--------------- ...pp => test_receiver_discovery_service.cpp} | 24 +- producer/api/unittests/test_request.cpp | 33 +- producer/api/unittests/test_request_pool.cpp | 10 +- receiver/src/receiver_error.h | 5 + 19 files changed, 195 insertions(+), 436 deletions(-) rename producer/api/unittests/{test_receivers_status.cpp => test_receiver_discovery_service.cpp} (86%) diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 38b1071ec..3350c099c 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -2,12 +2,17 @@ #include <chrono> #include <vector> #include <tuple> +#include <mutex> +#include <thread> #include "asapo_producer.h" + using std::chrono::high_resolution_clock; -typedef std::tuple<std::string, size_t, uint64_t, uint8_t> ArgumentTuple; +std::mutex mutex; + +typedef std::tuple<std::string, size_t, uint64_t, uint64_t> ArgumentTuple; ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { if (argc != 5) { std::cout << @@ -24,19 +29,25 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { } } +void work(asapo::GenericNetworkRequestHeader header, asapo::Error err) { + mutex.lock(); + if (err) { + std::cerr << "File was not successfully send: " << err << std::endl; + return; + } + std::cerr << "File was successfully send." << header.data_id << std::endl; + mutex.unlock(); +} + bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations) { auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]); for(uint64_t i = 0; i < iterations; i++) { // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; - - auto err = producer->Send(i + 1, buffer.get(), number_of_byte); - + auto err = producer->Send(i + 1, buffer.get(), number_of_byte, &work); if (err) { - std::cerr << "File was not successfully send: " << err << std::endl; + std::cerr << "Cannot send file: " << err << std::endl; return false; - } else { -// std::cerr << "File was successfully send." << std::endl; } } @@ -47,26 +58,25 @@ int main (int argc, char* argv[]) { std::string receiver_address; size_t number_of_kbytes; uint64_t iterations; - uint8_t nthreads; + uint64_t nthreads; std::tie(receiver_address, number_of_kbytes, iterations, nthreads) = ProcessCommandArguments(argc, argv); std::cout << "receiver_address: " << receiver_address << std::endl << "Package size: " << number_of_kbytes << "k" << std::endl << "iterations: " << iterations << std::endl + << "nthreads: " << nthreads << std::endl << std::endl; asapo::Error err; - auto producer = asapo::Producer::Create(nthreads, &err); + auto producer = asapo::Producer::Create(receiver_address, nthreads, &err); + producer->EnableLocalLog(true); + producer->SetLogLevel(asapo::LogLevel::Debug); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; return EXIT_FAILURE; } - err = producer->ConnectToReceiver(receiver_address); - if(err) { - std::cerr << "Failed to connect to receiver. ProducerError: " << err << std::endl; - return EXIT_FAILURE; - } - std::cout << "Successfully connected" << std::endl; high_resolution_clock::time_point t1 = high_resolution_clock::now(); if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) { diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index 7ecf68637..fe58157d1 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -23,7 +23,7 @@ set(TEST_SOURCE_FILES unittests/test_producer.cpp unittests/test_request.cpp unittests/test_request_pool.cpp - unittests/test_receivers_status.cpp + unittests/test_receiver_discovery_service.cpp ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 0b1c6a209..c1fd27ce8 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -27,26 +27,10 @@ class Producer { /*! * @return A unique_ptr to a new producer instance */ - static std::unique_ptr<Producer> Create(uint8_t n_processing_threads, Error* err); + static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, Error* err); virtual ~Producer() = default; - /*! - * @return The version of the producer - */ - virtual uint64_t GetVersion() const = 0; - - /*! - * @return The current status of the producer - */ - virtual ProducerStatus GetStatus() const = 0; - - //! Connects to a receiver - /*! - \param receiver_address - The address of the receiver. E.g. 127.0.0.1:4200 - \return Error - nullptr on success - */ - virtual Error ConnectToReceiver(const std::string& receiver_address) = 0; //! Sends data to the receiver /*! \param file_id - The id of the file. An error will be returned if this file id already exists on the receiver. @@ -54,7 +38,7 @@ class Producer { \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(uint64_t file_id, const void* data, size_t file_size) = 0; + virtual Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) = 0; //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; //! Enables/Disables logs output to stdout diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index 13d43f255..a7d93435c 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -53,6 +53,11 @@ class ProducerErrorTemplate : public SimpleErrorTemplate { } }; +static inline std::ostream& operator<<(std::ostream& os, const ProducerErrorTemplate& err) { + return os << err.Text(); +} + + namespace ProducerErrorTemplates { auto const kAlreadyConnected = ProducerErrorTemplate { "Already connected", ProducerErrorType::kAlreadyConnected diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp index 125c6cb60..2736578e1 100644 --- a/producer/api/src/producer.cpp +++ b/producer/api/src/producer.cpp @@ -1,7 +1,8 @@ #include "producer/producer.h" #include "producer_impl.h" -std::unique_ptr<asapo::Producer> asapo::Producer::Create(uint8_t n_processing_threads, Error* err) { +std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, + Error* err) { if (n_processing_threads > kMaxProcessingThreads) { *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads)); return nullptr; @@ -9,7 +10,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(uint8_t n_processing_th try { *err = nullptr; - return std::unique_ptr<asapo::Producer>(new ProducerImpl(n_processing_threads)); + return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 7e03097c7..454394681 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -7,47 +7,14 @@ namespace asapo { -const uint32_t ProducerImpl::kVersion = 1; const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte +const size_t ProducerImpl::kMaxPoolVolume = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte +const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s -ProducerImpl::ProducerImpl(uint8_t n_processing_threads): io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()} { -} - -uint64_t ProducerImpl::GetVersion() const { - return kVersion; -} - -ProducerStatus ProducerImpl::GetStatus() const { - return status_; -} - -Error ProducerImpl::InitializeSocketToReceiver(const std::string& receiver_address) { - Error err; - FileDescriptor fd = io__->CreateAndConnectIPTCPSocket(receiver_address, &err); - if(err != nullptr) { - log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); - return err; - } - - receiver_uri_ = receiver_address; - client_fd_ = fd; - return nullptr; -} - -Error ProducerImpl::ConnectToReceiver(const std::string& receiver_address) { - if(status_ != ProducerStatus::kDisconnected) { - return ProducerErrorTemplates::kAlreadyConnected.Generate(); - } - - auto error = InitializeSocketToReceiver(receiver_address); - if(error) { - status_ = ProducerStatus::kDisconnected; - return error; - } - status_ = ProducerStatus::kConnected; - log__->Info("connected to receiver at " + receiver_address); - return nullptr; +ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads): log__{GetDefaultProducerLogger()} { + discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); + request_pool__.reset(new RequestPool{n_processing_threads, ProducerImpl::kMaxPoolVolume, discovery_service_.get()}); } GenericNetworkRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size) { @@ -58,54 +25,24 @@ GenericNetworkRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_ return request; } - -Error ProducerImpl::ReceiveResponce() { - Error err; - SendDataResponse sendDataResponse; - io__->Receive(client_fd_, &sendDataResponse, sizeof(sendDataResponse), &err); - if(err != nullptr) { -// std::cerr << "ProducerImpl::Receive error: " << err << std::endl; - return err; +Error CheckProducerRequest(const GenericNetworkRequestHeader header) { + if (header.data_size > ProducerImpl::kMaxChunkSize) { + return ProducerErrorTemplates::kFileTooLarge.Generate(); } - if(sendDataResponse.error_code) { - if(sendDataResponse.error_code == kNetErrorFileIdAlreadyInUse) { - return ProducerErrorTemplates::kFileIdAlreadyInUse.Generate(); - } -// std::cerr << "Server reported an error. NetErrorCode: " << int(sendDataResponse.error_code) << std::endl; - return ProducerErrorTemplates::kUnknownServerError.Generate(); - } return nullptr; } -Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size) { - if(status_ != ProducerStatus::kConnected) { - return ProducerErrorTemplates::kConnectionNotReady.Generate(); - } - if(file_size > kMaxChunkSize) { - return ProducerErrorTemplates::kFileTooLarge.Generate(); - } +Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) { + auto request_header = GenerateNextSendRequest(file_id, file_size); - auto send_data_request = GenerateNextSendRequest(file_id, file_size); - - /*auto error = SendHeaderAndData(send_data_request, data, file_size); - if(error) { - log__->Debug("error sending to " + receiver_uri_ + " - " + error->Explain()); - return error; - } - - error = ReceiveResponce(); - if(error) { - log__->Debug("error receiving response from " + receiver_uri_ + " - " + error->Explain()); - return error; + auto err = CheckProducerRequest(request_header); + if (err) { + return err; } - */ - - log__->Debug("succesfully sent data to " + receiver_uri_); - - return nullptr; + return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{request_header, data, callback}}); } void ProducerImpl::SetLogLevel(LogLevel level) { diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 31ad897aa..3062812b1 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -6,41 +6,35 @@ #include <io/io.h> #include "producer/producer.h" #include "logger/logger.h" +#include "request_pool.h" +#include "receiver_discovery_service.h" namespace asapo { class ProducerImpl : public Producer { - private: - static const uint32_t kVersion; - - int client_fd_ = -1; - std::string receiver_uri_; - uint64_t request_id_ = 0; - - ProducerStatus status_ = ProducerStatus::kDisconnected; - - Error InitializeSocketToReceiver(const std::string& receiver_address); - GenericNetworkRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size); - Error SendHeaderAndData(const GenericNetworkRequestHeader& header, const void* data, size_t file_size); - Error ReceiveResponce(); - + private: + // important to create it before request_pool__ + std::unique_ptr<ReceiverDiscoveryService> discovery_service_; public: static const size_t kMaxChunkSize; + static const size_t kMaxPoolVolume; + static const uint64_t kDiscoveryServiceUpdateFrequencyMs; - explicit ProducerImpl(uint8_t n_processing_threads); + explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads); ProducerImpl(const ProducerImpl&) = delete; ProducerImpl& operator=(const ProducerImpl&) = delete; - uint64_t GetVersion() const override; - ProducerStatus GetStatus() const override; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error ConnectToReceiver(const std::string& receiver_address) override; - Error Send(uint64_t file_id, const void* data, size_t file_size) override; - std::unique_ptr<IO> io__; + Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) override; AbstractLogger* log__; + std::unique_ptr<RequestPool> request_pool__; + private: + GenericNetworkRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size); }; + +Error CheckProducerRequest(const GenericNetworkRequestHeader header); } #endif //ASAPO_PRODUCER__PRODUCER_IMPL_H diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp index 55d2b4715..96ca46495 100644 --- a/producer/api/src/receiver_discovery_service.cpp +++ b/producer/api/src/receiver_discovery_service.cpp @@ -10,7 +10,7 @@ namespace asapo { ReceiverDiscoveryService::ReceiverDiscoveryService(std::string endpoint, uint64_t update_frequency_ms): httpclient__{DefaultHttpClient()}, log__{GetDefaultProducerLogger()}, - endpoint_{std::move(endpoint)}, update_frequency_ms_{update_frequency_ms} { + endpoint_{std::move(endpoint) + "/receivers"}, update_frequency_ms_{update_frequency_ms} { } @@ -34,11 +34,11 @@ Error ReceiverDiscoveryService::ParseResponse(const std::string& responce, Recei Error ReceiverDiscoveryService::UpdateFromEndpoint(ReceiversList* list, uint64_t* max_connections) { Error err; HttpCode code; + auto responce = httpclient__->Get(endpoint_, &code, &err); if (err != nullptr) { return err; } - if (code != HttpCode::OK) { return TextError(responce); } @@ -56,13 +56,13 @@ void ReceiverDiscoveryService::ThreadHandler() { auto err = UpdateFromEndpoint(&uris, &max_connections); if (err != nullptr) { log__->Error("getting receivers from " + endpoint_ + " - " + err->Explain()); + lock.lock(); continue; } lock.lock(); max_connections_ = max_connections; uri_list_ = uris; } while (!condition_.wait_for(lock, std::chrono::milliseconds(update_frequency_ms_), [this] {return (quit_);})) ; - } ReceiverDiscoveryService::~ReceiverDiscoveryService() { diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h index c4a5dba93..255088bb3 100644 --- a/producer/api/src/receiver_discovery_service.h +++ b/producer/api/src/receiver_discovery_service.h @@ -26,7 +26,7 @@ class ReceiverDiscoveryService { ~ReceiverDiscoveryService(); VIRTUAL uint64_t MaxConnections(); VIRTUAL ReceiversList RotatedUriList(uint64_t nthread); - uint64_t UpdateFrequency(); + VIRTUAL uint64_t UpdateFrequency(); public: std::unique_ptr<HttpClient> httpclient__; AbstractLogger* log__; diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index ae64a462f..b1203e74b 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -1,12 +1,13 @@ #include "producer/producer_error.h" #include "request.h" #include "producer_logger.h" +#include "io/io_factory.h" namespace asapo { -Request::Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, +Request::Request(const GenericNetworkRequestHeader& header, const void* data, RequestCallback callback): - io__{io}, log__{GetDefaultProducerLogger()}, header_(header), data_{data}, callback_{std::move(callback)} { + io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, header_(header), data_{data}, callback_{std::move(callback)} { } @@ -94,7 +95,9 @@ Error Request::Send(SocketDescriptor* sd, const ReceiversList& receivers_list, b continue; } - callback_(header_, std::move(err)); + if (callback_) { + callback_(header_, std::move(err)); + } return nullptr; } diff --git a/producer/api/src/request.h b/producer/api/src/request.h index d3c8c5d79..cab866026 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -17,11 +17,11 @@ namespace asapo { class Request { public: - explicit Request(const asapo::IO* io, const GenericNetworkRequestHeader& header, const void* data, + explicit Request(const GenericNetworkRequestHeader& header, const void* data, RequestCallback callback); VIRTUAL Error Send(SocketDescriptor* sd, const ReceiversList& receivers_list, bool rebalance); VIRTUAL ~Request() = default; - const IO* io__; + std::unique_ptr<IO> io__; const AbstractLogger* log__; uint64_t GetMemoryRequitements(); private: diff --git a/producer/api/src/request_pool.cpp b/producer/api/src/request_pool.cpp index 92e1dfae6..b95d2ee86 100644 --- a/producer/api/src/request_pool.cpp +++ b/producer/api/src/request_pool.cpp @@ -8,7 +8,6 @@ RequestPool:: RequestPool(uint8_t n_threads, uint64_t max_pool_volume, ReceiverDiscoveryService* discovery_service): log__{GetDefaultProducerLogger()}, discovery_service__{discovery_service}, threads_{n_threads}, max_pool_volume_{max_pool_volume} { - discovery_service->StartCollectingData(); for(size_t i = 0; i < threads_.size(); i++) { log__->Debug("starting thread " + std::to_string(i)); @@ -59,6 +58,9 @@ void RequestPool::UpdateIfNewConnection(ThreadInformation* thread_info) { } bool RequestPool::CheckForRebalance(ThreadInformation* thread_info) { + if (thread_info->thread_sd == kDisconnectedSocketDescriptor) + return false; + auto now = high_resolution_clock::now(); uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( now - thread_info->last_rebalance).count(); diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h index 69c27492a..7881ec6bb 100644 --- a/producer/api/src/request_pool.h +++ b/producer/api/src/request_pool.h @@ -16,6 +16,12 @@ using std::chrono::high_resolution_clock; +#ifdef UNIT_TESTS +#define VIRTUAL virtual +#endif + + + namespace asapo { class RequestPool { @@ -28,7 +34,7 @@ class RequestPool { }; public: explicit RequestPool(uint8_t n_threads, uint64_t max_pool_volume, ReceiverDiscoveryService* discovery_service); - Error AddRequest(std::unique_ptr<Request> request); + VIRTUAL Error AddRequest(std::unique_ptr<Request> request); ~RequestPool(); AbstractLogger* log__; uint64_t NRequestsInQueue(); diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index 4d3b36f76..fd3b4eea7 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -10,7 +10,7 @@ namespace { TEST(CreateProducer, PointerIsNotNullptr) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(4, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); @@ -18,11 +18,20 @@ TEST(CreateProducer, PointerIsNotNullptr) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create(asapo::kMaxProcessingThreads + 1, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Ne(nullptr)); } +TEST(Producer, SimpleWorkflowWihoutConnection) { + asapo::Error err; + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, &err); + auto err_send = producer->Send(1, nullptr, 1, nullptr); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ASSERT_THAT(producer, Ne(nullptr)); + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(err_send, Eq(nullptr)); +} diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index bcf40f051..131638f4b 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -1,12 +1,12 @@ #include <gtest/gtest.h> #include <gmock/gmock.h> -#include "unittests/MockIO.h" #include "unittests/MockLogger.h" #include "common/error.h" -#include "io/io.h" #include "producer/producer.h" #include "../src/producer_impl.h" +#include "../src/request_pool.h" +#include "../src/request.h" namespace { @@ -21,299 +21,73 @@ using ::testing::Mock; using ::testing::InSequence; using ::testing::HasSubstr; -TEST(get_version, VersionAboveZero) { - asapo::ProducerImpl producer{4}; - EXPECT_GE(producer.GetVersion(), 0); -} + +using asapo::RequestPool; +using asapo::Request; -TEST(Producer, Logger) { - asapo::ProducerImpl producer{4}; +TEST(ProducerImpl, Constructor) { + asapo::ProducerImpl producer{"", 4}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); } -/* -class ProducerImpl : public testing::Test { - public: - asapo::ProducerImpl producer; - testing::NiceMock<asapo::MockIO> mock_io; - testing::NiceMock<asapo::MockLogger> mock_logger; +asapo::ReceiverDiscoveryService discovery{"", 1}; - asapo::FileDescriptor expected_fd = 83942; - uint64_t expected_file_id = 4224; - std::string expected_address = "127.0.0.1:9090"; - uint64_t expected_request_id = 0; - uint64_t expected_file_size = 1337; - void* expected_file_pointer = (void*)0xC00FE; +class MockRequestPull : public RequestPool { + public: + MockRequestPull() : RequestPool{1, 1, &discovery} {}; + asapo::Error AddRequest(std::unique_ptr<asapo::Request> request) override { + if (request == nullptr) { + return asapo::Error{AddRequest_t(nullptr)}; + } + return asapo::Error{AddRequest_t(request.get())}; + } + MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (Request*)); +}; +class ProducerImplTests : public testing::Test { + public: + testing::NiceMock<asapo::MockLogger> mock_logger; + testing::NiceMock<MockRequestPull> mock_pull; + asapo::ProducerImpl producer{"", 1}; void SetUp() override { - producer.io__ = std::unique_ptr<asapo::IO> {&mock_io}; - producer.log__ = asapo::Logger {&mock_logger}; + producer.log__ = &mock_logger; + producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; } void TearDown() override { - producer.io__.release(); - producer.log__.release(); - } - - void ConnectToReceiver_DONE(asapo::FileDescriptor expected_fd = 1) { - EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(nullptr), - Return(expected_fd) - )); - producer.ConnectToReceiver(expected_address); - } - void Send_DONE(int times = 1) { - EXPECT_CALL(mock_io, Send_t(_, _, _, _)) - .Times(times) - .WillRepeatedly( - DoAll( - testing::SetArgPointee<3>(nullptr), - testing::ReturnArg<2>() - )); + producer.request_pool__.release(); } }; -TEST_F(ProducerImpl, get_status__disconnected) { - asapo::ProducerStatus status = producer.GetStatus(); - ASSERT_THAT(status, Eq(asapo::ProducerStatus::kDisconnected)); -} - - -TEST_F(ProducerImpl, ConnectToReceiver__CreateAndConnectIPTCPSocket_error) { - EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), - Return(-1) - )); - - EXPECT_CALL(mock_logger, Debug(HasSubstr("cannot connect"))); - - auto error = producer.ConnectToReceiver(expected_address); - auto status = producer.GetStatus(); - - ASSERT_THAT(error, Eq(asapo::IOErrorTemplates::kInvalidAddressFormat)); - ASSERT_THAT(status, Eq(asapo::ProducerStatus::kDisconnected)); -} - -TEST_F(ProducerImpl, ConnectToReceiver) { - EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(expected_address, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<1>(nullptr), - Return(expected_fd) - )); - - EXPECT_CALL(mock_logger, Info(HasSubstr("connected"))); - - - auto error = producer.ConnectToReceiver(expected_address); - auto status = producer.GetStatus(); - - ASSERT_THAT(error, Eq(nullptr)); - ASSERT_THAT(status, Eq(asapo::ProducerStatus::kConnected)); -} - -TEST_F(ProducerImpl, ConnectToReceiver__already_connected) { - InSequence sequence; - - ConnectToReceiver_DONE(); - - auto error = producer.ConnectToReceiver(expected_address); - - ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kAlreadyConnected)); -} - - - -MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, - "Checks if a valid GenericNetworkRequestHeader was Send") { - return ((asapo::GenericNetworkRequestHeader*)arg)->op_code == asapo::kNetOpcodeSendData - && ((asapo::GenericNetworkRequestHeader*)arg)->data_id == file_id - && ((asapo::GenericNetworkRequestHeader*)arg)->data_size == file_size; -} - -ACTION_P2(A_WriteSendDataResponse, error_code, request_id) { - ((asapo::SendDataResponse*)arg1)->op_code = asapo::kNetOpcodeSendData; - ((asapo::SendDataResponse*)arg1)->error_code = error_code; - ((asapo::SendDataResponse*)arg1)->request_id = request_id; -} - -TEST_F(ProducerImpl, Send__connection_not_ready) { - - auto error = producer.Send(expected_file_id, nullptr, 1); - - ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kConnectionNotReady)); -} - -TEST_F(ProducerImpl, Send__file_too_large) { - - ConnectToReceiver_DONE(expected_fd); - - auto error = producer.Send(expected_file_id, nullptr, - size_t(1024) * size_t(1024) * size_t(1024) * size_t(3)); - - ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); -} -/* -TEST_F(ProducerImpl, Send__sendDataRequest_error) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - - EXPECT_CALL(mock_io, Send_t(expected_fd, M_CheckSendDataRequest(expected_file_id, - expected_file_size), - sizeof(asapo::GenericNetworkRequestHeader), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); - - auto error = producer.Send(expected_file_id, nullptr, expected_file_size); - - ASSERT_THAT(error, Eq(asapo::IOErrorTemplates::kBadFileNumber)); -} - -TEST_F(ProducerImpl, Send__sendData_error) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - Send_DONE(); - - EXPECT_CALL(mock_io, Send_t(expected_fd, expected_file_pointer, expected_file_size, _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - Return(-1) - )); - - EXPECT_CALL(mock_logger, Debug(HasSubstr("error sending to " + expected_address))); - - auto error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); - - ASSERT_THAT(error, Eq(asapo::IOErrorTemplates::kBadFileNumber)); -} - - -TEST_F(ProducerImpl, Send__Receive_error) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - Send_DONE(2); - - EXPECT_CALL(mock_io, Receive_t(expected_fd, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(asapo::IOErrorTemplates::kBadFileNumber.Generate().release()), - testing::Return(-1) - )); - - EXPECT_CALL(mock_logger, Debug(HasSubstr("error receiving response from " + expected_address))); - - auto error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); - - ASSERT_THAT(error, Eq(asapo::IOErrorTemplates::kBadFileNumber)); -} - -TEST_F(ProducerImpl, Send__Receive_server_error) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - Send_DONE(2); - - - EXPECT_CALL(mock_io, Receive_t(_, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorAllocateStorageFailed, expected_request_id), - testing::ReturnArg<2>() - )); - - auto error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); - - ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kUnknownServerError)); +TEST_F(ProducerImplTests, SendReturnsError) { + EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( + asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); + auto err = producer.Send(1, nullptr, 1, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } -TEST_F(ProducerImpl, Send__Receive_server_error_id_already_in_use) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - Send_DONE(2); - - - EXPECT_CALL(mock_io, Receive_t(_, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorFileIdAlreadyInUse, expected_request_id), - testing::ReturnArg<2>() - )); - - auto error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); - - ASSERT_THAT(error, Eq(asapo::ProducerErrorTemplates::kFileIdAlreadyInUse)); +TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { + auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1, nullptr); + ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } -TEST_F(ProducerImpl, Send) { - InSequence sequence; - - ConnectToReceiver_DONE(expected_fd); - Send_DONE(2); - - - EXPECT_CALL(mock_io, Receive_t(_, _, sizeof(asapo::SendDataResponse), _)) - .Times(1) - .WillOnce( - DoAll( - testing::SetArgPointee<3>(nullptr), - A_WriteSendDataResponse(asapo::kNetErrorNoError, expected_request_id), - testing::ReturnArg<2>() - )); - - EXPECT_CALL(mock_logger, Debug(HasSubstr("succesfully sent data to " + expected_address))); - - auto error = producer.Send(expected_file_id, expected_file_pointer, expected_file_size); - ASSERT_THAT(error, Eq(nullptr)); +MATCHER_P(M_CheckSendDataRequest, request_size, + "Checks if a valid request was send (check size only)") { + return ((Request*)arg)->GetMemoryRequitements() == request_size; } -TEST_F(ProducerImpl, EnableLocalLog) { - - EXPECT_CALL(mock_logger, EnableLocalLog(true)); - - producer.EnableLocalLog(true); - -} - -TEST_F(ProducerImpl, EnableRemoteLog) { - - EXPECT_CALL(mock_logger, EnableRemoteLog(false)); - - producer.EnableRemoteLog(false); - -} - - -TEST_F(ProducerImpl, SetLogLevel) { - EXPECT_CALL(mock_logger, SetLogLevel(asapo::LogLevel::Warning)); +TEST_F(ProducerImplTests, OKSendingRequest) { + uint64_t expected_size = 100; + Request request{asapo::GenericNetworkRequestHeader{}, nullptr, nullptr}; + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(expected_size + sizeof(Request)))).WillOnce(Return(nullptr)); - producer.SetLogLevel(asapo::LogLevel::Warning); + auto err = producer.Send(1, nullptr, expected_size, nullptr); + ASSERT_THAT(err, Eq(nullptr)); } - */ } diff --git a/producer/api/unittests/test_receivers_status.cpp b/producer/api/unittests/test_receiver_discovery_service.cpp similarity index 86% rename from producer/api/unittests/test_receivers_status.cpp rename to producer/api/unittests/test_receiver_discovery_service.cpp index 9c90c7040..e83d1aa67 100644 --- a/producer/api/unittests/test_receivers_status.cpp +++ b/producer/api/unittests/test_receiver_discovery_service.cpp @@ -36,7 +36,7 @@ using asapo::ReceiverDiscoveryService; std::mutex mutex; TEST(ReceiversStatus, Constructor) { - ReceiverDiscoveryService status{"endpoint/receivers", 1000}; + ReceiverDiscoveryService status{"endpoint", 1000}; ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(status.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::HttpClient*>(status.httpclient__.get()), Ne(nullptr)); } @@ -49,7 +49,7 @@ class ReceiversStatusTests : public Test { NiceMock<MockHttpClient>* mock_http_client; std::string expected_endpoint{"endpoint/receivers"}; - ReceiverDiscoveryService status{expected_endpoint, 1000}; + ReceiverDiscoveryService status{"endpoint", 20}; void SetUp() override { mock_http_client = new NiceMock<MockHttpClient>; @@ -62,6 +62,7 @@ class ReceiversStatusTests : public Test { TEST_F(ReceiversStatusTests, LogWhenHttpError) { EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .Times(1) .WillOnce( DoAll(SetArgPointee<2>(new asapo::IOError("Test Read Error", asapo::IOErrorType::kReadError)), Return("") @@ -69,19 +70,23 @@ TEST_F(ReceiversStatusTests, LogWhenHttpError) { EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting receivers"), HasSubstr(expected_endpoint)))); status.StartCollectingData(); + } TEST_F(ReceiversStatusTests, LogWhenWhenWrongHttpCode) { EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) - .WillOnce( + .Times(testing::AnyNumber()) + .WillRepeatedly( DoAll(SetArgPointee<2>(nullptr), SetArgPointee<1>(asapo::HttpCode::BadRequest), Return("bad request") )); EXPECT_CALL(mock_logger, Error(AllOf(HasSubstr("getting receivers"), HasSubstr(expected_endpoint), - HasSubstr("bad request")))); + HasSubstr("bad request")))).Times(testing::AtLeast(1)); status.StartCollectingData(); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + } TEST_F(ReceiversStatusTests, LogWhenWhenCannotReadResponce) { @@ -102,6 +107,7 @@ TEST_F(ReceiversStatusTests, GetsReqestedInformation) { std::string json = R"({"uri_list":["s1","s2","s3"], "max_connections":8})"; EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .Times(testing::AtLeast(1)) .WillRepeatedly( DoAll(SetArgPointee<2>(nullptr), SetArgPointee<1>(asapo::HttpCode::OK), @@ -128,6 +134,15 @@ TEST_F(ReceiversStatusTests, GetsReqestedInformation) { } TEST_F(ReceiversStatusTests, JoinThreadAtTheEnd) { + std::string json = R"({"uri_list":["s1","s2","s3"], "max_connections":8})"; + EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) + .Times(testing::AtLeast(1)) + .WillRepeatedly( + DoAll(SetArgPointee<2>(nullptr), + SetArgPointee<1>(asapo::HttpCode::OK), + Return(json) + )); + EXPECT_CALL(mock_logger, Debug(HasSubstr("starting receiver discovery"))); EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing"))); status.StartCollectingData(); @@ -144,5 +159,4 @@ TEST_F(ReceiversStatusTests, InitialUriList) { } - } diff --git a/producer/api/unittests/test_request.cpp b/producer/api/unittests/test_request.cpp index c1e85d854..9434ecaa3 100644 --- a/producer/api/unittests/test_request.cpp +++ b/producer/api/unittests/test_request.cpp @@ -31,16 +31,14 @@ using ::testing::HasSubstr; TEST(Request, Constructor) { - auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; asapo::GenericNetworkRequestHeader header; - asapo::Request request{io.get(), header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}; + asapo::Request request{header, nullptr, [](asapo::GenericNetworkRequestHeader, asapo::Error) {}}; - ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__), Ne(nullptr)); + ASSERT_THAT(dynamic_cast<const asapo::IO*>(request.io__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(request.log__), Ne(nullptr)); } - class RequestTests : public testing::Test { public: testing::NiceMock<asapo::MockIO> mock_io; @@ -53,11 +51,14 @@ class RequestTests : public testing::Test { asapo::GenericNetworkRequestHeader header{expected_op_code, expected_file_id, expected_file_size}; bool called = false; asapo::GenericNetworkRequestHeader callback_header; - asapo::Request request{&mock_io, header, expected_file_pointer, [this](asapo::GenericNetworkRequestHeader header, asapo::Error err) { + asapo::Request request{header, expected_file_pointer, [this](asapo::GenericNetworkRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; + + asapo::Request request_nocallback{header, expected_file_pointer, nullptr}; + testing::NiceMock<asapo::MockLogger> mock_logger; asapo::SocketDescriptor sd = asapo::kDisconnectedSocketDescriptor; @@ -77,8 +78,13 @@ class RequestTests : public testing::Test { void SetUp() override { request.log__ = &mock_logger; + request.io__.reset(&mock_io); + request_nocallback.log__ = &mock_logger; + request_nocallback.io__.reset(&mock_io); } void TearDown() override { + request.io__.release(); + request_nocallback.io__.release(); } }; @@ -87,8 +93,6 @@ ACTION_P(A_WriteSendDataResponse, error_code) { ((asapo::SendDataResponse*)arg1)->error_code = error_code; } - - MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, "Checks if a valid GenericNetworkRequestHeader was Send") { return ((asapo::GenericNetworkRequestHeader*)arg)->op_code == asapo::kNetOpcodeSendData @@ -109,7 +113,6 @@ void RequestTests::ExpectFailConnect(bool only_once) { } - void RequestTests::ExpectFailSendHeader(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { @@ -301,8 +304,6 @@ TEST_F(RequestTests, DoNotCloseWhenRebalanceIfNotConnected) { ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kCannotSendDataToReceivers)); } - - TEST_F(RequestTests, ReconnectWhenRebalance) { sd = 1000; EXPECT_CALL(mock_io, CloseSocket_t(1000, _)); @@ -375,6 +376,18 @@ TEST_F(RequestTests, ImmediatelyCallBackErrorIfFileAlreadyInUse) { } +TEST_F(RequestTests, SendEmptyCallBack) { + ExpectOKConnect(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + ExpectOKReceive(); + + auto err = request_nocallback.Send(&sd, receivers_list, false); + + ASSERT_THAT(err, Eq(nullptr)); + ASSERT_THAT(called, Eq(false)); +} + TEST_F(RequestTests, SendOK) { ExpectOKConnect(true); ExpectOKSendHeader(true); diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index e3687522f..6ab557e73 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -41,13 +41,15 @@ class MockDiscoveryService : public asapo::ReceiverDiscoveryService { MOCK_METHOD0(StartCollectingData, void()); MOCK_METHOD0(MaxConnections, uint64_t()); MOCK_METHOD1(RotatedUriList, ReceiversList(uint64_t)); + uint64_t UpdateFrequency() override { + return 0; + } }; -std::unique_ptr<asapo::IO> io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; class MockRequest : public Request { public: - MockRequest() : Request(io.get(), asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; + MockRequest() : Request(asapo::GenericNetworkRequestHeader{}, nullptr, nullptr) {}; Error Send(asapo::SocketDescriptor* sd, const ReceiversList& receivers_list, bool rebalance) override { return Error {Send_t(sd, receivers_list, rebalance)}; } @@ -99,7 +101,7 @@ TEST(RequestPool, Constructor) { TEST(RequestPool, AddRequestFailsDueToSize) { asapo::ReceiverDiscoveryService discovery{expected_endpoint, 1000}; RequestPool pool{4, 0, &discovery}; - std::unique_ptr<Request> request{new Request{io.get(), GenericNetworkRequestHeader{}, nullptr, nullptr}}; + std::unique_ptr<Request> request{new Request{GenericNetworkRequestHeader{}, nullptr, nullptr}}; auto err = pool.AddRequest(std::move(request)); @@ -151,7 +153,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { request.reset(mock_request2); auto err2 = pool.AddRequest(std::move(request)); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); ASSERT_THAT(err1, Eq(nullptr)); ASSERT_THAT(err2, Eq(nullptr)); } diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h index a83f2bf09..6af4287eb 100644 --- a/receiver/src/receiver_error.h +++ b/receiver/src/receiver_error.h @@ -48,6 +48,11 @@ class ReceiverErrorTemplate : public SimpleErrorTemplate { } }; +static inline std::ostream& operator<<(std::ostream& os, const ReceiverErrorTemplate& err) { + return os << err.Text(); +} + + namespace ReceiverErrorTemplates { auto const kInvalidOpCode = ReceiverErrorTemplate { "Invalid Opcode", ReceiverErrorType::kInvalidOpCode -- GitLab