From dcd7590a91e485191432a839f26485104dd3d123 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Fri, 15 Feb 2019 12:13:46 +0100 Subject: [PATCH] fix for windows --- common/cpp/CMakeLists.txt | 1 + common/cpp/include/common/networking.h | 6 +++- .../cpp/include/request}/request_pool.h | 16 ++++----- .../cpp/src/request}/request_pool.cpp | 13 ++++--- .../unittests/request}/test_request_pool.cpp | 30 ++++++++-------- producer/api/CMakeLists.txt | 9 +++-- producer/api/src/producer_impl.cpp | 10 +++--- producer/api/src/producer_impl.h | 4 +-- producer/api/src/request.cpp | 14 -------- producer/api/src/request.h | 22 ------------ producer/api/src/request_handler.h | 22 ------------ producer/api/src/request_handler_factory.cpp | 32 ----------------- producer/api/src/request_handler_factory.h | 25 ------------- .../api/src/request_handler_filesystem.cpp | 20 +++++++---- producer/api/src/request_handler_filesystem.h | 4 +-- producer/api/src/request_handler_tcp.cpp | 19 ++++++---- producer/api/src/request_handler_tcp.h | 12 +++---- producer/api/unittests/mocking.h | 35 ++++--------------- producer/api/unittests/test_producer_impl.cpp | 14 ++++---- .../test_request_handler_factory.cpp | 8 ++--- .../test_request_handler_filesystem.cpp | 8 +++-- .../unittests/test_request_handler_tcp.cpp | 6 ++-- receiver/src/data_cache.cpp | 2 +- receiver/src/data_cache.h | 2 +- receiver/src/request_handler.h | 6 ++-- 25 files changed, 110 insertions(+), 230 deletions(-) rename {producer/api/src => common/cpp/include/request}/request_pool.h (75%) rename {producer/api/src => common/cpp/src/request}/request_pool.cpp (88%) rename {producer/api/unittests => common/cpp/unittests/request}/test_request_pool.cpp (80%) delete mode 100644 producer/api/src/request.cpp delete mode 100644 producer/api/src/request.h delete mode 100644 producer/api/src/request_handler.h delete mode 100644 producer/api/src/request_handler_factory.cpp delete mode 100644 producer/api/src/request_handler_factory.h diff --git a/common/cpp/CMakeLists.txt b/common/cpp/CMakeLists.txt index 05ba02232..d7770ef6a 100644 --- a/common/cpp/CMakeLists.txt +++ b/common/cpp/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(src/http_client) add_subdirectory(src/logger) +add_subdirectory(src/request) if(BUILD_MONGODB_CLIENTLIB) add_subdirectory(src/database) diff --git a/common/cpp/include/common/networking.h b/common/cpp/include/common/networking.h index c4cea3d36..cafe5f2ce 100644 --- a/common/cpp/include/common/networking.h +++ b/common/cpp/include/common/networking.h @@ -36,7 +36,11 @@ struct GenericRequestHeader { op_code{i_op_code}, data_id{i_data_id}, data_size{i_data_size} { strncpy(message, i_message.c_str(), kMaxMessageSize); } - Opcode op_code; + GenericRequestHeader(const GenericRequestHeader &header) {op_code = header.op_code, data_id = header.data_id,data_size = header.data_size, + strncpy(message, header.message, kMaxMessageSize); + } + + Opcode op_code; uint64_t data_id; uint64_t data_size; char message[kMaxMessageSize]; diff --git a/producer/api/src/request_pool.h b/common/cpp/include/request/request_pool.h similarity index 75% rename from producer/api/src/request_pool.h rename to common/cpp/include/request/request_pool.h index 4fac37bbc..4b344846e 100644 --- a/producer/api/src/request_pool.h +++ b/common/cpp/include/request/request_pool.h @@ -8,11 +8,9 @@ #include <condition_variable> #include <queue> - #include "logger/logger.h" -#include "request_handler_tcp.h" #include "request_handler_factory.h" - +#include "request.h" #include "preprocessor/definitions.h" @@ -23,23 +21,23 @@ class RequestPool { std::unique_lock<std::mutex> lock; }; public: - explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory); - VIRTUAL Error AddRequest(std::unique_ptr<Request> request); + explicit RequestPool(uint8_t n_threads, RequestHandlerFactory* request_handler_factory,AbstractLogger* log); + VIRTUAL Error AddRequest(std::unique_ptr<GenericRequest> request); ~RequestPool(); - AbstractLogger* log__; uint64_t NRequestsInQueue(); private: + AbstractLogger* log__; RequestHandlerFactory* request_handler_factory__; std::vector<std::thread> threads_; void ThreadHandler(uint64_t id); bool quit_{false}; std::condition_variable condition_; std::mutex mutex_; - std::deque<std::unique_ptr<Request>> request_queue_; + std::deque<std::unique_ptr<GenericRequest>> request_queue_; bool CanProcessRequest(const std::unique_ptr<RequestHandler>& request_handler); void ProcessRequest(const std::unique_ptr<RequestHandler>& request_handler, ThreadInformation* thread_info); - std::unique_ptr<Request> GetRequestFromQueue(); - void PutRequestBackToQueue(std::unique_ptr<Request>request); + std::unique_ptr<GenericRequest> GetRequestFromQueue(); + void PutRequestBackToQueue(std::unique_ptr<GenericRequest>request); uint64_t shared_counter_{0}; }; diff --git a/producer/api/src/request_pool.cpp b/common/cpp/src/request/request_pool.cpp similarity index 88% rename from producer/api/src/request_pool.cpp rename to common/cpp/src/request/request_pool.cpp index a5598feed..63f484e6d 100644 --- a/producer/api/src/request_pool.cpp +++ b/common/cpp/src/request/request_pool.cpp @@ -1,14 +1,13 @@ -#include "request_pool.h" -#include "producer_logger.h" +#include "request/request_pool.h" namespace asapo { RequestPool:: RequestPool(uint8_t n_threads, - RequestHandlerFactory* request_handler_factory): log__{GetDefaultProducerLogger()}, + RequestHandlerFactory* request_handler_factory,AbstractLogger* log): log__{log}, request_handler_factory__{request_handler_factory}, threads_{n_threads} { - for(size_t i = 0; i < threads_.size(); i++) { + for(size_t i = 0; i < n_threads; i++) { log__->Debug("starting thread " + std::to_string(i)); threads_[i] = std::thread( [this, i] {ThreadHandler(i);}); @@ -16,7 +15,7 @@ RequestPool:: RequestPool(uint8_t n_threads, } -Error RequestPool::AddRequest(std::unique_ptr<Request> request) { +Error RequestPool::AddRequest(std::unique_ptr<GenericRequest> request) { std::unique_lock<std::mutex> lock(mutex_); request_queue_.emplace_back(std::move(request)); lock.unlock(); @@ -30,13 +29,13 @@ bool RequestPool::CanProcessRequest(const std::unique_ptr<RequestHandler>& reque return request_queue_.size() && request_handler->ReadyProcessRequest(); } -std::unique_ptr<Request> RequestPool::GetRequestFromQueue() { +std::unique_ptr<GenericRequest> RequestPool::GetRequestFromQueue() { auto request = std::move(request_queue_.front()); request_queue_.pop_front(); return request; } -void RequestPool::PutRequestBackToQueue(std::unique_ptr<Request> request) { +void RequestPool::PutRequestBackToQueue(std::unique_ptr<GenericRequest> request) { request_queue_.emplace_front(std::move(request)); } diff --git a/producer/api/unittests/test_request_pool.cpp b/common/cpp/unittests/request/test_request_pool.cpp similarity index 80% rename from producer/api/unittests/test_request_pool.cpp rename to common/cpp/unittests/request/test_request_pool.cpp index aaca6d65b..bee5a58e6 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/common/cpp/unittests/request/test_request_pool.cpp @@ -5,10 +5,8 @@ #include "unittests/MockLogger.h" #include "common/error.h" -#include "../src/request_handler_tcp.h" -#include "../src/request_pool.h" -#include "../src/receiver_discovery_service.h" -#include "../src/request_handler_factory.h" +#include "../../include/request/request_pool.h" +#include "../../include/request/request_handler_factory.h" #include "mocking.h" #include "io/io_factory.h" @@ -35,7 +33,7 @@ using asapo::RequestHandler; using asapo::RequestPool; using asapo::Error; using asapo::ErrorInterface; -using asapo::Request; +using asapo::GenericRequest; using asapo::GenericRequestHeader; @@ -43,7 +41,7 @@ using asapo::GenericRequestHeader; class MockRequestHandlerFactory : public asapo::RequestHandlerFactory { public: MockRequestHandlerFactory(RequestHandler* request_handler): - RequestHandlerFactory(nullptr) { + RequestHandlerFactory() { request_handler_ = request_handler; } std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) override { @@ -53,6 +51,10 @@ class MockRequestHandlerFactory : public asapo::RequestHandlerFactory { RequestHandler* request_handler_; }; +class TestRequest : public GenericRequest { + public: + TestRequest(GenericRequestHeader header):GenericRequest(header){}; +}; class RequestPoolTests : public testing::Test { @@ -61,10 +63,9 @@ class RequestPoolTests : public testing::Test { NiceMock<asapo::MockLogger> mock_logger; MockRequestHandlerFactory request_handler_factory{mock_request_handler}; const uint8_t nthreads = 1; - asapo::RequestPool pool {nthreads, &request_handler_factory}; - std::unique_ptr<Request> request{new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}}; + asapo::RequestPool pool {nthreads, &request_handler_factory,&mock_logger}; + std::unique_ptr<GenericRequest> request{new TestRequest{GenericRequestHeader{}}}; void SetUp() override { - pool.log__ = &mock_logger; } void TearDown() override { } @@ -72,12 +73,13 @@ class RequestPoolTests : public testing::Test { TEST(RequestPool, Constructor) { - NiceMock<MockDiscoveryService> ds; - NiceMock<asapo::RequestHandlerFactory> request_handler_factory{&ds}; + NiceMock<asapo::MockLogger> mock_logger; + MockRequestHandlerFactory factory(nullptr); - asapo::RequestPool pool{4, &request_handler_factory}; + EXPECT_CALL(mock_logger, Debug(HasSubstr("starting"))).Times(4); + EXPECT_CALL(mock_logger, Debug(HasSubstr("finishing thread"))).Times(4); - ASSERT_THAT(dynamic_cast<const asapo::AbstractLogger*>(pool.log__), Ne(nullptr)); + asapo::RequestPool pool{4, &factory,&mock_logger}; } TEST_F(RequestPoolTests, AddRequestDoesNotGoFurtherWhenNotReady) { @@ -118,7 +120,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - Request* request2 = new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}; + TestRequest* request2 = new TestRequest{GenericRequestHeader{}}; ExpectSend(mock_request_handler, 2); diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index ca21d6cf8..1c3b0b5a6 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -5,17 +5,16 @@ set(SOURCE_FILES src/producer_logger.cpp src/request_handler_tcp.cpp src/request_handler_filesystem.cpp - src/request_pool.cpp src/receiver_discovery_service.cpp - src/request_handler_factory.cpp - src/request.cpp) + src/producer_request_handler_factory.cpp + src/producer_request.cpp) ################################ # Library ################################ add_library(${TARGET_NAME} STATIC ${SOURCE_FILES} $<TARGET_OBJECTS:system_io> $<TARGET_OBJECTS:logger> $<TARGET_OBJECTS:json_parser> - $<TARGET_OBJECTS:curl_http_client> ) + $<TARGET_OBJECTS:curl_http_client> $<TARGET_OBJECTS:request_pool>) target_include_directories(${TARGET_NAME} PUBLIC include ${ASAPO_CXX_COMMON_INCLUDE_DIR}) target_link_libraries(${TARGET_NAME} ${CURL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) @@ -27,9 +26,9 @@ set(TEST_SOURCE_FILES unittests/test_producer.cpp unittests/test_request_handler_tcp.cpp unittests/test_request_handler_filesystem.cpp - unittests/test_request_pool.cpp unittests/test_receiver_discovery_service.cpp unittests/test_request_handler_factory.cpp + unittests/test_producer_request.cpp ) set(TEST_LIBRARIES "${TARGET_NAME}") diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 2f1a700d6..20cefa78e 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -5,6 +5,8 @@ #include "producer_logger.h" #include "io/io_factory.h" #include "producer/producer_error.h" +#include "producer_request_handler_factory.h" +#include "producer_request.h" namespace asapo { @@ -17,14 +19,14 @@ ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads, a switch (type) { case RequestHandlerType::kTcp: discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); - request_handler_factory_.reset(new RequestHandlerFactory{discovery_service_.get()}); + request_handler_factory_.reset(new ProducerRequestHandlerFactory{discovery_service_.get()}); break; case RequestHandlerType::kFilesystem: request_handler_factory_.reset(nullptr); - request_handler_factory_.reset(new RequestHandlerFactory{endpoint}); + request_handler_factory_.reset(new ProducerRequestHandlerFactory{endpoint}); } - request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get()}); + request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get(),log__}); } GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, @@ -57,7 +59,7 @@ Error ProducerImpl::Send(const EventHeader& event_header, auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, event_header.file_name); - return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, + return request_pool__->AddRequest(std::unique_ptr<ProducerRequest> {new ProducerRequest{beamtime_id_, std::move(request_header), std::move(data), std::move(full_path), callback} }); diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index da104f95c..20f1e8be8 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -6,8 +6,8 @@ #include <io/io.h> #include "producer/producer.h" #include "logger/logger.h" -#include "request_pool.h" -#include "request_handler_factory.h" +#include "request/request_pool.h" +#include "producer_request_handler_factory.h" #include "receiver_discovery_service.h" namespace asapo { diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp deleted file mode 100644 index faae53e91..000000000 --- a/producer/api/src/request.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "request.h" - -namespace asapo { - -Error Request::ReadDataFromFileIfNeeded(const IO* io) { - if (data != nullptr || original_filepath.empty()) { - return nullptr; - } - Error err; - data = io->GetDataFromFile(original_filepath, &header.data_size, &err); - return err; -} - -} \ No newline at end of file diff --git a/producer/api/src/request.h b/producer/api/src/request.h deleted file mode 100644 index 3b5fe21c1..000000000 --- a/producer/api/src/request.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef ASAPO_PRODUCER_REQUEST_H -#define ASAPO_PRODUCER_REQUEST_H - -#include "common/networking.h" -#include "producer/common.h" -#include "common/data_structs.h" -#include "io/io.h" - -namespace asapo { - -struct Request { - std::string beamtime_id; - GenericRequestHeader header; - FileData data; - std::string original_filepath; - RequestCallback callback; - Error ReadDataFromFileIfNeeded(const IO* io); -}; - -} - -#endif //ASAPO_PRODUCER_REQUEST_H diff --git a/producer/api/src/request_handler.h b/producer/api/src/request_handler.h deleted file mode 100644 index 64b39ced5..000000000 --- a/producer/api/src/request_handler.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef ASAPO_PRODUCER_REQUEST_HANDLER_H -#define ASAPO_PRODUCER_REQUEST_HANDLER_H - -#include <memory> - -#include "common/error.h" -#include "request.h" - -namespace asapo { - -class RequestHandler { - public: - virtual void PrepareProcessingRequestLocked() = 0; - virtual void TearDownProcessingRequestLocked(const Error& error_from_process) = 0; - virtual Error ProcessRequestUnlocked(Request* request) = 0; - virtual bool ReadyProcessRequest() = 0; - virtual ~RequestHandler() = default; -}; - - -} -#endif //ASAPO_PRODUCER_REQUEST_HANDLER_H diff --git a/producer/api/src/request_handler_factory.cpp b/producer/api/src/request_handler_factory.cpp deleted file mode 100644 index 3fbd76d28..000000000 --- a/producer/api/src/request_handler_factory.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "request_handler_factory.h" - -#include "request_handler_tcp.h" -#include "request_handler_filesystem.h" - - -namespace asapo { - -std::unique_ptr<RequestHandler> RequestHandlerFactory::NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter) { - switch (type_) { - case asapo::RequestHandlerType::kTcp: - return std::unique_ptr<RequestHandler> {new RequestHandlerTcp(discovery_service_, thread_id, shared_counter)}; - case asapo::RequestHandlerType::kFilesystem: - return std::unique_ptr<RequestHandler> {new RequestHandlerFilesystem(destination_folder_, thread_id)}; - - } - return nullptr; -} - -RequestHandlerFactory::RequestHandlerFactory(ReceiverDiscoveryService* discovery_service): type_{RequestHandlerType::kTcp}, - discovery_service_{discovery_service} { - if (discovery_service_) { - discovery_service_->StartCollectingData(); - } -} - -RequestHandlerFactory::RequestHandlerFactory(std::string destination_folder): type_{RequestHandlerType::kFilesystem}, - destination_folder_{std::move(destination_folder)} { -} - - -} \ No newline at end of file diff --git a/producer/api/src/request_handler_factory.h b/producer/api/src/request_handler_factory.h deleted file mode 100644 index 066f0d181..000000000 --- a/producer/api/src/request_handler_factory.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef ASAPO_REQUEST_HANDLER_FACTORY_H -#define ASAPO_REQUEST_HANDLER_FACTORY_H - -#include "request_handler.h" -#include "receiver_discovery_service.h" - -#include "preprocessor/definitions.h" - -namespace asapo { - -class RequestHandlerFactory { - public: - RequestHandlerFactory(ReceiverDiscoveryService* discovery_service); - RequestHandlerFactory(std::string destination_folder); - VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); - private: - RequestHandlerType type_; - ReceiverDiscoveryService* discovery_service_{nullptr}; - std::string destination_folder_; -}; - - -} - -#endif //ASAPO_REQUEST_HANDLER_FACTORY_H diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index cca62163b..1d55f5f53 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -1,9 +1,13 @@ -#include "producer/producer_error.h" #include "request_handler_filesystem.h" + +#include <cstdint> + +#include "producer/producer_error.h" #include "producer_logger.h" #include "io/io_factory.h" -#include <cstdint> +#include "producer_request.h" + namespace asapo { @@ -13,16 +17,18 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde } -Error RequestHandlerFilesystem::ProcessRequestUnlocked(Request* request) { - auto err = request->ReadDataFromFileIfNeeded(io__.get()); +Error RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) { + auto producer_request = dynamic_cast<ProducerRequest*>(request); + + auto err = producer_request->ReadDataFromFileIfNeeded(io__.get()); if (err) { return err; } - err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)request->data.get(), + err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)producer_request->data.get(), request->header.data_size, true); - if (request->callback) { - request->callback(request->header, std::move(err)); + if (producer_request->callback) { + producer_request->callback(request->header, std::move(err)); } return nullptr; } diff --git a/producer/api/src/request_handler_filesystem.h b/producer/api/src/request_handler_filesystem.h index 607816cf8..88d81027c 100644 --- a/producer/api/src/request_handler_filesystem.h +++ b/producer/api/src/request_handler_filesystem.h @@ -7,7 +7,7 @@ #include "common/error.h" #include "producer/common.h" -#include "request_handler.h" +#include "request/request_handler.h" #include "logger/logger.h" using std::chrono::high_resolution_clock; @@ -17,7 +17,7 @@ namespace asapo { class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); - Error ProcessRequestUnlocked(Request* request) override; + Error ProcessRequestUnlocked(GenericRequest* request) override; bool ReadyProcessRequest() override { return true; }; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index f5e271da1..c9234d8ae 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -2,7 +2,7 @@ #include "request_handler_tcp.h" #include "producer_logger.h" #include "io/io_factory.h" - +#include "producer_request.h" namespace asapo { @@ -48,7 +48,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const return nullptr; } -Error RequestHandlerTcp::SendHeaderAndData(const Request* request) { +Error RequestHandlerTcp::SendHeaderAndData(const ProducerRequest* request) { Error io_error; io__->Send(sd_, &(request->header), sizeof(request->header), &io_error); if(io_error) { @@ -85,7 +85,7 @@ Error RequestHandlerTcp::ReceiveResponse() { } } -Error RequestHandlerTcp::TrySendToReceiver(const Request* request) { +Error RequestHandlerTcp::TrySendToReceiver(const ProducerRequest* request) { auto err = SendHeaderAndData(request); if (err) { return err; @@ -158,7 +158,7 @@ bool RequestHandlerTcp::ServerError(const Error& err) { return err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse; } -Error RequestHandlerTcp::SendDataToOneOfTheReceivers(Request* request) { +Error RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) { for (auto receiver_uri : receivers_list_) { if (Disconnected()) { auto err = ConnectToReceiver(request->beamtime_id, receiver_uri); @@ -182,8 +182,13 @@ Error RequestHandlerTcp::SendDataToOneOfTheReceivers(Request* request) { } -Error RequestHandlerTcp::ProcessRequestUnlocked(Request* request) { - auto err = request->ReadDataFromFileIfNeeded(io__.get()); +Error RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) { + auto producer_request = dynamic_cast<ProducerRequest*>(request); + if (producer_request == nullptr) { + return ProducerErrorTemplates::kInternalServerError.Generate("dynamic cast to ProducerRequest"); + } + + auto err = producer_request->ReadDataFromFileIfNeeded(io__.get()); if (err) { return err; } @@ -191,7 +196,7 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(Request* request) { if (NeedRebalance()) { CloseConnectionToPeformRebalance(); } - return SendDataToOneOfTheReceivers(request); + return SendDataToOneOfTheReceivers(producer_request); } bool RequestHandlerTcp::Connected() { diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index dec7f4d4c..2b00f7f20 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -9,8 +9,8 @@ #include "common/networking.h" #include "producer/common.h" -#include "request_handler.h" - +#include "request/request_handler.h" +#include "producer_request.h" using std::chrono::high_resolution_clock; @@ -19,7 +19,7 @@ namespace asapo { class RequestHandlerTcp: public RequestHandler { public: explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter); - Error ProcessRequestUnlocked(Request* request) override; + Error ProcessRequestUnlocked(GenericRequest* request) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; void TearDownProcessingRequestLocked(const Error& error_from_process) override; @@ -31,10 +31,10 @@ class RequestHandlerTcp: public RequestHandler { private: Error Authorize(const std::string& beamtime_id); Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); - Error SendDataToOneOfTheReceivers(Request* request); - Error SendHeaderAndData(const Request*); + Error SendDataToOneOfTheReceivers(ProducerRequest* request); + Error SendHeaderAndData(const ProducerRequest*); Error ReceiveResponse(); - Error TrySendToReceiver(const Request* request); + Error TrySendToReceiver(const ProducerRequest* request); SocketDescriptor sd_{kDisconnectedSocketDescriptor}; void UpdateIfNewConnection(); bool UpdateReceiversList(); diff --git a/producer/api/unittests/mocking.h b/producer/api/unittests/mocking.h index 0ccb688c9..dc81bfe58 100644 --- a/producer/api/unittests/mocking.h +++ b/producer/api/unittests/mocking.h @@ -3,8 +3,8 @@ #include <gtest/gtest.h> -#include "../src/request_pool.h" -#include "../src/request_handler_factory.h" +#include "request/request_pool.h" +#include "request/request_handler_factory.h" #include "../src/receiver_discovery_service.h" namespace asapo { @@ -22,45 +22,22 @@ class MockDiscoveryService : public asapo::ReceiverDiscoveryService { } }; - class MockRequestPull : public RequestPool { public: - MockRequestPull(RequestHandlerFactory* request_handler_factory) : - RequestPool{1, request_handler_factory} {}; - asapo::Error AddRequest(std::unique_ptr<asapo::Request> request) override { + MockRequestPull(RequestHandlerFactory* request_handler_factory, AbstractLogger* log) : + RequestPool{1, request_handler_factory,log} {}; + asapo::Error AddRequest(std::unique_ptr<asapo::GenericRequest> request) override { if (request == nullptr) { return asapo::Error{AddRequest_t(nullptr)}; } return asapo::Error{AddRequest_t(request.get())}; } - MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (Request*)); -}; - - -class MockRequestHandler : public RequestHandler { - public: - - Error ProcessRequestUnlocked(Request* request) override { - return Error{ProcessRequestUnlocked_t(request)}; - } - void TearDownProcessingRequestLocked(const Error& error_from_process) override { - if (error_from_process) { - TearDownProcessingRequestLocked_t(error_from_process.get()); - } else { - TearDownProcessingRequestLocked_t(nullptr); - } - } - MOCK_METHOD0(PrepareProcessingRequestLocked, void()); - MOCK_METHOD0(ReadyProcessRequest, bool()); - MOCK_METHOD1(TearDownProcessingRequestLocked_t, void(ErrorInterface* error_from_process)); - MOCK_METHOD1(ProcessRequestUnlocked_t, ErrorInterface * (const Request*)); + MOCK_METHOD1(AddRequest_t, asapo::ErrorInterface * (GenericRequest*)); }; - } -using asapo::MockRequestHandler; using asapo::MockDiscoveryService; using asapo::MockRequestPull; diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index bfb2c164f..6152a5efd 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -7,7 +7,6 @@ #include "../src/producer_impl.h" #include "producer/producer_error.h" -#include "../src/request_pool.h" #include "../src/request_handler_tcp.h" #include "mocking.h" @@ -27,15 +26,16 @@ using ::testing::HasSubstr; using asapo::RequestPool; -using asapo::Request; +using asapo::ProducerRequest; MATCHER_P5(M_CheckSendDataRequest, op_code, beamtime_id, file_id, file_size, message, "Checks if a valid GenericRequestHeader was Send") { + auto request = dynamic_cast<ProducerRequest*>(arg); return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint64_t(file_size) - && arg->beamtime_id == beamtime_id + && request->beamtime_id == beamtime_id && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -49,9 +49,9 @@ TEST(ProducerImpl, Constructor) { class ProducerImplTests : public testing::Test { public: testing::NiceMock<MockDiscoveryService> service; - asapo::RequestHandlerFactory factory{&service}; + asapo::ProducerRequestHandlerFactory factory{&service}; testing::NiceMock<asapo::MockLogger> mock_logger; - testing::NiceMock<MockRequestPull> mock_pull{&factory}; + testing::NiceMock<MockRequestPull> mock_pull{&factory,&mock_logger}; asapo::ProducerImpl producer{"", 1, asapo::RequestHandlerType::kTcp}; void SetUp() override { producer.log__ = &mock_logger; @@ -93,7 +93,7 @@ TEST_F(ProducerImplTests, OKSendingSendDataRequest) { std::string expected_beamtimeid = "beamtime_id"; producer.SetBeamtimeId(expected_beamtimeid); - Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, + ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, @@ -113,7 +113,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) { std::string expected_fullpath = "filename"; producer.SetBeamtimeId(expected_beamtimeid); - Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, expected_name}, + ProducerRequest request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, expected_name}, nullptr, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, diff --git a/producer/api/unittests/test_request_handler_factory.cpp b/producer/api/unittests/test_request_handler_factory.cpp index cbfc57d80..5091abad3 100644 --- a/producer/api/unittests/test_request_handler_factory.cpp +++ b/producer/api/unittests/test_request_handler_factory.cpp @@ -1,7 +1,7 @@ #include <gtest/gtest.h> #include <unittests/MockIO.h> -#include "../src/request_handler_factory.h" +#include "../src/producer_request_handler_factory.h" #include "../src/receiver_discovery_service.h" #include "../src/request_handler_tcp.h" #include "mocking.h" @@ -10,7 +10,7 @@ using ::testing::Ne; using ::testing::Eq; -using asapo:: RequestHandlerFactory; +using asapo::ProducerRequestHandlerFactory; namespace { @@ -19,7 +19,7 @@ TEST(CreateFactory, Tcp) { MockDiscoveryService mock_discovery; EXPECT_CALL(mock_discovery, StartCollectingData()); - RequestHandlerFactory factory{&mock_discovery}; + ProducerRequestHandlerFactory factory{&mock_discovery}; auto handler = factory.NewRequestHandler(1, nullptr); @@ -28,7 +28,7 @@ TEST(CreateFactory, Tcp) { } TEST(CreateFactory, Filesystem) { - RequestHandlerFactory factory{""}; + ProducerRequestHandlerFactory factory{""}; auto handler = factory.NewRequestHandler(1, nullptr); diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index a87a58506..c3f69e5a4 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -10,6 +10,8 @@ #include "producer/producer_error.h" #include "../src/request_handler_filesystem.h" +#include "../src/producer_request.h" + #include "io/io_factory.h" namespace { @@ -53,14 +55,14 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{"", header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::ProducerRequest request{"", header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{"", header, nullptr, "", nullptr}; - asapo::Request request_filesend{"", header, nullptr, expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_nocallback{"", header, nullptr, "", nullptr}; + asapo::ProducerRequest request_filesend{"", header, nullptr, expected_origin_fullpath, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 0b241c7e6..e8e18a2a1 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -61,17 +61,17 @@ class RequestHandlerTcpTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{expected_beamtime_id, header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; - asapo::Request request_filesend{expected_beamtime_id, header, nullptr, expected_origin_fullpath, nullptr}; + asapo::ProducerRequest request_filesend{expected_beamtime_id, header, nullptr, expected_origin_fullpath, nullptr}; - asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, "", nullptr}; + asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, "", nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; diff --git a/receiver/src/data_cache.cpp b/receiver/src/data_cache.cpp index e26bca603..a8cf0e70b 100644 --- a/receiver/src/data_cache.cpp +++ b/receiver/src/data_cache.cpp @@ -125,7 +125,7 @@ bool DataCache::UnlockSlot(CacheMeta* meta) { return false; } std::lock_guard<std::mutex> lock{mutex_}; - meta->lock = std::max(0, (int)meta->lock - 1); + meta->lock = std::max(0, meta->lock - 1); return true; } diff --git a/receiver/src/data_cache.h b/receiver/src/data_cache.h index 8d6128548..24dd0d987 100644 --- a/receiver/src/data_cache.h +++ b/receiver/src/data_cache.h @@ -15,7 +15,7 @@ struct CacheMeta { uint64_t id; void* addr; uint64_t size; - uint lock; + int lock; }; class DataCache { diff --git a/receiver/src/request_handler.h b/receiver/src/request_handler.h index 49d4da20a..b02cca661 100644 --- a/receiver/src/request_handler.h +++ b/receiver/src/request_handler.h @@ -1,5 +1,5 @@ -#ifndef ASAPO_REQUEST_HANDLER_H -#define ASAPO_REQUEST_HANDLER_H +#ifndef ASAPO_RECEIVER_REQUEST_HANDLER_H +#define ASAPO_RECEIVER_REQUEST_HANDLER_H #include "receiver_error.h" #include "statistics.h" @@ -18,4 +18,4 @@ class RequestHandler { } -#endif //ASAPO_REQUEST_HANDLER_H +#endif //ASAPO_RECEIVER_REQUEST_HANDLER_H -- GitLab