From 098d16b50fb40536d865a92ac864bc1685a22aed Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Fri, 31 Aug 2018 16:41:58 +0200 Subject: [PATCH] integration tests workd for inotify --- common/cpp/include/common/data_structs.h | 2 +- common/cpp/include/io/io.h | 20 ++--- common/cpp/include/unittests/MockIO.h | 17 ++-- common/cpp/src/system_io/system_io.cpp | 77 +++++++++++++++---- common/cpp/src/system_io/system_io.h | 9 ++- .../dummy_data_producer.cpp | 2 +- producer/api/CMakeLists.txt | 2 +- producer/api/include/producer/producer.h | 13 +++- producer/api/src/producer_impl.cpp | 23 ++++-- producer/api/src/producer_impl.h | 5 +- producer/api/src/request.cpp | 13 ++++ producer/api/src/request.h | 3 + producer/api/src/request_handler.h | 2 +- .../api/src/request_handler_filesystem.cpp | 11 ++- producer/api/src/request_handler_filesystem.h | 2 +- producer/api/src/request_handler_tcp.cpp | 18 ++++- producer/api/src/request_handler_tcp.h | 3 +- producer/api/unittests/mocking.h | 2 +- producer/api/unittests/test_producer.cpp | 2 +- producer/api/unittests/test_producer_impl.cpp | 35 +++++++-- .../test_receiver_discovery_service.cpp | 2 +- .../test_request_handler_filesystem.cpp | 51 ++++++++++-- .../unittests/test_request_handler_tcp.cpp | 48 +++++++++++- producer/api/unittests/test_request_pool.cpp | 4 +- .../event_monitor_producer/CMakeLists.txt | 4 +- .../src/inotify_event.cpp | 32 +++++++- .../src/inotify_event.h | 2 +- .../{eventmon_main.cpp => main_eventmon.cpp} | 4 +- .../src/system_folder_watch_linux.cpp | 4 +- .../src/system_folder_watch_linux.h | 7 +- receiver/src/request_handler_file_write.cpp | 4 +- .../test_request_handler_file_write.cpp | 7 +- .../file_monitor_producer/CMakeLists.txt | 1 + .../file_monitor_producer/check_linux.sh | 24 +++--- .../producer/file_monitor_producer/test.json | 11 +++ .../read_file_content/CMakeLists.txt | 1 + .../read_file_content/read_file_content.cpp | 9 ++- .../read_file_content/setup_linux.sh | 1 + .../read_file_content/setup_windows.bat | 1 + .../write_data_to_file/CMakeLists.txt | 1 + .../write_data_to_file/cleanup_linux.sh | 1 + .../write_data_to_file/cleanup_windows.bat | 1 + .../write_data_to_file/write_data_to_file.cpp | 12 ++- worker/api/cpp/src/folder_data_broker.cpp | 2 +- worker/api/cpp/src/server_data_broker.cpp | 2 +- .../api/cpp/unittests/test_folder_broker.cpp | 2 +- .../api/cpp/unittests/test_server_broker.cpp | 2 +- 47 files changed, 380 insertions(+), 121 deletions(-) rename producer/event_monitor_producer/src/{eventmon_main.cpp => main_eventmon.cpp} (93%) create mode 100644 tests/automatic/producer/file_monitor_producer/test.json diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index dab7dd358..c0065769e 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -28,8 +28,8 @@ inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) { } - using FileData = std::unique_ptr<uint8_t[]>; + using FileInfos = std::vector<FileInfo>; using SubDirList = std::vector<std::string>; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 4af8883a7..4a0ee3ee5 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -14,17 +14,17 @@ namespace asapo { //Need to be "enum" since multiple flags are allowed -enum FileOpenMode { - IO_OPEN_MODE_READ = 1 << 0, +enum FileOpenMode : unsigned short { + IO_OPEN_MODE_READ = 1, IO_OPEN_MODE_WRITE = 1 << 1, - IO_OPEN_MODE_RW = IO_OPEN_MODE_READ | IO_OPEN_MODE_WRITE, - IO_OPEN_MODE_CREATE = 1 << 2, - IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS = 1 << 3, + IO_OPEN_MODE_RW = 1 << 2, + IO_OPEN_MODE_CREATE = 1 << 3, + IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS = 1 << 4, /** * Will set the length of a file to 0 * Only works if file is open with READ and WRITE mode */ - IO_OPEN_MODE_SET_LENGTH_0 = 1 << 4, + IO_OPEN_MODE_SET_LENGTH_0 = 1 << 5, }; enum class AddressFamilies { @@ -91,11 +91,13 @@ class IO { virtual size_t Read (FileDescriptor fd, void* buf, size_t length, Error* err) const = 0; virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0; - virtual Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const = 0; - virtual Error WriteDataToFile (const std::string& fname, const uint8_t* data, size_t length) const = 0; + virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const = 0; + virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const = 0; virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0; - virtual FileData GetDataFromFile (const std::string& fname, uint64_t fsize, Error* err) const = 0; + virtual FileData GetDataFromFile (const std::string& fname, uint64_t* fsize, Error* err) const = 0; virtual SubDirList GetSubDirectories(const std::string& path, Error* err) const = 0; virtual std::vector<FileInfo> FilesInFolder (const std::string& folder, Error* err) const = 0; virtual std::string ReadFileToString (const std::string& fname, Error* err) const = 0; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 0a7b826e1..b88faa5e3 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -164,26 +164,29 @@ class MockIO : public IO { } MOCK_CONST_METHOD2(CreateNewDirectory_t, void(const std::string& directory_name, ErrorInterface** err)); - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const override { + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override { ErrorInterface* error = nullptr; auto data = GetDataFromFile_t(fname, fsize, &error); err->reset(error); return FileData(data); } - MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t fsize, ErrorInterface** err)); + MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t* fsize, ErrorInterface** err)); - Error WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const override { - return Error{WriteDataToFile_t(fname, data.get(), length)}; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const override { + return Error{WriteDataToFile_t(root_folder, fname, data.get(), length, create_directories)}; } - Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override { - return Error{WriteDataToFile_t(fname, data, length)}; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const override { + return Error{WriteDataToFile_t(root_folder, fname, data, length, create_directories)}; } - MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize)); + MOCK_CONST_METHOD5(WriteDataToFile_t, ErrorInterface * (const std::string& root_folder, const std::string& fname, + const uint8_t* data, size_t fsize, bool create_directories)); std::vector<FileInfo> FilesInFolder(const std::string& folder, Error* err) const override { ErrorInterface* error = nullptr; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 9cf619807..306451a16 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -19,7 +19,7 @@ #endif #include "system_io.h" - +#include "preprocessor/definitions.h" namespace asapo { @@ -79,19 +79,28 @@ uint8_t* AllocateArray(uint64_t fsize, Error* err) { // PRIVATE FUNCTIONS - END -FileData SystemIO::GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const { +FileData SystemIO::GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const { + + if (*fsize == 0 && !fname.empty()) { + auto info = GetFileInfo(fname, err); + if (*err != nullptr) { + return nullptr; + } + *fsize = info.size; + } + *err = nullptr; auto fd = Open(fname, IO_OPEN_MODE_READ, err); if (*err != nullptr) { return nullptr; } - auto data_array = AllocateArray(fsize, err); + auto data_array = AllocateArray(*fsize, err); if (*err != nullptr) { return nullptr; } - Read(fd, data_array, fsize, err); + Read(fd, data_array, *fsize, err); if (*err != nullptr) { (*err)->Append(fname); Close(fd, nullptr); @@ -132,9 +141,28 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro } } -Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const { +Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const { + std::string full_name; + if (!root_folder.empty()) { + full_name = root_folder + kPathSeparator + fname; + } else { + full_name = fname; + } Error err; - auto fd = Open(fname, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); + auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); + if (err == IOErrorTemplates::kFileNotFound && create_directories) { + size_t pos = fname.rfind(kPathSeparator); + if (pos == std::string::npos) { + return IOErrorTemplates::kFileNotFound.Generate(); + } + auto err_create = CreateDirectoryWithParents(root_folder, fname.substr(0, pos)); + if (err_create) { + return err_create; + } + return WriteDataToFile(root_folder, fname, data, length, false); + } + if (err) { return err; } @@ -149,23 +177,21 @@ Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, s } -Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { - return WriteDataToFile(fname, data.get(), length); +Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const { + return WriteDataToFile(root_folder, fname, data.get(), length, create_directories); } std::string SystemIO::ReadFileToString(const std::string& fname, Error* err) const { - auto info = GetFileInfo(fname, err); - if (*err != nullptr) { - return ""; - } - auto data = GetDataFromFile(fname, info.size, err); + uint64_t size = 0; + auto data = GetDataFromFile(fname, &size, err); if (*err != nullptr) { return ""; } - return std::string(reinterpret_cast<const char*>(data.get()), info.size); + return std::string(reinterpret_cast<const char*>(data.get()), size); } @@ -218,7 +244,7 @@ asapo::FileDescriptor asapo::SystemIO::CreateAndConnectIPTCPSocket(const std::st int SystemIO::FileOpenModeToPosixFileOpenMode(int open_flags) const { int flags = 0; - if((open_flags & IO_OPEN_MODE_READ && open_flags & IO_OPEN_MODE_WRITE) || open_flags & IO_OPEN_MODE_RW) { + if(((open_flags & IO_OPEN_MODE_READ) && (open_flags & IO_OPEN_MODE_WRITE)) || (open_flags & IO_OPEN_MODE_RW)) { flags |= O_RDWR; } else { if (open_flags & IO_OPEN_MODE_READ) { @@ -554,4 +580,25 @@ size_t SystemIO::Transfer(ssize_t (* method)(FileDescriptor, void*, size_t), Fil return already_transferred; } +Error SystemIO::CreateDirectoryWithParents(const std::string& root_path, const std::string& path) const { + for( std::string::const_iterator iter = path.begin() ; iter != path.end(); ) { + iter = std::find( iter, path.end(), kPathSeparator ); + std::string new_path; + if (root_path.empty()) { + new_path = std::string( path.begin(), iter); + } else { + new_path = root_path + kPathSeparator + std::string(path.begin(), iter); + } + Error err; + CreateNewDirectory(new_path, &err); + if (err && err != IOErrorTemplates::kFileAlreadyExists) { + return err; + } + if (iter != path.end()) { + ++iter; + } + } + return nullptr; +} + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 156a25b48..b368c43c1 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -64,6 +64,7 @@ class SystemIO final : public IO { void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const; void GetSubDirectoriesRecursively(const std::string& path, SubDirList* subdirs, Error* err) const; + Error CreateDirectoryWithParents(const std::string& root_path, const std::string& path) const; public: /* * Special @@ -102,9 +103,11 @@ class SystemIO final : public IO { size_t Read(FileDescriptor fd, void* buf, size_t length, Error* err) const override; size_t Write(FileDescriptor fd, const void* buf, size_t length, Error* err) const override; void CreateNewDirectory(const std::string& directory_name, Error* err) const override; - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const override; - Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const override; - Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override; + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override; + Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const override; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const override; SubDirList GetSubDirectories(const std::string& path, Error* err) const override; std::string ReadFileToString(const std::string& fname, Error* err) const override; }; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 2c8c4f2fd..aec527a91 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -81,7 +81,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it for(uint64_t i = 0; i < iterations; i++) { auto buffer = CreateMemoryBuffer(number_of_byte); asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i)}; - auto err = producer->Send(event_header, std::move(buffer), &ProcessAfterSend); + auto err = producer->SendData(event_header, std::move(buffer), &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index e30da7fec..ca21d6cf8 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -8,7 +8,7 @@ set(SOURCE_FILES src/request_pool.cpp src/receiver_discovery_service.cpp src/request_handler_factory.cpp - src/request.cpp include/producer/common.h) + src/request.cpp) ################################ diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index ecded5dd8..6332739d0 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -11,6 +11,7 @@ namespace asapo { + class Producer { public: //! Creates a new producer @@ -25,12 +26,18 @@ class Producer { //! Sends data to the receiver /*! - \param file_id - The id of the file. An error will be returned if this file id already exists on the receiver. + \param event_header - A stucture with the meta information (file name, size). \param data - A pointer to the data to send - \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + virtual Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + //! Sends files to the receiver + /*! + \param event_header - A stucture with the meta information (file name, size is ignored). + \param file name - A full path of the file to send + \return Error - Will be nullptr on success + */ + virtual Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) = 0; //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 80c09055b..2f1a700d6 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -45,23 +45,36 @@ Error CheckProducerRequest(size_t file_size, size_t filename_size) { return nullptr; } - -Error ProducerImpl::Send(const EventHeader& event_header, FileData data, +Error ProducerImpl::Send(const EventHeader& event_header, + FileData data, + std::string full_path, RequestCallback callback) { - auto err = CheckProducerRequest(event_header.file_size, event_header.file_name.size()); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, event_header.file_name); + return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, + std::move(data), std::move(full_path), callback} + }); + +} + - return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, std::move(data), callback}}); +Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, + RequestCallback callback) { + return Send(event_header, std::move(data), "", callback); } + +Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) { + return Send(event_header, nullptr, std::move(full_path), callback); +} + + void ProducerImpl::SetLogLevel(LogLevel level) { log__->SetLogLevel(level); } diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 55d6e7bbb..da104f95c 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -28,12 +28,13 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) override; + Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) override; + Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; Error SetBeamtimeId(std::string beamtime_id) override; - private: + Error Send(const EventHeader& event_header, FileData data, std::string full_path, RequestCallback callback); GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, std::string file_name); std::string beamtime_id_; }; diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index 2b671dfc4..faae53e91 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -1 +1,14 @@ #include "request.h" + +namespace asapo { + +Error Request::ReadDataFromFileIfNeeded(const IO* io) { + if (data != nullptr || original_filepath.empty()) { + return nullptr; + } + Error err; + data = io->GetDataFromFile(original_filepath, &header.data_size, &err); + return err; +} + +} \ No newline at end of file diff --git a/producer/api/src/request.h b/producer/api/src/request.h index b522385a3..3b5fe21c1 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -4,6 +4,7 @@ #include "common/networking.h" #include "producer/common.h" #include "common/data_structs.h" +#include "io/io.h" namespace asapo { @@ -11,7 +12,9 @@ struct Request { std::string beamtime_id; GenericRequestHeader header; FileData data; + std::string original_filepath; RequestCallback callback; + Error ReadDataFromFileIfNeeded(const IO* io); }; } diff --git a/producer/api/src/request_handler.h b/producer/api/src/request_handler.h index e99bbd284..64b39ced5 100644 --- a/producer/api/src/request_handler.h +++ b/producer/api/src/request_handler.h @@ -12,7 +12,7 @@ class RequestHandler { public: virtual void PrepareProcessingRequestLocked() = 0; virtual void TearDownProcessingRequestLocked(const Error& error_from_process) = 0; - virtual Error ProcessRequestUnlocked(const Request* request) = 0; + virtual Error ProcessRequestUnlocked(Request* request) = 0; virtual bool ReadyProcessRequest() = 0; virtual ~RequestHandler() = default; }; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index e557c2645..cca62163b 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -13,9 +13,14 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde } -Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { - std::string fullpath = destination_folder_ + "/" + request->header.message + ".bin"; - auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data.get(), request->header.data_size); +Error RequestHandlerFilesystem::ProcessRequestUnlocked(Request* request) { + auto err = request->ReadDataFromFileIfNeeded(io__.get()); + if (err) { + return err; + } + + err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)request->data.get(), + request->header.data_size, true); if (request->callback) { request->callback(request->header, std::move(err)); } diff --git a/producer/api/src/request_handler_filesystem.h b/producer/api/src/request_handler_filesystem.h index ba29a407e..607816cf8 100644 --- a/producer/api/src/request_handler_filesystem.h +++ b/producer/api/src/request_handler_filesystem.h @@ -17,7 +17,7 @@ namespace asapo { class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); - Error ProcessRequestUnlocked(const Request* request) override; + Error ProcessRequestUnlocked(Request* request) override; bool ReadyProcessRequest() override { return true; }; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index e82af41d3..f5e271da1 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -158,10 +158,7 @@ bool RequestHandlerTcp::ServerError(const Error& err) { return err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse; } -Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { - if (NeedRebalance()) { - CloseConnectionToPeformRebalance(); - } +Error RequestHandlerTcp::SendDataToOneOfTheReceivers(Request* request) { for (auto receiver_uri : receivers_list_) { if (Disconnected()) { auto err = ConnectToReceiver(request->beamtime_id, receiver_uri); @@ -184,6 +181,19 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate(); } + +Error RequestHandlerTcp::ProcessRequestUnlocked(Request* request) { + auto err = request->ReadDataFromFileIfNeeded(io__.get()); + if (err) { + return err; + } + + if (NeedRebalance()) { + CloseConnectionToPeformRebalance(); + } + return SendDataToOneOfTheReceivers(request); +} + bool RequestHandlerTcp::Connected() { return sd_ != kDisconnectedSocketDescriptor; } diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index c89b282a7..dec7f4d4c 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -19,7 +19,7 @@ namespace asapo { class RequestHandlerTcp: public RequestHandler { public: explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter); - Error ProcessRequestUnlocked(const Request* request) override; + Error ProcessRequestUnlocked(Request* request) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; void TearDownProcessingRequestLocked(const Error& error_from_process) override; @@ -31,6 +31,7 @@ class RequestHandlerTcp: public RequestHandler { private: Error Authorize(const std::string& beamtime_id); Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); + Error SendDataToOneOfTheReceivers(Request* request); Error SendHeaderAndData(const Request*); Error ReceiveResponse(); Error TrySendToReceiver(const Request* request); diff --git a/producer/api/unittests/mocking.h b/producer/api/unittests/mocking.h index 7d231cc2a..0ccb688c9 100644 --- a/producer/api/unittests/mocking.h +++ b/producer/api/unittests/mocking.h @@ -40,7 +40,7 @@ class MockRequestPull : public RequestPool { class MockRequestHandler : public RequestHandler { public: - Error ProcessRequestUnlocked(const Request* request) override { + Error ProcessRequestUnlocked(Request* request) override { return Error{ProcessRequestUnlocked_t(request)}; } void TearDownProcessingRequestLocked(const Error& error_from_process) override { diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index ee69cb8eb..fa99a9b79 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -51,7 +51,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::EventHeader event_header{1, 1, ""}; - auto err_send = producer->Send(event_header, nullptr, nullptr); + auto err_send = producer->SendData(event_header, nullptr, nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 16824cf7e..0d922f238 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -34,7 +34,7 @@ MATCHER_P5(M_CheckSendDataRequest, op_code, beamtime_id, file_id, file_size, mes "Checks if a valid GenericRequestHeader was Send") { return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id - && ((asapo::GenericRequestHeader)(arg->header)).data_size == file_size + && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint(file_size) && arg->beamtime_id == beamtime_id && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -66,14 +66,14 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, ""}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, long_string}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } @@ -81,26 +81,47 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } -TEST_F(ProducerImplTests, OKSendingRequest) { +TEST_F(ProducerImplTests, OKSendingSendDataRequest) { uint64_t expected_size = 100; uint64_t expected_id = 10; char expected_name[asapo::kMaxMessageSize] = "test_name"; std::string expected_beamtimeid = "beamtime_id"; producer.SetBeamtimeId(expected_beamtimeid); - Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, nullptr}; + Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, + nullptr, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, expected_beamtimeid, expected_id, expected_size, expected_name))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ProducerImplTests, OKSendingSendFileRequest) { + uint64_t expected_id = 10; + char expected_name[asapo::kMaxMessageSize] = "test_name"; + std::string expected_beamtimeid = "beamtime_id"; + std::string expected_fullpath = "filename"; + + producer.SetBeamtimeId(expected_beamtimeid); + Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, expected_name}, + nullptr, "", nullptr}; + + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, + expected_beamtimeid, expected_id, 0, expected_name))).WillOnce(Return( + nullptr)); + + asapo::EventHeader event_header{expected_id, 0, expected_name}; + auto err = producer.SendFile(event_header, expected_fullpath, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/unittests/test_receiver_discovery_service.cpp b/producer/api/unittests/test_receiver_discovery_service.cpp index c31136152..ddf55532f 100644 --- a/producer/api/unittests/test_receiver_discovery_service.cpp +++ b/producer/api/unittests/test_receiver_discovery_service.cpp @@ -115,7 +115,7 @@ TEST_F(ReceiversStatusTests, GetsReqestedInformation) { )); status.StartCollectingData(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); auto nc = status.MaxConnections(); ASSERT_THAT(nc, Eq(8)); diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index 6082d7dc9..a87a58506 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -45,19 +45,23 @@ class RequestHandlerFilesystemTests : public testing::Test { std::string expected_file_name = "test_name"; uint64_t expected_thread_id = 2; std::string expected_destination = "destination"; - std::string expected_fullpath = expected_destination + "/" + expected_file_name + ".bin"; + std::string expected_fullpath = expected_destination + "/" + expected_file_name; + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{"", header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{"", header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{"", header, nullptr, nullptr}; + asapo::Request request_nocallback{"", header, nullptr, "", nullptr}; + asapo::Request request_filesend{"", header, nullptr, expected_origin_fullpath, nullptr}; + testing::NiceMock<asapo::MockLogger> mock_logger; asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; @@ -84,7 +88,7 @@ MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, } TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( Return( asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) @@ -99,10 +103,9 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { } TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( - Return( - asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + Return(nullptr) ); @@ -113,9 +116,41 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { } +TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) { + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + +TEST_F(RequestHandlerFilesystemTests, FileRequestOK) { + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(nullptr), + Return(nullptr) + )); + + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) + .WillOnce( + Return(nullptr) + ); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(RequestHandlerFilesystemTests, TransferOK) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( Return( nullptr) diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 383dd0066..e3df453b9 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -61,13 +61,17 @@ class RequestHandlerTcpTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{expected_beamtime_id, header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{expected_beamtime_id, header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, nullptr}; + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name + ".bin"; + asapo::Request request_filesend{expected_beamtime_id, header, nullptr, expected_origin_fullpath, nullptr}; + + + asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, "", nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -594,6 +598,46 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ASSERT_THAT(called, Eq(false)); } +TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { + + request_handler.PrepareProcessingRequestLocked(); + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + + + +TEST_F(RequestHandlerTcpTests, FileRequestOK) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + ExpectOKReceive(); + + request_handler.PrepareProcessingRequestLocked(); + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(nullptr), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(nullptr)); +} + + + + TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index 6ec589c3d..aaca6d65b 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -62,7 +62,7 @@ class RequestPoolTests : public testing::Test { MockRequestHandlerFactory request_handler_factory{mock_request_handler}; const uint8_t nthreads = 1; asapo::RequestPool pool {nthreads, &request_handler_factory}; - std::unique_ptr<Request> request{new Request{"", GenericRequestHeader{}, nullptr, nullptr}}; + std::unique_ptr<Request> request{new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}}; void SetUp() override { pool.log__ = &mock_logger; } @@ -118,7 +118,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - Request* request2 = new Request{"", GenericRequestHeader{}, nullptr, nullptr}; + Request* request2 = new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}; ExpectSend(mock_request_handler, 2); diff --git a/producer/event_monitor_producer/CMakeLists.txt b/producer/event_monitor_producer/CMakeLists.txt index 616cd34b0..dd3ec147d 100644 --- a/producer/event_monitor_producer/CMakeLists.txt +++ b/producer/event_monitor_producer/CMakeLists.txt @@ -32,7 +32,7 @@ target_link_libraries(${TARGET_NAME} producer-api) ################################ # Executable ################################ -add_executable(${TARGET_NAME}-bin src/eventmon_main.cpp) +add_executable(${TARGET_NAME}-bin src/main_eventmon.cpp) set_target_properties(${TARGET_NAME}-bin PROPERTIES OUTPUT_NAME asapo-eventmon-producer) target_link_libraries(${TARGET_NAME}-bin ${TARGET_NAME}) @@ -60,7 +60,7 @@ ENDIF(WIN32) set(TEST_LIBRARIES "${TARGET_NAME}") -gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/eventmon_main.cpp) +gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/main_eventmon.cpp) install(TARGETS ${TARGET_NAME}-bin DESTINATION bin) diff --git a/producer/event_monitor_producer/src/inotify_event.cpp b/producer/event_monitor_producer/src/inotify_event.cpp index b4fda4ebd..bcc89e358 100644 --- a/producer/event_monitor_producer/src/inotify_event.cpp +++ b/producer/event_monitor_producer/src/inotify_event.cpp @@ -33,11 +33,37 @@ bool InotifyEvent::IsNewDirectoryInFolderEvent() const { bool InotifyEvent::IsDeleteDirectoryInFolderEvent() const { return IsDirectoryEvent() && ((GetMask() & IN_DELETE_SELF) || (GetMask() & IN_MOVED_FROM)); } -uint32_t InotifyEvent::NameLength() const { - return inotify_event_->len; -} + bool InotifyEvent::IsDeleteDirectoryInFolderEventByMove() const { return IsDeleteDirectoryInFolderEvent() && (GetMask() & IN_MOVED_FROM); } +void InotifyEvent::Print() const { + printf(" wd =%2d; ", inotify_event_->wd); + if (inotify_event_->cookie > 0) + printf("cookie =%4d; ", inotify_event_->cookie); + + printf("mask = "); + if (inotify_event_->mask & IN_ACCESS) printf("IN_ACCESS "); + if (inotify_event_->mask & IN_ATTRIB) printf("IN_ATTRIB "); + if (inotify_event_->mask & IN_CLOSE_NOWRITE) printf("IN_CLOSE_NOWRITE "); + if (inotify_event_->mask & IN_CLOSE_WRITE) printf("IN_CLOSE_WRITE "); + if (inotify_event_->mask & IN_CREATE) printf("IN_CREATE "); + if (inotify_event_->mask & IN_DELETE) printf("IN_DELETE "); + if (inotify_event_->mask & IN_DELETE_SELF) printf("IN_DELETE_SELF "); + if (inotify_event_->mask & IN_IGNORED) printf("IN_IGNORED "); + if (inotify_event_->mask & IN_ISDIR) printf("IN_ISDIR "); + if (inotify_event_->mask & IN_MODIFY) printf("IN_MODIFY "); + if (inotify_event_->mask & IN_MOVE_SELF) printf("IN_MOVE_SELF "); + if (inotify_event_->mask & IN_MOVED_FROM) printf("IN_MOVED_FROM "); + if (inotify_event_->mask & IN_MOVED_TO) printf("IN_MOVED_TO "); + if (inotify_event_->mask & IN_OPEN) printf("IN_OPEN "); + if (inotify_event_->mask & IN_Q_OVERFLOW) printf("IN_Q_OVERFLOW "); + if (inotify_event_->mask & IN_UNMOUNT) printf("IN_UNMOUNT "); + printf("\n"); + + if (inotify_event_->len > 0) + printf(" name = %s\n", inotify_event_->name); + +} } \ No newline at end of file diff --git a/producer/event_monitor_producer/src/inotify_event.h b/producer/event_monitor_producer/src/inotify_event.h index 472f0c7b7..a01d950c0 100644 --- a/producer/event_monitor_producer/src/inotify_event.h +++ b/producer/event_monitor_producer/src/inotify_event.h @@ -15,7 +15,6 @@ class InotifyEvent { public: InotifyEvent(const struct inotify_event* inotify_event, const std::map<int, std::string>& watched_folders_paths); uint32_t Length() const ; - uint32_t NameLength() const ; bool IsDirectoryEvent() const ; bool IsNewFileInFolderEvent() const; bool IsNewDirectoryInFolderEvent() const; @@ -24,6 +23,7 @@ class InotifyEvent { int Descriptor() const ; const char* Name() const ; uint32_t GetMask() const; + void Print() const; private: const struct inotify_event* inotify_event_; const std::map<int, std::string> watched_folders_paths_; diff --git a/producer/event_monitor_producer/src/eventmon_main.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp similarity index 93% rename from producer/event_monitor_producer/src/eventmon_main.cpp rename to producer/event_monitor_producer/src/main_eventmon.cpp index f1a1c7855..d3b329d6f 100644 --- a/producer/event_monitor_producer/src/eventmon_main.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -12,6 +12,7 @@ #include "event_detector_factory.h" #include "eventmon_logger.h" #include "event_monitor_error.h" +#include "preprocessor/definitions.h" using asapo::Producer; using asapo::EventMonConfigFactory; @@ -99,7 +100,8 @@ int main (int argc, char* argv[]) { continue; } event_header.file_id = i++; - producer->Send(event_header, nullptr, ProcessAfterSend); + producer->SendFile(event_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + + event_header.file_name, ProcessAfterSend); } logger->Info("Producer exit. Processed " + std::to_string(i) + " files"); diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp index 0faa622e9..7b013693c 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp @@ -83,6 +83,7 @@ Error SystemFolderWatch::FindEventPath(const InotifyEvent& event, std::string* f } Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend* files) { + event.Print(); if (!event.IsNewFileInFolderEvent()) { return nullptr; } @@ -91,8 +92,9 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend if (err) { return err; } + GetDefaultEventMonLogger()->Debug(((event.GetMask() & IN_CLOSE_WRITE) ? "file closed: " : "file moved: ") + fname); + files->emplace_back(std::move(fname)); - GetDefaultEventMonLogger()->Debug((event.GetMask() & IN_CLOSE_WRITE) ? "file closed: " : "file moved: " + fname); return nullptr; } diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.h b/producer/event_monitor_producer/src/system_folder_watch_linux.h index c7c0c3950..0b9c2e7dc 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.h +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.h @@ -48,14 +48,13 @@ class SystemFolderWatch { std::map<int, std::string>::iterator FindEventIterator(const InotifyEvent& event, Error* err); void RemoveFolderWithSubfoldersFromWatch(const std::string& path); std::map<int, std::string>::iterator RemoveFolderFromWatch(const std::map<int, std::string>::iterator& it); - + Error ReadInotifyEvents(int* bytes_read); + Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events); + Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root); private: std::unique_ptr<char[]> buffer_; std::map<int, std::string> watched_folders_paths_; int watch_fd_ = -1; - Error ReadInotifyEvents(int* bytes_read); - Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events); - Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root); std::string root_folder_; }; diff --git a/receiver/src/request_handler_file_write.cpp b/receiver/src/request_handler_file_write.cpp index 552393f76..93d7acb3e 100644 --- a/receiver/src/request_handler_file_write.cpp +++ b/receiver/src/request_handler_file_write.cpp @@ -18,8 +18,8 @@ Error RequestHandlerFileWrite::ProcessRequest(Request* request) const { auto fname = request->GetFileName(); auto root_folder = GetReceiverConfig()->root_folder + kPathSeparator + request->GetBeamline() + kPathSeparator - + request->GetBeamtimeId() + kPathSeparator; - auto err = io__->WriteDataToFile(root_folder + fname, data, fsize); + + request->GetBeamtimeId(); + auto err = io__->WriteDataToFile(root_folder, fname, data, fsize, true); if (!err) { log__->Debug("saved file of size " + std::to_string(fsize) + " to " + root_folder + fname); } diff --git a/receiver/unittests/test_request_handler_file_write.cpp b/receiver/unittests/test_request_handler_file_write.cpp index 5a999e068..6c5bdce40 100644 --- a/receiver/unittests/test_request_handler_file_write.cpp +++ b/receiver/unittests/test_request_handler_file_write.cpp @@ -132,10 +132,9 @@ TEST_F(FileWriteHandlerTests, CallsWriteFile) { MockRequestData(); std::string expected_path = std::string("test_folder") + asapo::kPathSeparator + expected_beamline - + asapo::kPathSeparator + expected_beamtime_id - + asapo::kPathSeparator + expected_file_name; + + asapo::kPathSeparator + expected_beamtime_id; - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_path.c_str(), _, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_path, expected_file_name, _, expected_file_size, true)) .WillOnce( Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) ); @@ -150,7 +149,7 @@ TEST_F(FileWriteHandlerTests, WritesToLog) { MockRequestData(); - EXPECT_CALL(mock_io, WriteDataToFile_t(_, _, _)) + EXPECT_CALL(mock_io, WriteDataToFile_t(_, _, _, _, _)) .WillOnce(Return(nullptr)); EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("saved file"), diff --git a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt index cdaa002d1..09da2f2ab 100644 --- a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt +++ b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt @@ -3,4 +3,5 @@ set(TARGET_NAME file-monitor-producer) ################################ # Testing ################################ +configure_file(test.json test.json COPYONLY) add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin>" nomem) diff --git a/tests/automatic/producer/file_monitor_producer/check_linux.sh b/tests/automatic/producer/file_monitor_producer/check_linux.sh index e6ffdd307..9428e35b2 100644 --- a/tests/automatic/producer/file_monitor_producer/check_linux.sh +++ b/tests/automatic/producer/file_monitor_producer/check_linux.sh @@ -5,32 +5,32 @@ set -e trap Cleanup EXIT Cleanup() { + set +e echo cleanup - rm -rf test_in test_out #output + rm -rf /tmp/test_in /tmp/test_out output kill -9 $producer_id &>/dev/null } -mkdir -p test_in test_out +mkdir -p /tmp/test_in/test1 /tmp/test_in/test2 /tmp/test_out $1 test.json &> output & producer_id=`echo $!` -sleep 0.1 +sleep 0.5 -echo test1 > test_in/test1.dat -echo test2 > test_in/test2.tmp -#cp test_in/test2.tmp test_in/test4.dat -mkdir test_in/subdir -echo test3 > test_in/subdir/test3.dat +echo test1 > /tmp/test_in/test1/test1.dat +echo test2 > /tmp/test_in/test2/test2.tmp +mkdir -p /tmp/test_in/test2/subdir +echo test3 > /tmp/test_in/test2/subdir/test3.dat sleep 0.1 -cat test_out/test1.dat | grep test1 -cat test_out/subdir/test3.dat | grep test3 +cat /tmp/test_out/test1/test1.dat | grep test1 +cat /tmp/test_out/test2/subdir/test3.dat | grep test3 -test ! -e test_out/test2.tmp +test ! -e /tmp/test_out/test2/test2.tmp kill -s INT $producer_id sleep 0.5 cat output -cat output | grep processed 2 +cat output | grep "Processed 2" diff --git a/tests/automatic/producer/file_monitor_producer/test.json b/tests/automatic/producer/file_monitor_producer/test.json new file mode 100644 index 000000000..fe468f96b --- /dev/null +++ b/tests/automatic/producer/file_monitor_producer/test.json @@ -0,0 +1,11 @@ +{ + "AsapoEndpoint":"/tmp/test_out", + "Tag":"test_tag", + "BeamtimeID":"asapo_test", + "Mode":"filesystem", + "NThreads":1, + "LogLevel":"debug", + "RootMonitoredFolder":"/tmp/test_in", + "MonitoredSubFolders":["test1","test2"], + "IgnoreExtentions":["tmp"] +} diff --git a/tests/automatic/system_io/read_file_content/CMakeLists.txt b/tests/automatic/system_io/read_file_content/CMakeLists.txt index c65eb4537..ca5a9e0de 100644 --- a/tests/automatic/system_io/read_file_content/CMakeLists.txt +++ b/tests/automatic/system_io/read_file_content/CMakeLists.txt @@ -16,6 +16,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) add_test_setup_cleanup(${TARGET_NAME}) add_integration_test(${TARGET_NAME} readfile "test/1 123") +add_integration_test(${TARGET_NAME} readfile_unkown_size "test/2 unknown_size") add_integration_test(${TARGET_NAME} filenotfound "test_notexist Nosuchfileordirectory:test_notexist") add_integration_test(${TARGET_NAME} filenoaccess "file_noaccess Permissiondenied:file_noaccess") diff --git a/tests/automatic/system_io/read_file_content/read_file_content.cpp b/tests/automatic/system_io/read_file_content/read_file_content.cpp index ec5fdae9a..8e495e722 100644 --- a/tests/automatic/system_io/read_file_content/read_file_content.cpp +++ b/tests/automatic/system_io/read_file_content/read_file_content.cpp @@ -14,7 +14,14 @@ int main(int argc, char* argv[]) { asapo::Error err; auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - auto data = io->GetDataFromFile(argv[1], expect.size(), &err); + asapo::FileData data; + uint64_t size = 0; + if (expect == "unknown_size") { + data = io->GetDataFromFile(argv[1], &size, &err); + } else { + size = expect.size(); + data = io->GetDataFromFile(argv[1], &size, &err); + } std::string result; diff --git a/tests/automatic/system_io/read_file_content/setup_linux.sh b/tests/automatic/system_io/read_file_content/setup_linux.sh index 032de0f55..bc1a7a306 100644 --- a/tests/automatic/system_io/read_file_content/setup_linux.sh +++ b/tests/automatic/system_io/read_file_content/setup_linux.sh @@ -2,6 +2,7 @@ mkdir -p test echo 123 > test/1 +echo unknown_size > test/2 touch file_noaccess chmod -rx file_noaccess diff --git a/tests/automatic/system_io/read_file_content/setup_windows.bat b/tests/automatic/system_io/read_file_content/setup_windows.bat index c28945b0a..0bceacea0 100644 --- a/tests/automatic/system_io/read_file_content/setup_windows.bat +++ b/tests/automatic/system_io/read_file_content/setup_windows.bat @@ -1,6 +1,7 @@ mkdir test echo 123 > test/1 +echo unknown_size > test/2 type nul > file_noaccess icacls file_noaccess /deny users:D diff --git a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt index 8a576706d..dcb279d36 100644 --- a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt +++ b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt @@ -15,6 +15,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) ################################ add_test_setup_cleanup(${TARGET_NAME}) add_integration_test(${TARGET_NAME} writeok "test_file ok dummy" nomem) +add_integration_test(${TARGET_NAME} writewithfolder "folder/a/b/c/d/test_file ok dummy" nomem) add_integration_test(${TARGET_NAME} writetwice "test_file ok dummy" nomem) add_integration_test(${TARGET_NAME} dirnoaccess "test_noaccess/test_file error Permissiondenied:test_noaccess/test_file" nomem) diff --git a/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh b/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh index 1a50a8232..d1cae7ea7 100644 --- a/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh +++ b/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh @@ -2,4 +2,5 @@ rm -f test_file rmdir test_noaccess +rm -rf folder diff --git a/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat b/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat index 256da63ad..40f45c735 100644 --- a/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat +++ b/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat @@ -2,3 +2,4 @@ del test_file icacls test_noaccess /grant:r users:W rmdir /S /Q test_noaccess +rmdir /S /Q folder \ No newline at end of file diff --git a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp index add455cd3..b629bc158 100644 --- a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp +++ b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp @@ -12,7 +12,7 @@ struct Params { std::string fname; std::string result; std::string message; - int length; + uint64_t length; }; Params GetParams(int argc, char* argv[]) { @@ -34,7 +34,8 @@ void AssertGoodResult(const std::unique_ptr<IO>& io, const Error& err, const Fil exit(EXIT_FAILURE); } Error read_err; - auto read_data = io->GetDataFromFile(params.fname, params.length, &read_err); + uint64_t size = params.length; + auto read_data = io->GetDataFromFile(params.fname, &size, &read_err); asapo::M_AssertContains(std::string(read_data.get(), read_data.get() + params.length), "123"); } @@ -51,13 +52,10 @@ int main(int argc, char* argv[]) { auto params = GetParams(argc, argv); auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - auto array = new uint8_t[params.length]; - array[0] = '1'; - array[1] = '2'; - array[2] = '3'; + auto array = new uint8_t[params.length] {'1', '2', '3'}; FileData data{array}; - auto err = io->WriteDataToFile(params.fname, data, params.length); + auto err = io->WriteDataToFile("", params.fname, data, params.length, true); if (params.result == "ok") { AssertGoodResult(io, err, data, params); diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 35a4fb096..8115ffaa9 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -55,7 +55,7 @@ Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, Fi } Error error; - *data = io__->GetDataFromFile(info->FullName(base_path_), info->size, &error); + *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); return error; } diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index aff8b3dcf..c555473c4 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -173,7 +173,7 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, FileInfo* } Error error; - *data = io__->GetDataFromFile(info->FullName(""), info->size, &error); + *data = io__->GetDataFromFile(info->FullName(""), &info->size, &error); return error; } diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 208c0c23d..8823ba87e 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -82,7 +82,7 @@ class IOEmptyFolder: public FakeIO { class IOCannotOpenFile: public FakeIO { public: - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const noexcept override { + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const noexcept override { *err = asapo::IOErrorTemplates::kPermissionDenied.Generate(); return {}; }; diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 14538967a..24e7e312a 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -300,7 +300,7 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFile) { auto json = to_send.Json(); MockGet(json); - EXPECT_CALL(mock_io, GetDataFromFile_t("name", 100, _)). + EXPECT_CALL(mock_io, GetDataFromFile_t("name", testing::Pointee(100), _)). WillOnce(DoAll(SetArgPointee<2>(new asapo::SimpleError{"s"}), testing::Return(nullptr))); FileData data; -- GitLab