Skip to content
Snippets Groups Projects
Commit d5e78984 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

refactor, add option to producer api to write to file instead of sending via network

parent d79aeb93
No related branches found
No related tags found
No related merge requests found
Showing
with 217 additions and 75 deletions
...@@ -2,15 +2,17 @@ ...@@ -2,15 +2,17 @@
#define ASAPO_COMMON__NETWORKING_H #define ASAPO_COMMON__NETWORKING_H
#include <cstdint> #include <cstdint>
#include <algorithm>
#include <string>
namespace asapo { namespace asapo {
typedef uint64_t NetworkRequestId; typedef uint64_t NetworkRequestId;
enum Opcode : uint8_t { enum Opcode : uint8_t {
kNetOpcodeUnknownOp, kOpcodeUnknownOp,
kNetOpcodeSendData, kOpcodeTransferData,
kNetOpcodeCount, kOpcodeCount,
}; };
enum NetworkErrorCode : uint16_t { enum NetworkErrorCode : uint16_t {
...@@ -27,10 +29,19 @@ enum NetworkErrorCode : uint16_t { ...@@ -27,10 +29,19 @@ enum NetworkErrorCode : uint16_t {
* RPC always return a response to a corresponding request * RPC always return a response to a corresponding request
* @{ * @{
*/ */
struct GenericNetworkRequestHeader {
Opcode op_code; const std::size_t kMaxFileNameSize = 1024;
struct GenericRequestHeader {
GenericRequestHeader(Opcode i_op_code = kOpcodeUnknownOp,uint64_t i_data_id = 0,
uint64_t i_data_size = 0,const std::string& i_file_name = ""):
op_code{i_op_code},data_id{i_data_id},data_size{i_data_size} {
auto size = std::min(i_file_name.size()+1,kMaxFileNameSize);
memcpy(file_name, i_file_name.c_str(), size);
}
Opcode op_code;
uint64_t data_id; uint64_t data_id;
uint64_t data_size; uint64_t data_size;
char file_name[kMaxFileNameSize];
}; };
struct GenericNetworkResponse { struct GenericNetworkResponse {
......
...@@ -92,6 +92,7 @@ class IO { ...@@ -92,6 +92,7 @@ class IO {
virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0; virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0;
virtual Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const = 0; virtual Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const = 0;
virtual Error WriteDataToFile (const std::string& fname, const uint8_t* data, size_t length) const = 0;
virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0; virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0;
virtual FileData GetDataFromFile (const std::string& fname, uint64_t fsize, Error* err) const = 0; virtual FileData GetDataFromFile (const std::string& fname, uint64_t fsize, Error* err) const = 0;
......
...@@ -178,7 +178,12 @@ class MockIO : public IO { ...@@ -178,7 +178,12 @@ class MockIO : public IO {
} }
MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, uint8_t* data, size_t fsize)); Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override {
return Error{WriteDataToFile_t(fname, data, length)};
}
MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize));
void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files,
Error* err) const override { Error* err) const override {
......
...@@ -122,20 +122,25 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro ...@@ -122,20 +122,25 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro
} }
} }
Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const {
Error err; Error err;
auto fd = Open(fname, IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, &err); auto fd = Open(fname, IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS | IO_OPEN_MODE_RW, &err);
if (err) { if (err) {
return err; return err;
} }
Write(fd, data.get(), length, &err); Write(fd, data, length, &err);
if (err) { if (err) {
return err; return err;
} }
Close(fd, &err); Close(fd, &err);
return err; return err;
}
Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const {
return WriteDataToFile(fname,data.get(),length);
} }
......
...@@ -102,7 +102,8 @@ class SystemIO final : public IO { ...@@ -102,7 +102,8 @@ class SystemIO final : public IO {
void CreateNewDirectory(const std::string& directory_name, Error* err) const; void CreateNewDirectory(const std::string& directory_name, Error* err) const;
FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const; FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const;
Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const; Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const;
void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const;
void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files,
Error* err) const; Error* err) const;
std::string ReadFileToString(const std::string& fname, Error* err) const; std::string ReadFileToString(const std::string& fname, Error* err) const;
}; };
......
...@@ -11,17 +11,18 @@ ...@@ -11,17 +11,18 @@
using std::chrono::high_resolution_clock; using std::chrono::high_resolution_clock;
std::mutex mutex; std::mutex mutex;
int nfiles;
typedef std::tuple<std::string, size_t, uint64_t, uint64_t> ArgumentTuple; typedef std::tuple<std::string, size_t, uint64_t, uint64_t,uint64_t> ArgumentTuple;
ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
if (argc != 5) { if (argc != 6) {
std::cout << std::cout <<
"Usage: " << argv[0] << " <receiver_address> <number_of_byte> <iterations> <nthreads>" "Usage: " << argv[0] << " <destination> <number_of_byte> <iterations> <nthreads> <mode 0 -t tcp, 1 - filesystem>"
<< std::endl; << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
try { try {
return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4])); return ArgumentTuple(argv[1], std::stoull(argv[2]), std::stoull(argv[3]), std::stoull(argv[4]),std::stoull(argv[5]));
} catch(std::exception& e) { } catch(std::exception& e) {
std::cerr << "Fail to parse arguments" << std::endl; std::cerr << "Fail to parse arguments" << std::endl;
std::cerr << e.what() << std::endl; std::cerr << e.what() << std::endl;
...@@ -29,13 +30,16 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) { ...@@ -29,13 +30,16 @@ ArgumentTuple ProcessCommandArguments(int argc, char* argv[]) {
} }
} }
void work(asapo::GenericNetworkRequestHeader header, asapo::Error err) { void work(asapo::GenericRequestHeader header, asapo::Error err) {
mutex.lock(); mutex.lock();
nfiles--;
if (err) { if (err) {
std::cerr << "File was not successfully send: " << err << std::endl; std::cerr << "File was not successfully send: " << err << std::endl;
nfiles = 0;
mutex.unlock();
return; return;
} }
std::cerr << "File was successfully send." << header.data_id << std::endl; // std::cerr << "File was successfully send." << header.data_id << std::endl;
mutex.unlock(); mutex.unlock();
} }
...@@ -44,7 +48,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it ...@@ -44,7 +48,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
for(uint64_t i = 0; i < iterations; i++) { for(uint64_t i = 0; i < iterations; i++) {
// std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl;
auto err = producer->Send(i + 1, buffer.get(), number_of_byte, &work); auto err = producer->Send(i + 1, buffer.get(), number_of_byte,std::to_string(i), &work);
if (err) { if (err) {
std::cerr << "Cannot send file: " << err << std::endl; std::cerr << "Cannot send file: " << err << std::endl;
return false; return false;
...@@ -59,16 +63,26 @@ int main (int argc, char* argv[]) { ...@@ -59,16 +63,26 @@ int main (int argc, char* argv[]) {
size_t number_of_kbytes; size_t number_of_kbytes;
uint64_t iterations; uint64_t iterations;
uint64_t nthreads; uint64_t nthreads;
std::tie(receiver_address, number_of_kbytes, iterations, nthreads) = ProcessCommandArguments(argc, argv); uint64_t mode;
std::tie(receiver_address, number_of_kbytes, iterations, nthreads,mode) = ProcessCommandArguments(argc, argv);
std::cout << "receiver_address: " << receiver_address << std::endl std::cout << "receiver_address: " << receiver_address << std::endl
<< "Package size: " << number_of_kbytes << "k" << std::endl << "Package size: " << number_of_kbytes << "k" << std::endl
<< "iterations: " << iterations << std::endl << "iterations: " << iterations << std::endl
<< "nthreads: " << nthreads << std::endl << "nthreads: " << nthreads << std::endl
<< std::endl; << "mode: " << mode << std::endl
<< std::endl;
nfiles = iterations;
asapo::Error err; asapo::Error err;
auto producer = asapo::Producer::Create(receiver_address, nthreads, &err); std::unique_ptr<asapo::Producer> producer;
if (mode == 0) {
producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kTcp,&err);
} else {
producer = asapo::Producer::Create(receiver_address, nthreads, asapo::RequestHandlerType::kFilesystem,&err);
}
producer->EnableLocalLog(true); producer->EnableLocalLog(true);
producer->SetLogLevel(asapo::LogLevel::Debug); producer->SetLogLevel(asapo::LogLevel::Debug);
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
...@@ -82,6 +96,16 @@ int main (int argc, char* argv[]) { ...@@ -82,6 +96,16 @@ int main (int argc, char* argv[]) {
if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) { if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
while (true) {
mutex.lock();
if (nfiles <= 0) {
mutex.unlock();
break;
}
mutex.unlock();
}
high_resolution_clock::time_point t2 = high_resolution_clock::now(); high_resolution_clock::time_point t2 = high_resolution_clock::now();
double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count() / 1000.0; double duration_sec = std::chrono::duration_cast<std::chrono::milliseconds>( t2 - t1 ).count() / 1000.0;
double size_gb = double(number_of_kbytes) * iterations / 1024.0 / 1024.0 * 8.0; double size_gb = double(number_of_kbytes) * iterations / 1024.0 / 1024.0 * 8.0;
......
...@@ -4,9 +4,11 @@ set(SOURCE_FILES ...@@ -4,9 +4,11 @@ set(SOURCE_FILES
src/producer_impl.cpp src/producer_impl.cpp
src/producer_logger.cpp src/producer_logger.cpp
src/request_handler_tcp.cpp src/request_handler_tcp.cpp
src/request_pool.cpp src/receiver_discovery_service.cpp src/request_handler_filesystem.cpp
src/request_pool.cpp
src/receiver_discovery_service.cpp
src/request_handler_factory.cpp src/request_handler_factory.cpp
src/request.cpp) src/request.cpp include/producer/common.h)
################################ ################################
...@@ -24,6 +26,7 @@ set(TEST_SOURCE_FILES ...@@ -24,6 +26,7 @@ set(TEST_SOURCE_FILES
unittests/test_producer_impl.cpp unittests/test_producer_impl.cpp
unittests/test_producer.cpp unittests/test_producer.cpp
unittests/test_request_handler_tcp.cpp unittests/test_request_handler_tcp.cpp
unittests/test_request_handler_filesystem.cpp
unittests/test_request_pool.cpp unittests/test_request_pool.cpp
unittests/test_receiver_discovery_service.cpp unittests/test_receiver_discovery_service.cpp
unittests/test_request_handler_factory.cpp unittests/test_request_handler_factory.cpp
......
#ifndef ASAPO_PRODUCER_COMMON_H
#define ASAPO_PRODUCER_COMMON_H
#include <cstdint>
#include <functional>
#include "common/networking.h"
#include "common/error.h"
namespace asapo {
const uint8_t kMaxProcessingThreads = 32;
using RequestCallback = std::function<void(GenericRequestHeader, Error)>;
enum class RequestHandlerType {
kTcp,
kFilesystem
};
}
#endif //ASAPO_PRODUCER_COMMON_H
...@@ -3,23 +3,12 @@ ...@@ -3,23 +3,12 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <functional>
#include "common/networking.h"
#include "producer_error.h"
#include "logger/logger.h" #include "logger/logger.h"
#include "producer/common.h"
namespace asapo { namespace asapo {
enum class ProducerStatus {
kDisconnected,
kConnected,
};
const uint8_t kMaxProcessingThreads = 32;
using RequestCallback = std::function<void(GenericNetworkRequestHeader, Error)>;
class Producer { class Producer {
public: public:
...@@ -27,7 +16,8 @@ class Producer { ...@@ -27,7 +16,8 @@ class Producer {
/*! /*!
* @return A unique_ptr to a new producer instance * @return A unique_ptr to a new producer instance
*/ */
static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, Error* err); static std::unique_ptr<Producer> Create(const std::string& endpoint, uint8_t n_processing_threads, asapo::RequestHandlerType type,
Error* err);
virtual ~Producer() = default; virtual ~Producer() = default;
...@@ -38,7 +28,7 @@ class Producer { ...@@ -38,7 +28,7 @@ class Producer {
\param file_size - The size of the data. \param file_size - The size of the data.
\return Error - Will be nullptr on success \return Error - Will be nullptr on success
*/ */
virtual Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) = 0; virtual Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) = 0;
//! Set internal log level //! Set internal log level
virtual void SetLogLevel(LogLevel level) = 0; virtual void SetLogLevel(LogLevel level) = 0;
//! Enables/Disables logs output to stdout //! Enables/Disables logs output to stdout
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#include "producer_impl.h" #include "producer_impl.h"
std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads, std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endpoint, uint8_t n_processing_threads,
Error* err) { asapo::RequestHandlerType type, Error* err) {
if (n_processing_threads > kMaxProcessingThreads) { if (n_processing_threads > kMaxProcessingThreads) {
*err = TextError("Too many processing threads: " + std::to_string(n_processing_threads)); *err = TextError("Too many processing threads: " + std::to_string(n_processing_threads));
return nullptr; return nullptr;
...@@ -10,7 +10,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp ...@@ -10,7 +10,7 @@ std::unique_ptr<asapo::Producer> asapo::Producer::Create(const std::string& endp
try { try {
*err = nullptr; *err = nullptr;
return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads)); return std::unique_ptr<asapo::Producer>(new ProducerImpl(endpoint, n_processing_threads,type));
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
*err = TextError(ex.what()); *err = TextError(ex.what());
return nullptr; return nullptr;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "producer_impl.h" #include "producer_impl.h"
#include "producer_logger.h" #include "producer_logger.h"
#include "io/io_factory.h" #include "io/io_factory.h"
#include "producer/producer_error.h"
namespace asapo { namespace asapo {
...@@ -11,21 +12,27 @@ const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t( ...@@ -11,21 +12,27 @@ const size_t ProducerImpl::kMaxChunkSize = size_t(1024) * size_t(1024) * size_t(
const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s const size_t ProducerImpl::kDiscoveryServiceUpdateFrequencyMs = 10000; // 10s
ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads): log__{GetDefaultProducerLogger()} { ProducerImpl::ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type):
discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs}); log__{GetDefaultProducerLogger()} {
request_handler_factory_.reset(new RequestHandlerFactory{RequestHandlerType::kTcp, discovery_service_.get()}); switch (type) {
case RequestHandlerType::kTcp:
discovery_service_.reset(new ReceiverDiscoveryService{endpoint, ProducerImpl::kDiscoveryServiceUpdateFrequencyMs});
request_handler_factory_.reset(new RequestHandlerFactory{discovery_service_.get()});
break;
case RequestHandlerType::kFilesystem:
request_handler_factory_.reset(nullptr);
request_handler_factory_.reset(new RequestHandlerFactory{endpoint});
}
request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get()}); request_pool__.reset(new RequestPool{n_processing_threads, request_handler_factory_.get()});
} }
GenericNetworkRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size) { GenericRequestHeader ProducerImpl::GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name) {
GenericNetworkRequestHeader request; GenericRequestHeader request{kOpcodeTransferData,file_id,file_size,std::move(file_name)};
request.op_code = kNetOpcodeSendData;
request.data_id = file_id;
request.data_size = file_size;
return request; return request;
} }
Error CheckProducerRequest(const GenericNetworkRequestHeader header) { Error CheckProducerRequest(const GenericRequestHeader header) {
if (header.data_size > ProducerImpl::kMaxChunkSize) { if (header.data_size > ProducerImpl::kMaxChunkSize) {
return ProducerErrorTemplates::kFileTooLarge.Generate(); return ProducerErrorTemplates::kFileTooLarge.Generate();
} }
...@@ -34,8 +41,8 @@ Error CheckProducerRequest(const GenericNetworkRequestHeader header) { ...@@ -34,8 +41,8 @@ Error CheckProducerRequest(const GenericNetworkRequestHeader header) {
} }
Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) { Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name,RequestCallback callback) {
auto request_header = GenerateNextSendRequest(file_id, file_size); auto request_header = GenerateNextSendRequest(file_id, file_size,std::move(file_name));
auto err = CheckProducerRequest(request_header); auto err = CheckProducerRequest(request_header);
if (err) { if (err) {
......
...@@ -21,21 +21,21 @@ class ProducerImpl : public Producer { ...@@ -21,21 +21,21 @@ class ProducerImpl : public Producer {
static const size_t kMaxChunkSize; static const size_t kMaxChunkSize;
static const size_t kDiscoveryServiceUpdateFrequencyMs; static const size_t kDiscoveryServiceUpdateFrequencyMs;
explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads); explicit ProducerImpl(std::string endpoint, uint8_t n_processing_threads,asapo::RequestHandlerType type);
ProducerImpl(const ProducerImpl&) = delete; ProducerImpl(const ProducerImpl&) = delete;
ProducerImpl& operator=(const ProducerImpl&) = delete; ProducerImpl& operator=(const ProducerImpl&) = delete;
void SetLogLevel(LogLevel level) override; void SetLogLevel(LogLevel level) override;
void EnableLocalLog(bool enable) override; void EnableLocalLog(bool enable) override;
void EnableRemoteLog(bool enable) override; void EnableRemoteLog(bool enable) override;
Error Send(uint64_t file_id, const void* data, size_t file_size, RequestCallback callback) override; Error Send(uint64_t file_id, const void* data, size_t file_size,std::string file_name, RequestCallback callback) override;
AbstractLogger* log__; AbstractLogger* log__;
std::unique_ptr<RequestPool> request_pool__; std::unique_ptr<RequestPool> request_pool__;
private: private:
GenericNetworkRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size); GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, size_t file_size,std::string file_name);
}; };
Error CheckProducerRequest(const GenericNetworkRequestHeader header); Error CheckProducerRequest(const GenericRequestHeader header);
} }
#endif //ASAPO_PRODUCER__PRODUCER_IMPL_H #endif //ASAPO_PRODUCER__PRODUCER_IMPL_H
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
#define ASAPO_PRODUCER_REQUEST_H #define ASAPO_PRODUCER_REQUEST_H
#include "common/networking.h" #include "common/networking.h"
#include "producer/producer.h" #include "producer/common.h"
namespace asapo { namespace asapo {
struct Request { struct Request {
GenericNetworkRequestHeader header; GenericRequestHeader header;
const void* data; const void* data;
RequestCallback callback; RequestCallback callback;
}; };
......
...@@ -6,11 +6,6 @@ ...@@ -6,11 +6,6 @@
#include "common/error.h" #include "common/error.h"
#include "request.h" #include "request.h"
#ifdef UNIT_TESTS
#define VIRTUAL virtual
#endif
namespace asapo { namespace asapo {
class RequestHandler { class RequestHandler {
......
#include "request_handler_factory.h" #include "request_handler_factory.h"
#include "request_handler_tcp.h" #include "request_handler_tcp.h"
#include "request_handler_filesystem.h"
namespace asapo { namespace asapo {
...@@ -8,18 +10,22 @@ std::unique_ptr<RequestHandler> RequestHandlerFactory::NewRequestHandler(uint64_ ...@@ -8,18 +10,22 @@ std::unique_ptr<RequestHandler> RequestHandlerFactory::NewRequestHandler(uint64_
switch (type_) { switch (type_) {
case asapo::RequestHandlerType::kTcp: case asapo::RequestHandlerType::kTcp:
return std::unique_ptr<RequestHandler> {new RequestHandlerTcp(discovery_service_, thread_id, shared_counter)}; return std::unique_ptr<RequestHandler> {new RequestHandlerTcp(discovery_service_, thread_id, shared_counter)};
case asapo::RequestHandlerType::kFilesystem:
return std::unique_ptr<RequestHandler> {new RequestHandlerFilesystem(destination_folder_, thread_id)};
} }
return nullptr; return nullptr;
} }
RequestHandlerFactory::RequestHandlerFactory(RequestHandlerType type, RequestHandlerFactory::RequestHandlerFactory(ReceiverDiscoveryService* discovery_service): type_{RequestHandlerType::kTcp},
ReceiverDiscoveryService* discovery_service): type_{type},
discovery_service_{discovery_service} { discovery_service_{discovery_service} {
if (discovery_service_) { if (discovery_service_) {
discovery_service_->StartCollectingData(); discovery_service_->StartCollectingData();
} }
}
RequestHandlerFactory::RequestHandlerFactory(std::string destination_folder): type_{RequestHandlerType::kFilesystem},
destination_folder_{std::move(destination_folder)} {
} }
......
...@@ -11,18 +11,16 @@ namespace asapo { ...@@ -11,18 +11,16 @@ namespace asapo {
#define VIRTUAL virtual #define VIRTUAL virtual
#endif #endif
enum class RequestHandlerType {
kTcp,
kFilesystem
};
class RequestHandlerFactory { class RequestHandlerFactory {
public: public:
RequestHandlerFactory(RequestHandlerType type, ReceiverDiscoveryService* discovery_service); RequestHandlerFactory(ReceiverDiscoveryService* discovery_service);
VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter); RequestHandlerFactory(std::string destination_folder);
VIRTUAL std::unique_ptr<RequestHandler> NewRequestHandler(uint64_t thread_id, uint64_t* shared_counter);
private: private:
RequestHandlerType type_; RequestHandlerType type_;
ReceiverDiscoveryService* discovery_service_; ReceiverDiscoveryService* discovery_service_{nullptr};
std::string destination_folder_;
}; };
......
#include "producer/producer_error.h"
#include "request_handler_filesystem.h"
#include "producer_logger.h"
#include "io/io_factory.h"
#include <cstdint>
namespace asapo {
RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id):
io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, destination_folder_{std::move(destination_folder)}, thread_id_{thread_id}
{
}
Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) {
std::string fullpath = destination_folder_ + "/"+request->header.file_name+".bin";
auto err = io__->WriteDataToFile(fullpath,(uint8_t*)request->data,request->header.data_size);
if (request->callback) {
request->callback(request->header, std::move(err));
}
return nullptr;
}
}
#ifndef ASAPO_REQUEST_HANDLER_FILESYSTEM_H
#define ASAPO_REQUEST_HANDLER_FILESYSTEM_H
#include <chrono>
#include "io/io.h"
#include "common/error.h"
#include "producer/common.h"
#include "request_handler.h"
#include "logger/logger.h"
using std::chrono::high_resolution_clock;
namespace asapo {
class RequestHandlerFilesystem: public RequestHandler {
public:
explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id);
Error ProcessRequestUnlocked(const Request* request) override;
bool ReadyProcessRequest() override {return true;};
void PrepareProcessingRequestLocked() override {};
void TearDownProcessingRequestLocked(const Error& error_from_process) override {};
virtual ~RequestHandlerFilesystem() = default;
std::unique_ptr<IO> io__;
const AbstractLogger* log__;
private:
std::string destination_folder_;
uint64_t thread_id_;
};
}
#endif //ASAPO_REQUEST_HANDLER_FILESYSTEM_H
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include "receiver_discovery_service.h" #include "receiver_discovery_service.h"
#include "common/networking.h" #include "common/networking.h"
#include "producer/producer.h" #include "producer/common.h"
#include "request_handler.h" #include "request_handler.h"
......
...@@ -3,30 +3,39 @@ ...@@ -3,30 +3,39 @@
#include "producer/producer.h" #include "producer/producer.h"
#include "../src/producer_impl.h" #include "../src/producer_impl.h"
using ::testing::Ne; using ::testing::Ne;
using ::testing::Eq; using ::testing::Eq;
namespace { namespace {
TEST(CreateProducer, PointerIsNotNullptr) { TEST(CreateProducer, TcpProducer) {
asapo::Error err; asapo::Error err;
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4, &err); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint",4,asapo::RequestHandlerType::kTcp, &err);
ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr));
ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(err, Eq(nullptr));
}
TEST(CreateProducer, FileSystemProducer) {
asapo::Error err;
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("endpoint", 4,asapo::RequestHandlerType::kFilesystem, &err);
ASSERT_THAT(dynamic_cast<asapo::ProducerImpl*>(producer.get()), Ne(nullptr));
ASSERT_THAT(err, Eq(nullptr));
} }
TEST(CreateProducer, TooManyThreads) { TEST(CreateProducer, TooManyThreads) {
asapo::Error err; asapo::Error err;
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1, &err); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("", asapo::kMaxProcessingThreads + 1,
asapo::RequestHandlerType::kTcp, &err);
ASSERT_THAT(producer, Eq(nullptr)); ASSERT_THAT(producer, Eq(nullptr));
ASSERT_THAT(err, Ne(nullptr)); ASSERT_THAT(err, Ne(nullptr));
} }
TEST(Producer, SimpleWorkflowWihoutConnection) { TEST(Producer, SimpleWorkflowWihoutConnection) {
asapo::Error err; asapo::Error err;
std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, &err); std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp,&err);
auto err_send = producer->Send(1, nullptr, 1, nullptr); auto err_send = producer->Send(1, nullptr, 1, "",nullptr);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_THAT(producer, Ne(nullptr)); ASSERT_THAT(producer, Ne(nullptr));
ASSERT_THAT(err, Eq(nullptr)); ASSERT_THAT(err, Eq(nullptr));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment