From 0cdc350b2aeeee20dde252ddb603cb5def2f4026 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Tue, 22 May 2018 18:50:51 +0200 Subject: [PATCH] fixed some tests --- common/cpp/include/common/networking.h | 9 +-- common/cpp/include/unittests/MockIO.h | 2 +- common/cpp/src/system_io/system_io.cpp | 2 +- common/cpp/src/system_io/system_io.h | 2 +- .../dummy_data_producer.cpp | 18 ++--- producer/api/include/producer/common.h | 4 +- producer/api/include/producer/producer.h | 6 +- producer/api/src/producer.cpp | 4 +- producer/api/src/producer_impl.cpp | 25 +++---- producer/api/src/producer_impl.h | 7 +- producer/api/src/producer_logger.cpp | 2 +- .../api/src/receiver_discovery_service.cpp | 4 +- producer/api/src/request.h | 6 +- producer/api/src/request_handler.h | 12 ++-- producer/api/src/request_handler_factory.cpp | 6 +- producer/api/src/request_handler_factory.h | 2 +- .../api/src/request_handler_filesystem.cpp | 10 +-- producer/api/src/request_handler_filesystem.h | 4 +- producer/api/unittests/mocking.h | 67 +++++++++---------- producer/api/unittests/test_producer.cpp | 12 ++-- producer/api/unittests/test_producer_impl.cpp | 24 +++---- .../test_receiver_discovery_service.cpp | 2 +- .../test_request_handler_filesystem.cpp | 32 ++++----- .../unittests/test_request_handler_tcp.cpp | 22 +++--- .../check_monitoring/CMakeLists.txt | 3 +- .../check_monitoring/check_linux.sh | 7 +- .../settings/discovery_settings.json | 2 +- 27 files changed, 158 insertions(+), 138 deletions(-) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index 708743860..7f59eb7cf 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -4,6 +4,7 @@ #include <cstdint> #include <algorithm> #include <string> +#include <cstring> namespace asapo { @@ -32,10 +33,10 @@ enum NetworkErrorCode : uint16_t { const std::size_t kMaxFileNameSize = 1024; struct GenericRequestHeader { - GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp,uint64_t i_data_id = 0, - uint64_t i_data_size = 0,const std::string& i_file_name = ""): - op_code{i_op_code},data_id{i_data_id},data_size{i_data_size} { - auto size = std::min(i_file_name.size()+1,kMaxFileNameSize); + GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp, uint64_t i_data_id = 0, + uint64_t i_data_size = 0, const std::string& i_file_name = ""): + op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size} { + auto size = std::min(i_file_name.size() + 1, kMaxFileNameSize); memcpy(file_name, i_file_name.c_str(), size); } Opcode op_code; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 4260a5918..5287004ba 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -183,7 +183,7 @@ class MockIO : public IO { } - MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize)); + MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize)); void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const override { diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 6841f7d9d..f8dc19c31 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -140,7 +140,7 @@ Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, s } Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { - return WriteDataToFile(fname,data.get(),length); + return WriteDataToFile(fname, data.get(), length); } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 424ca8cd5..9b7c1feb7 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -103,7 +103,7 @@ class SystemIO final : public IO { FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const; Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const; Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const; - void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, + void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const; std::string ReadFileToString(const std::string& fname, Error* err) const; }; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index f5c9a6c93..f41aa7ef5 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -13,7 +13,7 @@ using std::chrono::high_resolution_clock; std::mutex mutex; int nfiles; -typedef std::tuple<std::string, size_t, uint64_t, uint64_t,uint64_t> ArgumentTuple; +typedef std::tuple<std::string, size_t, uint64_t, uint64_t, uint64_t> ArgumentTuple; ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { if (argc != 6) { std::cout << @@ -22,7 +22,7 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { exit(EXIT_FAILURE); } try { - return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]),std::stoull(argv[5])); + return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]), std::stoull(argv[5])); } catch(std::exception& e) { std::cerr << "Fail to parse arguments" << std::endl; std::cerr << e.what() << std::endl; @@ -35,11 +35,11 @@ void work(asapo::GenericRequestHeader header, asapo::Error err) { nfiles--; if (err) { std::cerr << "File was not successfully send: " << err << std::endl; - nfiles = 0; + //nfiles = 0; mutex.unlock(); return; } - // std::cerr << "File was successfully send." << header.data_id << std::endl; + // std::cerr << "File was successfully send." << header.data_id << std::endl; mutex.unlock(); } @@ -48,7 +48,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it for(uint64_t i = 0; i < iterations; i++) { // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; - auto err = producer->Send(i + 1, buffer.get(), number_of_byte,std::to_string(i), &work); + auto err = producer->Send(i + 1, buffer.get(), number_of_byte, std::to_string(i), &work); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; @@ -65,23 +65,23 @@ int main (int argc, char* argv[]) { uint64_t nthreads; uint64_t mode; - std::tie(receiver_address, number_of_kbytes, iterations, nthreads,mode) = ProcessCommandArguments(argc, argv); + std::tie(receiver_address, number_of_kbytes, iterations, nthreads, mode) = ProcessCommandArguments(argc, argv); std::cout << "receiver_address: " << receiver_address << std::endl << "Package size: " << number_of_kbytes << "k" << std::endl << "iterations: " << iterations << std::endl << "nthreads: " << nthreads << std::endl << "mode: " << mode << std::endl - << std::endl; + << std::endl; nfiles = iterations; asapo::Error err; std::unique_ptr<asapo::Producer> producer; if (mode == 0) { - producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kTcp,&err); + producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kTcp, &err); } else { - producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kFilesystem,&err); + producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kFilesystem, &err); } producer->EnableLocalLog(true); producer->SetLogLevel(asapo::LogLevel::Debug); diff --git a/producer/api/include/producer/common.h b/producer/api/include/producer/common.h index 2f6b6322e..4c0cb7792 100644 --- a/producer/api/include/producer/common.h +++ b/producer/api/include/producer/common.h @@ -14,8 +14,8 @@ const uint8_t kMaxProcessingThreads = 32; using RequestCallback = std::function<void(GenericRequestHeader, Error)>; enum class RequestHandlerType { - kTcp, - kFilesystem + kTcp, + kFilesystem }; diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 0174ca8de..303f79d29 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -16,7 +16,8 @@ class Producer { /*! * @return A unique_ptr to a new producer instance */ - static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type, + static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, + asapo::RequestHandlerType type, Error* err); virtual ~Producer() = default; @@ -28,7 +29,8 @@ class Producer { \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) = 0; + virtual Error Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, + RequestCallback callback) = 0; //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; //! Enables/Disables logs output to stdout diff --git a/producer/api/src/producer.cpp b/producer/api/src/producer.cpp index 01be40c65..a8e3e38b4 100644 --- a/producer/api/src/producer.cpp +++ b/producer/api/src/producer.cpp @@ -2,7 +2,7 @@ #include "producer_impl.h" std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, - asapo::RequestHandlerType type, Error* err) { + asapo::RequestHandlerType type, Error* err) { if (n_processing_threads > kMaxProcessingThreads) { *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads)); return nullptr; @@ -10,7 +10,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp try { *err = nullptr; - return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads,type)); + return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads, type)); } catch (const std::exception& ex) { *err = TextError(ex.what()); return nullptr; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index e2eca87ad..59ddc7686 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -12,23 +12,23 @@ const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t( const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s -ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type): +ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type): log__{GetDefaultProducerLogger()} { switch (type) { - case RequestHandlerType::kTcp: - discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); - request_handler_factory_.reset(new RequestHandlerFactory{discovery_service_.get()}); - break; - case RequestHandlerType::kFilesystem: - request_handler_factory_.reset(nullptr); - request_handler_factory_.reset(new RequestHandlerFactory{endpoint}); + case RequestHandlerType::kTcp: + discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); + request_handler_factory_.reset(new RequestHandlerFactory{discovery_service_.get()}); + break; + case RequestHandlerType::kFilesystem: + request_handler_factory_.reset(nullptr); + request_handler_factory_.reset(new RequestHandlerFactory{endpoint}); } request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get()}); } -GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name) { - GenericRequestHeader request{kOpcodeTransferData,file_id,file_size,std::move(file_name)}; +GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size, std::string file_name) { + GenericRequestHeader request{kOpcodeTransferData, file_id, file_size, std::move(file_name)}; return request; } @@ -41,8 +41,9 @@ Error CheckProducerRequest(const GenericRequestHeader header) { } -Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name,RequestCallback callback) { - auto request_header = GenerateNextSendRequest(file_id, file_size,std::move(file_name)); +Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, + RequestCallback callback) { + auto request_header = GenerateNextSendRequest(file_id, file_size, std::move(file_name)); auto err = CheckProducerRequest(request_header); if (err) { diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 1706ed87a..b675c11ad 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -21,18 +21,19 @@ class ProducerImpl : public Producer { static const size_t kMaxChunkSize; static const size_t kDiscoveryServiceUpdateFrequencyMs; - explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type); + explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type); ProducerImpl(const ProducerImpl&) = delete; ProducerImpl& operator=(const ProducerImpl&) = delete; void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) override; + Error Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, + RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; private: - GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name); + GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size, std::string file_name); }; Error CheckProducerRequest(const GenericRequestHeader header); diff --git a/producer/api/src/producer_logger.cpp b/producer/api/src/producer_logger.cpp index 35528043b..1fb485bbe 100644 --- a/producer/api/src/producer_logger.cpp +++ b/producer/api/src/producer_logger.cpp @@ -5,7 +5,7 @@ namespace asapo { AbstractLogger* GetDefaultProducerLogger() { //todo get fluentd uri from service discovery - // static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); +// static Logger logger = CreateDefaultLoggerApi("producer_api", "http://max-wgs.desy.de:9880/asapo"); static Logger logger = CreateDefaultLoggerBin("producer_api"); return logger.get(); diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp index 96ca46495..564313444 100644 --- a/producer/api/src/receiver_discovery_service.cpp +++ b/producer/api/src/receiver_discovery_service.cpp @@ -26,8 +26,8 @@ Error ReceiverDiscoveryService::ParseResponse(const std::string& responce, Recei uint64_t* max_connections) { auto parser = JsonStringParser(responce); Error err; - (err = parser.GetArrayString("uri_list", list)) || - (err = parser.GetUInt64("max_connections", max_connections)); + (err = parser.GetArrayString("Uris", list)) || + (err = parser.GetUInt64("MaxConnections", max_connections)); return err; } diff --git a/producer/api/src/request.h b/producer/api/src/request.h index beb9daf8e..4c0a7ccdd 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -7,9 +7,9 @@ namespace asapo { struct Request { - GenericRequestHeader header; - const void* data; - RequestCallback callback; + GenericRequestHeader header; + const void* data; + RequestCallback callback; }; } diff --git a/producer/api/src/request_handler.h b/producer/api/src/request_handler.h index 70138c7f0..e99bbd284 100644 --- a/producer/api/src/request_handler.h +++ b/producer/api/src/request_handler.h @@ -9,12 +9,12 @@ namespace asapo { class RequestHandler { - public: - virtual void PrepareProcessingRequestLocked() = 0; - virtual void TearDownProcessingRequestLocked(const Error &error_from_process) = 0; - virtual Error ProcessRequestUnlocked(const Request* request) = 0; - virtual bool ReadyProcessRequest() = 0; - virtual ~RequestHandler() = default; + public: + virtual void PrepareProcessingRequestLocked() = 0; + virtual void TearDownProcessingRequestLocked(const Error& error_from_process) = 0; + virtual Error ProcessRequestUnlocked(const Request* request) = 0; + virtual bool ReadyProcessRequest() = 0; + virtual ~RequestHandler() = default; }; diff --git a/producer/api/src/request_handler_factory.cpp b/producer/api/src/request_handler_factory.cpp index bd6ff52dd..3fbd76d28 100644 --- a/producer/api/src/request_handler_factory.cpp +++ b/producer/api/src/request_handler_factory.cpp @@ -10,8 +10,8 @@ std::unique_ptr<RequestHandler> RequestHandlerFactory::NewRequestHandler(uint64_ switch (type_) { case asapo::RequestHandlerType::kTcp: return std::unique_ptr<RequestHandler> {new RequestHandlerTcp(discovery_service_, thread_id, shared_counter)}; - case asapo::RequestHandlerType::kFilesystem: - return std::unique_ptr<RequestHandler> {new RequestHandlerFilesystem(destination_folder_, thread_id)}; + case asapo::RequestHandlerType::kFilesystem: + return std::unique_ptr<RequestHandler> {new RequestHandlerFilesystem(destination_folder_, thread_id)}; } return nullptr; @@ -25,7 +25,7 @@ RequestHandlerFactory::RequestHandlerFactory(ReceiverDiscoveryService* discovery } RequestHandlerFactory::RequestHandlerFactory(std::string destination_folder): type_{RequestHandlerType::kFilesystem}, - destination_folder_{std::move(destination_folder)} { + destination_folder_{std::move(destination_folder)} { } diff --git a/producer/api/src/request_handler_factory.h b/producer/api/src/request_handler_factory.h index 3c38fd314..199d0aa93 100644 --- a/producer/api/src/request_handler_factory.h +++ b/producer/api/src/request_handler_factory.h @@ -16,7 +16,7 @@ class RequestHandlerFactory { public: RequestHandlerFactory(ReceiverDiscoveryService* discovery_service); RequestHandlerFactory(std::string destination_folder); - VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); + VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); private: RequestHandlerType type_; ReceiverDiscoveryService* discovery_service_{nullptr}; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index f198d9a2d..8e35164f3 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -9,20 +9,20 @@ namespace asapo { RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id): - io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)}, thread_id_{thread_id} -{ + io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)}, + thread_id_{thread_id} { } Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { - std::string fullpath = destination_folder_ + "/"+request->header.file_name+".bin"; - auto err = io__->WriteDataToFile(fullpath,(uint8_t*)request->data,request->header.data_size); + std::string fullpath = destination_folder_ + "/" + request->header.file_name + ".bin"; + auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data, request->header.data_size); if (request->callback) { request->callback(request->header, std::move(err)); } - return nullptr; + return nullptr; } diff --git a/producer/api/src/request_handler_filesystem.h b/producer/api/src/request_handler_filesystem.h index 8436562fc..ba29a407e 100644 --- a/producer/api/src/request_handler_filesystem.h +++ b/producer/api/src/request_handler_filesystem.h @@ -18,7 +18,9 @@ class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); Error ProcessRequestUnlocked(const Request* request) override; - bool ReadyProcessRequest() override {return true;}; + bool ReadyProcessRequest() override { + return true; + }; void PrepareProcessingRequestLocked() override {}; void TearDownProcessingRequestLocked(const Error& error_from_process) override {}; diff --git a/producer/api/unittests/mocking.h b/producer/api/unittests/mocking.h index 13d14da91..7d231cc2a 100644 --- a/producer/api/unittests/mocking.h +++ b/producer/api/unittests/mocking.h @@ -9,52 +9,51 @@ namespace asapo { -const std::string expected_endpoint="expected_endpont"; +const std::string expected_endpoint = "expected_endpont"; class MockDiscoveryService : public asapo::ReceiverDiscoveryService { - public: - MockDiscoveryService() : ReceiverDiscoveryService{expected_endpoint, 1} {}; - MOCK_METHOD0(StartCollectingData, void()); - MOCK_METHOD0(MaxConnections, uint64_t()); - MOCK_METHOD1(RotatedUriList, ReceiversList(uint64_t)); - uint64_t UpdateFrequency() override { - return 0; - } + public: + MockDiscoveryService() : ReceiverDiscoveryService{expected_endpoint, 1} {}; + MOCK_METHOD0(StartCollectingData, void()); + MOCK_METHOD0(MaxConnections, uint64_t()); + MOCK_METHOD1(RotatedUriList, ReceiversList(uint64_t)); + uint64_t UpdateFrequency() override { + return 0; + } }; class MockRequestPull : public RequestPool { - public: - MockRequestPull(RequestHandlerFactory* request_handler_factory) : - RequestPool{1, request_handler_factory} {}; - asapo::Error AddRequest(std::unique_ptr<asapo::Request> request) override { - if (request == nullptr) { - return asapo::Error{AddRequest_t(nullptr)}; - } - return asapo::Error{AddRequest_t(request.get())}; - } - MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (Request*)); + public: + MockRequestPull(RequestHandlerFactory* request_handler_factory) : + RequestPool{1, request_handler_factory} {}; + asapo::Error AddRequest(std::unique_ptr<asapo::Request> request) override { + if (request == nullptr) { + return asapo::Error{AddRequest_t(nullptr)}; + } + return asapo::Error{AddRequest_t(request.get())}; + } + MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (Request*)); }; class MockRequestHandler : public RequestHandler { - public: + public: - Error ProcessRequestUnlocked(const Request* request) override { - return Error{ProcessRequestUnlocked_t(request)}; - } - void TearDownProcessingRequestLocked(const Error& error_from_process) override{ - if (error_from_process) { - TearDownProcessingRequestLocked_t(error_from_process.get()); - } - else { - TearDownProcessingRequestLocked_t(nullptr); - } - } + Error ProcessRequestUnlocked(const Request* request) override { + return Error{ProcessRequestUnlocked_t(request)}; + } + void TearDownProcessingRequestLocked(const Error& error_from_process) override { + if (error_from_process) { + TearDownProcessingRequestLocked_t(error_from_process.get()); + } else { + TearDownProcessingRequestLocked_t(nullptr); + } + } MOCK_METHOD0(PrepareProcessingRequestLocked, void()); - MOCK_METHOD0(ReadyProcessRequest, bool()); - MOCK_METHOD1(TearDownProcessingRequestLocked_t, void(ErrorInterface* error_from_process)); - MOCK_METHOD1(ProcessRequestUnlocked_t, ErrorInterface*(const Request*)); + MOCK_METHOD0(ReadyProcessRequest, bool()); + MOCK_METHOD1(TearDownProcessingRequestLocked_t, void(ErrorInterface* error_from_process)); + MOCK_METHOD1(ProcessRequestUnlocked_t, ErrorInterface * (const Request*)); }; diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index cc9cdb135..941456e79 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -11,14 +11,16 @@ namespace { TEST(CreateProducer, TcpProducer) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint",4,asapo::RequestHandlerType::kTcp, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, asapo::RequestHandlerType::kTcp, + &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } TEST(CreateProducer, FileSystemProducer) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4,asapo::RequestHandlerType::kFilesystem, &err); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, + asapo::RequestHandlerType::kFilesystem, &err); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); } @@ -27,15 +29,15 @@ TEST(CreateProducer, FileSystemProducer) { TEST(CreateProducer, TooManyThreads) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, - asapo::RequestHandlerType::kTcp, &err); + asapo::RequestHandlerType::kTcp, &err); ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(err, Ne(nullptr)); } TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; - std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp,&err); - auto err_send = producer->Send(1, nullptr, 1, "",nullptr); + std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, &err); + auto err_send = producer->Send(1, nullptr, 1, "", nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 8f68ef6fe..084557fb3 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -30,17 +30,17 @@ using asapo::RequestPool; using asapo::Request; -MATCHER_P3(M_CheckSendDataRequest, file_id, file_size,file_name, +MATCHER_P3(M_CheckSendDataRequest, file_id, file_size, file_name, "Checks if a valid GenericRequestHeader was Send") { return ((asapo::GenericRequestHeader*)arg)->op_code == asapo::kOpcodeTransferData - && ((asapo::GenericRequestHeader*)arg)->data_id == file_id - && std::string(((asapo::GenericRequestHeader*)arg)->file_name) == file_name - && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; + && ((asapo::GenericRequestHeader*)arg)->data_id == file_id + && std::string(((asapo::GenericRequestHeader*)arg)->file_name) == file_name + && ((asapo::GenericRequestHeader*)arg)->data_size == file_size; } TEST(ProducerImpl, Constructor) { - asapo::ProducerImpl producer{"", 4,asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 4, asapo::RequestHandlerType::kTcp}; ASSERT_THAT(dynamic_cast<asapo::AbstractLogger*>(producer.log__), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::RequestPool*>(producer.request_pool__.get()), Ne(nullptr)); } @@ -51,7 +51,7 @@ class ProducerImplTests : public testing::Test { asapo::RequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; testing::NiceMock<MockRequestPull> mock_pull{&factory}; - asapo::ProducerImpl producer{"", 1,asapo::RequestHandlerType::kTcp}; + asapo::ProducerImpl producer{"", 1, asapo::RequestHandlerType::kTcp}; void SetUp() override { producer.log__ = &mock_logger; producer.request_pool__ = std::unique_ptr<RequestPool> {&mock_pull}; @@ -64,12 +64,12 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); - auto err = producer.Send(1, nullptr, 1, "",nullptr); + auto err = producer.Send(1, nullptr, 1, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { - auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1,"", nullptr); + auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1, "", nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } @@ -80,12 +80,12 @@ TEST_F(ProducerImplTests, OKSendingRequest) { std::string expected_name = "test_name"; - Request request{asapo::GenericRequestHeader{asapo::kOpcodeTransferData,expected_id,expected_size,expected_name}, nullptr, nullptr}; + Request request{asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, nullptr}; - EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(expected_id,expected_size,expected_name))).WillOnce(Return( - nullptr)); + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(expected_id, expected_size, expected_name))).WillOnce(Return( + nullptr)); - auto err = producer.Send(expected_id, nullptr, expected_size,expected_name, nullptr); + auto err = producer.Send(expected_id, nullptr, expected_size, expected_name, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/unittests/test_receiver_discovery_service.cpp b/producer/api/unittests/test_receiver_discovery_service.cpp index 772f6563b..c69af5fdd 100644 --- a/producer/api/unittests/test_receiver_discovery_service.cpp +++ b/producer/api/unittests/test_receiver_discovery_service.cpp @@ -104,7 +104,7 @@ TEST_F(ReceiversStatusTests, LogWhenWhenCannotReadResponce) { TEST_F(ReceiversStatusTests, GetsReqestedInformation) { - std::string json = R"({"uri_list":["s1","s2","s3"], "max_connections":8})"; + std::string json = R"({"Uris":["s1","s2","s3"], "MaxConnections":8})"; EXPECT_CALL(*mock_http_client, Get_t(expected_endpoint, _, _)) .Times(testing::AtLeast(1)) diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index 7579e63d1..7cee0ad24 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -45,23 +45,23 @@ class RequestHandlerFilesystemTests : public testing::Test { std::string expected_file_name = "test_name"; uint64_t expected_thread_id = 2; std::string expected_destination = "destination"; - std::string expected_fullpath = expected_destination + "/" + expected_file_name+".bin"; + std::string expected_fullpath = expected_destination + "/" + expected_file_name + ".bin"; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; uint8_t* expected_data_pointer = (uint8_t*)0xC00FE; asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,expected_file_name}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; asapo::Request request{header, expected_data_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { - called = true; - callback_err = std::move(err); - callback_header = header; - }}; + called = true; + callback_err = std::move(err); + callback_header = header; + }}; asapo::Request request_nocallback{header, expected_data_pointer, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; - asapo::RequestHandlerFilesystem request_handler{expected_destination,expected_thread_id}; + asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; void SetUp() override { request_handler.log__ = &mock_logger; @@ -89,7 +89,7 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { .WillOnce( Return( asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) - ); + ); auto err = request_handler.ProcessRequestUnlocked(&request); @@ -101,10 +101,10 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) - .WillOnce( - Return( - asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) - ); + .WillOnce( + Return( + asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + ); auto err = request_handler.ProcessRequestUnlocked(&request_nocallback); @@ -117,10 +117,10 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { TEST_F(RequestHandlerFilesystemTests, TransferOK) { EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) - .WillOnce( - Return( - nullptr) - ); + .WillOnce( + Return( + nullptr) + ); request_handler.PrepareProcessingRequestLocked(); auto err = request_handler.ProcessRequestUnlocked(&request); diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 3eb1e025f..8eb3a24d6 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -55,14 +55,14 @@ class RequestHandlerTcpTests : public testing::Test { asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; void* expected_file_pointer = (void*)0xC00FE; asapo::Error callback_err; - asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,expected_file_name}; + asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; asapo::Request request{header, expected_file_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { - called = true; - callback_err = std::move(err); - callback_header = header; - }}; + called = true; + callback_err = std::move(err); + callback_header = header; + }}; asapo::Request request_nocallback{header, expected_file_pointer, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; @@ -74,7 +74,7 @@ class RequestHandlerTcpTests : public testing::Test { asapo::ReceiversList receivers_list{expected_address1, expected_address2}; asapo::ReceiversList receivers_list2{expected_address2, expected_address1}; - asapo::ReceiversList receivers_list_single{expected_address1}; + asapo::ReceiversList receivers_list_single{expected_address1}; std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943}; @@ -92,7 +92,7 @@ class RequestHandlerTcpTests : public testing::Test { request_handler.log__ = &mock_logger; request_handler.io__.reset(&mock_io); ON_CALL(mock_discovery_service, RotatedUriList(_)). - WillByDefault(Return(receivers_list)); + WillByDefault(Return(receivers_list)); } void TearDown() override { @@ -288,6 +288,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) { Mock::VerifyAndClearExpectations(&mock_io); Mock::VerifyAndClearExpectations(&mock_logger); Mock::VerifyAndClearExpectations(&mock_discovery_service); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } TEST_F(RequestHandlerTcpTests, CannotProcessRequestIfNotEnoughConnections) { @@ -347,8 +348,13 @@ TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) { TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) { DoSingleSend(); + EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). + WillOnce(Return(receivers_list_single)); + + EXPECT_CALL(mock_io, CreateAndConnectIPTCPSocket_t(_, _)) .Times(0); + ExpectFailSendHeader(true); auto err = request_handler.ProcessRequestUnlocked(&request); @@ -375,7 +381,7 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_CALL(mock_discovery_service, RotatedUriList(_)). - WillOnce(Return(asapo::ReceiversList{})); + WillOnce(Return(asapo::ReceiversList{})); EXPECT_CALL(mock_io, CloseSocket_t(_, _)); diff --git a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt index 298b81ed2..c7be857c5 100644 --- a/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt +++ b/tests/automatic/producer_receiver/check_monitoring/CMakeLists.txt @@ -4,5 +4,6 @@ set(TARGET_NAME receiver) # Testing ################################ configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json receiver.json COPYONLY) -add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin>" nomem +configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/discovery_settings.json discovery.json COPYONLY) +add_script_test("${TARGET_NAME}-monitoring" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:asapo-discovery,EXENAME>" nomem ) diff --git a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh index 5eeb022b9..16cc1bdc9 100644 --- a/tests/automatic/producer_receiver/check_monitoring/check_linux.sh +++ b/tests/automatic/producer_receiver/check_monitoring/check_linux.sh @@ -11,19 +11,24 @@ Cleanup() { echo cleanup influx -execute "drop database ${database_name}" kill $receiverid + kill $discoveryid rm -rf files echo "db.dropDatabase()" | mongo ${mongo_database_name} } influx -execute "create database ${database_name}" +nohup $3 -config discovery.json &>/dev/null & +discoveryid=`echo $!` +sleep 0.3 + nohup $2 receiver.json &>/dev/null & sleep 0.3 receiverid=`echo $!` mkdir files -$1 localhost:4200 100 112 +$1 localhost:5006 100 112 4 0 sleep 1 diff --git a/tests/automatic/settings/discovery_settings.json b/tests/automatic/settings/discovery_settings.json index 642dcfffb..379b5bfa0 100644 --- a/tests/automatic/settings/discovery_settings.json +++ b/tests/automatic/settings/discovery_settings.json @@ -1,6 +1,6 @@ { "MaxConnections": 32, - "Endpoints": ["localhost:8086"], + "Endpoints": ["localhost:4200"], "Mode": "static", "Port":5006, "LogLevel":"debug" -- GitLab