diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c32a77ca020ad1bc7d84c1469cc24aee94ec227..65114f8c551b076b82f027d86194aecb8166e56c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,7 +61,7 @@ if (BUILD_BROKER)#TODO: Somehow make it clear that this is needed by examples/wo add_subdirectory(broker) endif() -add_subdirectory(producer/api) +add_subdirectory(producer) add_subdirectory(worker) diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 87ca3582c44018c2cd66a7838acfbbac195a301c..2c8c4f2fd592302382882909bb3064fd78d2ee74 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -71,10 +71,17 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { mutex.unlock(); } -bool SendDummyData(asapo::Producer* producer, uint8_t* data, size_t number_of_byte, uint64_t iterations) { +asapo::FileData CreateMemoryBuffer(size_t size) { + return asapo::FileData(new uint8_t[size]); +} + + +bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations) { for(uint64_t i = 0; i < iterations; i++) { - auto err = producer->Send(i + 1, data, number_of_byte, std::to_string(i), &ProcessAfterSend); + auto buffer = CreateMemoryBuffer(number_of_byte); + asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i)}; + auto err = producer->Send(event_header, std::move(buffer), &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; @@ -127,9 +134,6 @@ void PrintOutput(const Args& args, const high_resolution_clock::time_point& star } -std::unique_ptr<uint8_t> CreateMemoryBuffer(const Args& args) { - return std::unique_ptr<uint8_t>(new uint8_t[args.number_of_bytes]); -} int main (int argc, char* argv[]) { Args args; @@ -139,11 +143,9 @@ int main (int argc, char* argv[]) { iterations_remained = args.iterations; - auto buffer = CreateMemoryBuffer(args); - high_resolution_clock::time_point start_time = high_resolution_clock::now(); - if(!SendDummyData(producer.get(), buffer.get(), args.number_of_bytes, args.iterations)) { + if(!SendDummyData(producer.get(), args.number_of_bytes, args.iterations)) { return EXIT_FAILURE; } diff --git a/producer/CMakeLists.txt b/producer/CMakeLists.txt index 53d257b04ea6ffc4b663736b9d8bd559dc341222..894680120f4f58a380ad1513d91e226e085d08e2 100644 --- a/producer/CMakeLists.txt +++ b/producer/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(api) +add_subdirectory(folder_monitor_producer) diff --git a/producer/api/include/producer/common.h b/producer/api/include/producer/common.h index 4c0cb77926621fcde292485504d40cbc7ccf3fc7..90b8a954b6647c1c7013c6aabf75f8c32139e222 100644 --- a/producer/api/include/producer/common.h +++ b/producer/api/include/producer/common.h @@ -19,6 +19,12 @@ enum class RequestHandlerType { }; +struct EventHeader { + uint64_t file_id; + size_t file_size; + std::string file_name; +}; + } #endif //ASAPO_PRODUCER_COMMON_H diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index 8b9cc16d6b984353b0aae3ec9568ba4dc5f526fd..ecded5dd8c3bfb3d73f9a0ff49aaf515b187cc37 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -6,6 +6,7 @@ #include "logger/logger.h" #include "producer/common.h" +#include "common/data_structs.h" namespace asapo { @@ -29,8 +30,8 @@ class Producer { \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, - RequestCallback callback) = 0; + virtual Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; //! Enables/Disables logs output to stdout diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index fb592648069422d9d914fc587b087d26a4cb6d45..7c53a0bf735528e83cfe3cea3850774223317ab9 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -45,20 +45,20 @@ Error CheckProducerRequest(size_t file_size, size_t filename_size) { } -Error ProducerImpl::Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, +Error ProducerImpl::Send(const EventHeader& event_header, FileData data, RequestCallback callback) { - auto err = CheckProducerRequest(file_size, file_name.size()); + auto err = CheckProducerRequest(event_header.file_size, event_header.file_name.size()); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(file_id, file_size, std::move(file_name)); + auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, event_header.file_name); - return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, data, callback}}); + return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, std::move(data), callback}}); } void ProducerImpl::SetLogLevel(LogLevel level) { diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index e76b93c27b65eb8a3f8fb5595c1a78bbe08a4162..f2359052ec0d580e6924498f66b688b468dca509 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -28,8 +28,7 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(uint64_t file_id, const void* data, size_t file_size, std::string file_name, - RequestCallback callback) override; + Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; Error SetBeamtimeId(std::string beamtime_id) override; diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index 9f256f7803bd760910e05d068b5b21465f910981..2b671dfc4fd7e04854bd0f1709b1e4ede22123c9 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -1,5 +1 @@ -// -// Created by yakubov on 17/05/18. -// - #include "request.h" diff --git a/producer/api/src/request.h b/producer/api/src/request.h index 59c7589f9c93011d85a730b46bf3594ef6868c74..b522385a32bae06f7551cb8ea1bdd4a68790c3c1 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -3,13 +3,14 @@ #include "common/networking.h" #include "producer/common.h" +#include "common/data_structs.h" namespace asapo { struct Request { std::string beamtime_id; GenericRequestHeader header; - const void* data; + FileData data; RequestCallback callback; }; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index e43acd01fc0ab9efe18b22b40674bf9cb9775c3d..e557c2645c9d1426cefc19f7a00132a53e2389e5 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -15,7 +15,7 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { std::string fullpath = destination_folder_ + "/" + request->header.message + ".bin"; - auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data, request->header.data_size); + auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data.get(), request->header.data_size); if (request->callback) { request->callback(request->header, std::move(err)); } diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index 8aad81b27323386c831fd74d1e816f904147c5ef..e82af41d319bb6f132913065650da6cf8ac03348 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -55,7 +55,7 @@ Error RequestHandlerTcp::SendHeaderAndData(const Request* request) { return io_error; } - io__->Send(sd_, request->data, request->header.data_size, &io_error); + io__->Send(sd_, (void*) request->data.get(), request->header.data_size, &io_error); if(io_error) { return io_error; } diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index f15706983f5c6a8e83ca5bcc096cfab3427b93d5..ee69cb8eb8476b6a0bce7606e639a260d424925b 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -49,7 +49,10 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { asapo::Error err; std::unique_ptr<asapo::Producer> producer = asapo::Producer::Create("hello", 5, asapo::RequestHandlerType::kTcp, "bt", &err); - auto err_send = producer->Send(1, nullptr, 1, "", nullptr); + + asapo::EventHeader event_header{1, 1, ""}; + auto err_send = producer->Send(event_header, nullptr, nullptr); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); ASSERT_THAT(err, Eq(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 10e0d5f37056950486c057258e9cd7331ba436d6..16824cf7e5d1e5cc29fd63d53ed2fbb06e561393 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -65,22 +65,23 @@ class ProducerImplTests : public testing::Test { TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); - auto err = producer.Send(1, nullptr, 1, "", nullptr); + asapo::EventHeader event_header{1, 1, ""}; + auto err = producer.Send(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); - auto err = producer.Send(1, nullptr, 1, long_string, nullptr); + asapo::EventHeader event_header{1, 1, long_string}; + auto err = producer.Send(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); - - auto err = producer.Send(1, nullptr, asapo::ProducerImpl::kMaxChunkSize + 1, "", nullptr); - + asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; + auto err = producer.Send(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } @@ -98,7 +99,8 @@ TEST_F(ProducerImplTests, OKSendingRequest) { expected_beamtimeid, expected_id, expected_size, expected_name))).WillOnce(Return( nullptr)); - auto err = producer.Send(expected_id, nullptr, expected_size, expected_name, nullptr); + asapo::EventHeader event_header{expected_id, expected_size, expected_name}; + auto err = producer.Send(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index eea5ab2b11340e5a7611ed6f48d9b43a1a4aa494..6082d7dc93fd7c233e949dfb2282e9aeb86e64b1 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -47,18 +47,17 @@ class RequestHandlerFilesystemTests : public testing::Test { std::string expected_destination = "destination"; std::string expected_fullpath = expected_destination + "/" + expected_file_name + ".bin"; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; - uint8_t* expected_data_pointer = (uint8_t*)0xC00FE; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{"", header, expected_data_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{"", header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{"", header, expected_data_pointer, nullptr}; + asapo::Request request_nocallback{"", header, nullptr, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; @@ -85,7 +84,7 @@ MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, } TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) .WillOnce( Return( asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) @@ -100,7 +99,7 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { } TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) .WillOnce( Return( asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) @@ -116,7 +115,7 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { TEST_F(RequestHandlerFilesystemTests, TransferOK) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, expected_data_pointer, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) .WillOnce( Return( nullptr) diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index c335f7ae07e50ce8d6e5734678cf1ec96209e990..383dd0066f23b9747b1ba5e62c6acbba6fee94ca 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -57,18 +57,17 @@ class RequestHandlerTcpTests : public testing::Test { uint64_t expected_thread_id = 2; asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; - void* expected_file_pointer = (void*)0xC00FE; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{expected_beamtime_id, header, expected_file_pointer, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{expected_beamtime_id, header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{expected_beamtime_id, header, expected_file_pointer, nullptr}; + asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -244,7 +243,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) { void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) { int i = 0; for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) + EXPECT_CALL(mock_io, Send_t(expected_sd, nullptr, expected_file_size, _)) .Times(1) .WillOnce( DoAll( @@ -302,7 +301,7 @@ void RequestHandlerTcpTests::ExpectFailReceive(bool only_once) { void RequestHandlerTcpTests::ExpectOKSendData(bool only_once) { for (auto expected_sd : expected_sds) { - EXPECT_CALL(mock_io, Send_t(expected_sd, expected_file_pointer, expected_file_size, _)) + EXPECT_CALL(mock_io, Send_t(expected_sd, nullptr, expected_file_size, _)) .Times(1) .WillOnce( DoAll( diff --git a/producer/folder_monitor_producer/CMakeLists.txt b/producer/folder_monitor_producer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..258fc6a8a7d46b70bcc745ad8794dc52b5b05418 --- /dev/null +++ b/producer/folder_monitor_producer/CMakeLists.txt @@ -0,0 +1,47 @@ +set(TARGET_NAME folder-monitor-producer) +set(SOURCE_FILES + src/event_detector.cpp + ) + + +################################ +# Library (needed for unit tests) +################################ + +add_library(${TARGET_NAME} ${SOURCE_FILES}) +target_include_directories(${TARGET_NAME} PUBLIC include ${CMAKE_SOURCE_DIR}/common/cpp/include) + +#Add all necessary common libraries +GET_PROPERTY(ASAPO_COMMON_IO_LIBRARIES GLOBAL PROPERTY ASAPO_COMMON_IO_LIBRARIES) +target_link_libraries(${TARGET_NAME} ${ASAPO_COMMON_IO_LIBRARIES}) + +target_link_libraries(${TARGET_NAME} producer-api) + + + +################################ +# Executable +################################ +add_executable(${TARGET_NAME}-bin src/foldermon_main.cpp) +set_target_properties(${TARGET_NAME}-bin PROPERTIES OUTPUT_NAME asapo-foldermon-producer) +target_link_libraries(${TARGET_NAME}-bin ${TARGET_NAME}) + +set_target_properties(${TARGET_NAME}-bin PROPERTIES RUNTIME_OUTPUT_DIRECTORY + ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> + ) + + +################################ +# Testing +################################ +set(TEST_SOURCE_FILES + unittests/test_foldermon_config.cpp + ) + +set(TEST_LIBRARIES "${TARGET_NAME}") + + +gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}") + +install(TARGETS ${TARGET_NAME}-bin DESTINATION bin) + diff --git a/producer/folder_monitor_producer/src/event_detector.cpp b/producer/folder_monitor_producer/src/event_detector.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f667dbbb4c1d49cb16bf9d59d497398dd56737d5 --- /dev/null +++ b/producer/folder_monitor_producer/src/event_detector.cpp @@ -0,0 +1,5 @@ +#include "event_detector.h" + +namespace asapo { + +} \ No newline at end of file diff --git a/producer/folder_monitor_producer/src/event_detector.h b/producer/folder_monitor_producer/src/event_detector.h new file mode 100644 index 0000000000000000000000000000000000000000..004b5b393543b5fbf0c2358f8bcc7e0941cec671 --- /dev/null +++ b/producer/folder_monitor_producer/src/event_detector.h @@ -0,0 +1,13 @@ +#ifndef ASAPO_EVENT_DETECTOR_H +#define ASAPO_EVENT_DETECTOR_H + +namespace asapo { + +class EventDetector { + +}; + + +} + +#endif //ASAPO_EVENT_DETECTOR_H diff --git a/producer/folder_monitor_producer/src/foldermon_main.cpp b/producer/folder_monitor_producer/src/foldermon_main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3a518ec12140302ea288945038238570e777452e --- /dev/null +++ b/producer/folder_monitor_producer/src/foldermon_main.cpp @@ -0,0 +1,13 @@ +#include <iostream> +#include <chrono> +#include <vector> +#include <mutex> +#include <thread> + +#include "asapo_producer.h" + + +int main (int argc, char* argv[]) { + return EXIT_SUCCESS; +} + diff --git a/producer/folder_monitor_producer/unittests/test_foldermon_config.cpp b/producer/folder_monitor_producer/unittests/test_foldermon_config.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cad0aebb653d23ddcb9830c58a35e98f5528e2ed --- /dev/null +++ b/producer/folder_monitor_producer/unittests/test_foldermon_config.cpp @@ -0,0 +1,42 @@ +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <unittests/MockIO.h> + +using ::testing::Test; +using ::testing::Return; +using ::testing::_; +using ::testing::DoAll; +using ::testing::SetArgReferee; +using ::testing::Gt; +using ::testing::Eq; +using ::testing::Ne; +using ::testing::Mock; +using ::testing::NiceMock; +using ::testing::SaveArg; +using ::testing::SaveArgPointee; +using ::testing::InSequence; +using ::testing::SetArgPointee; +using ::asapo::Error; +using ::asapo::ErrorInterface; +using ::asapo::FileDescriptor; +using ::asapo::SocketDescriptor; +using ::asapo::MockIO; + +namespace { + + +class ConfigTests : public Test { + public: + MockIO mock_io; + void SetUp() override { + } + void TearDown() override { + } + +}; + + +TEST_F(ConfigTests, ReadSettings) { +} + +}