diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h index dab7dd358488816191351df2561e20fe98afcb89..c0065769e47ea3e08a9d4e84164e6e6e44a0438f 100644 --- a/common/cpp/include/common/data_structs.h +++ b/common/cpp/include/common/data_structs.h @@ -28,8 +28,8 @@ inline bool operator==(const FileInfo& lhs, const FileInfo& rhs) { } - using FileData = std::unique_ptr<uint8_t[]>; + using FileInfos = std::vector<FileInfo>; using SubDirList = std::vector<std::string>; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 4af8883a76eb2f223268c77d4ebf1c73c611dca9..4a0ee3ee54df8e2b750e52a1b387055904bab7d1 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -14,17 +14,17 @@ namespace asapo { //Need to be "enum" since multiple flags are allowed -enum FileOpenMode { - IO_OPEN_MODE_READ = 1 << 0, +enum FileOpenMode : unsigned short { + IO_OPEN_MODE_READ = 1, IO_OPEN_MODE_WRITE = 1 << 1, - IO_OPEN_MODE_RW = IO_OPEN_MODE_READ | IO_OPEN_MODE_WRITE, - IO_OPEN_MODE_CREATE = 1 << 2, - IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS = 1 << 3, + IO_OPEN_MODE_RW = 1 << 2, + IO_OPEN_MODE_CREATE = 1 << 3, + IO_OPEN_MODE_CREATE_AND_FAIL_IF_EXISTS = 1 << 4, /** * Will set the length of a file to 0 * Only works if file is open with READ and WRITE mode */ - IO_OPEN_MODE_SET_LENGTH_0 = 1 << 4, + IO_OPEN_MODE_SET_LENGTH_0 = 1 << 5, }; enum class AddressFamilies { @@ -91,11 +91,13 @@ class IO { virtual size_t Read (FileDescriptor fd, void* buf, size_t length, Error* err) const = 0; virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0; - virtual Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const = 0; - virtual Error WriteDataToFile (const std::string& fname, const uint8_t* data, size_t length) const = 0; + virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const = 0; + virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const = 0; virtual void CreateNewDirectory (const std::string& directory_name, Error* err) const = 0; - virtual FileData GetDataFromFile (const std::string& fname, uint64_t fsize, Error* err) const = 0; + virtual FileData GetDataFromFile (const std::string& fname, uint64_t* fsize, Error* err) const = 0; virtual SubDirList GetSubDirectories(const std::string& path, Error* err) const = 0; virtual std::vector<FileInfo> FilesInFolder (const std::string& folder, Error* err) const = 0; virtual std::string ReadFileToString (const std::string& fname, Error* err) const = 0; diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index 0a7b826e1e98d6d3a063aa94bc07931b2424da0a..b88faa5e36ef1f04557a46b65e9b27cee7ecb1d7 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -164,26 +164,29 @@ class MockIO : public IO { } MOCK_CONST_METHOD2(CreateNewDirectory_t, void(const std::string& directory_name, ErrorInterface** err)); - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const override { + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override { ErrorInterface* error = nullptr; auto data = GetDataFromFile_t(fname, fsize, &error); err->reset(error); return FileData(data); } - MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t fsize, ErrorInterface** err)); + MOCK_CONST_METHOD3(GetDataFromFile_t, uint8_t* (const std::string& fname, uint64_t* fsize, ErrorInterface** err)); - Error WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const override { - return Error{WriteDataToFile_t(fname, data.get(), length)}; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const override { + return Error{WriteDataToFile_t(root_folder, fname, data.get(), length, create_directories)}; } - Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override { - return Error{WriteDataToFile_t(fname, data, length)}; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const override { + return Error{WriteDataToFile_t(root_folder, fname, data, length, create_directories)}; } - MOCK_CONST_METHOD3(WriteDataToFile_t, ErrorInterface * (const std::string& fname, const uint8_t* data, size_t fsize)); + MOCK_CONST_METHOD5(WriteDataToFile_t, ErrorInterface * (const std::string& root_folder, const std::string& fname, + const uint8_t* data, size_t fsize, bool create_directories)); std::vector<FileInfo> FilesInFolder(const std::string& folder, Error* err) const override { ErrorInterface* error = nullptr; diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 9cf61980748436621882ad2859b8c697bd03d0b2..306451a16ea2ebe9eea6053e392f617fb4b4e723 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -19,7 +19,7 @@ #endif #include "system_io.h" - +#include "preprocessor/definitions.h" namespace asapo { @@ -79,19 +79,28 @@ uint8_t* AllocateArray(uint64_t fsize, Error* err) { // PRIVATE FUNCTIONS - END -FileData SystemIO::GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const { +FileData SystemIO::GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const { + + if (*fsize == 0 && !fname.empty()) { + auto info = GetFileInfo(fname, err); + if (*err != nullptr) { + return nullptr; + } + *fsize = info.size; + } + *err = nullptr; auto fd = Open(fname, IO_OPEN_MODE_READ, err); if (*err != nullptr) { return nullptr; } - auto data_array = AllocateArray(fsize, err); + auto data_array = AllocateArray(*fsize, err); if (*err != nullptr) { return nullptr; } - Read(fd, data_array, fsize, err); + Read(fd, data_array, *fsize, err); if (*err != nullptr) { (*err)->Append(fname); Close(fd, nullptr); @@ -132,9 +141,28 @@ void asapo::SystemIO::CreateNewDirectory(const std::string& directory_name, Erro } } -Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const { +Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const { + std::string full_name; + if (!root_folder.empty()) { + full_name = root_folder + kPathSeparator + fname; + } else { + full_name = fname; + } Error err; - auto fd = Open(fname, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); + auto fd = Open(full_name, IO_OPEN_MODE_CREATE | IO_OPEN_MODE_RW, &err); + if (err == IOErrorTemplates::kFileNotFound && create_directories) { + size_t pos = fname.rfind(kPathSeparator); + if (pos == std::string::npos) { + return IOErrorTemplates::kFileNotFound.Generate(); + } + auto err_create = CreateDirectoryWithParents(root_folder, fname.substr(0, pos)); + if (err_create) { + return err_create; + } + return WriteDataToFile(root_folder, fname, data, length, false); + } + if (err) { return err; } @@ -149,23 +177,21 @@ Error SystemIO::WriteDataToFile(const std::string& fname, const uint8_t* data, s } -Error SystemIO::WriteDataToFile(const std::string& fname, const FileData& data, size_t length) const { - return WriteDataToFile(fname, data.get(), length); +Error SystemIO::WriteDataToFile(const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const { + return WriteDataToFile(root_folder, fname, data.get(), length, create_directories); } std::string SystemIO::ReadFileToString(const std::string& fname, Error* err) const { - auto info = GetFileInfo(fname, err); - if (*err != nullptr) { - return ""; - } - auto data = GetDataFromFile(fname, info.size, err); + uint64_t size = 0; + auto data = GetDataFromFile(fname, &size, err); if (*err != nullptr) { return ""; } - return std::string(reinterpret_cast<const char*>(data.get()), info.size); + return std::string(reinterpret_cast<const char*>(data.get()), size); } @@ -218,7 +244,7 @@ asapo::FileDescriptor asapo::SystemIO::CreateAndConnectIPTCPSocket(const std::st int SystemIO::FileOpenModeToPosixFileOpenMode(int open_flags) const { int flags = 0; - if((open_flags & IO_OPEN_MODE_READ && open_flags & IO_OPEN_MODE_WRITE) || open_flags & IO_OPEN_MODE_RW) { + if(((open_flags & IO_OPEN_MODE_READ) && (open_flags & IO_OPEN_MODE_WRITE)) || (open_flags & IO_OPEN_MODE_RW)) { flags |= O_RDWR; } else { if (open_flags & IO_OPEN_MODE_READ) { @@ -554,4 +580,25 @@ size_t SystemIO::Transfer(ssize_t (* method)(FileDescriptor, void*, size_t), Fil return already_transferred; } +Error SystemIO::CreateDirectoryWithParents(const std::string& root_path, const std::string& path) const { + for( std::string::const_iterator iter = path.begin() ; iter != path.end(); ) { + iter = std::find( iter, path.end(), kPathSeparator ); + std::string new_path; + if (root_path.empty()) { + new_path = std::string( path.begin(), iter); + } else { + new_path = root_path + kPathSeparator + std::string(path.begin(), iter); + } + Error err; + CreateNewDirectory(new_path, &err); + if (err && err != IOErrorTemplates::kFileAlreadyExists) { + return err; + } + if (iter != path.end()) { + ++iter; + } + } + return nullptr; +} + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 156a25b484c9b4cda95c3b1c36a3c2c80133e807..b368c43c199d1d84c80abee0aa818f625d8539da 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -64,6 +64,7 @@ class SystemIO final : public IO { void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, Error* err) const; void GetSubDirectoriesRecursively(const std::string& path, SubDirList* subdirs, Error* err) const; + Error CreateDirectoryWithParents(const std::string& root_path, const std::string& path) const; public: /* * Special @@ -102,9 +103,11 @@ class SystemIO final : public IO { size_t Read(FileDescriptor fd, void* buf, size_t length, Error* err) const override; size_t Write(FileDescriptor fd, const void* buf, size_t length, Error* err) const override; void CreateNewDirectory(const std::string& directory_name, Error* err) const override; - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const override; - Error WriteDataToFile (const std::string& fname, const FileData& data, size_t length) const override; - Error WriteDataToFile(const std::string& fname, const uint8_t* data, size_t length) const override; + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const override; + Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, + size_t length, bool create_directories) const override; + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, + size_t length, bool create_directories) const override; SubDirList GetSubDirectories(const std::string& path, Error* err) const override; std::string ReadFileToString(const std::string& fname, Error* err) const override; }; diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index 2c8c4f2fd592302382882909bb3064fd78d2ee74..aec527a91d55bdb13f0d55e3f02b604e891fed3e 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -81,7 +81,7 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it for(uint64_t i = 0; i < iterations; i++) { auto buffer = CreateMemoryBuffer(number_of_byte); asapo::EventHeader event_header{i + 1, number_of_byte, std::to_string(i)}; - auto err = producer->Send(event_header, std::move(buffer), &ProcessAfterSend); + auto err = producer->SendData(event_header, std::move(buffer), &ProcessAfterSend); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; diff --git a/producer/api/CMakeLists.txt b/producer/api/CMakeLists.txt index e30da7fec7b71975b21b5cf774900ee60819eac2..ca21d6cf8290029834d5544489a93006b31d65bc 100644 --- a/producer/api/CMakeLists.txt +++ b/producer/api/CMakeLists.txt @@ -8,7 +8,7 @@ set(SOURCE_FILES src/request_pool.cpp src/receiver_discovery_service.cpp src/request_handler_factory.cpp - src/request.cpp include/producer/common.h) + src/request.cpp) ################################ diff --git a/producer/api/include/producer/producer.h b/producer/api/include/producer/producer.h index ecded5dd8c3bfb3d73f9a0ff49aaf515b187cc37..6332739d04860a383f0d957e22c21bb40b9c58c1 100644 --- a/producer/api/include/producer/producer.h +++ b/producer/api/include/producer/producer.h @@ -11,6 +11,7 @@ namespace asapo { + class Producer { public: //! Creates a new producer @@ -25,12 +26,18 @@ class Producer { //! Sends data to the receiver /*! - \param file_id - The id of the file. An error will be returned if this file id already exists on the receiver. + \param event_header - A stucture with the meta information (file name, size). \param data - A pointer to the data to send - \param file_size - The size of the data. \return Error - Will be nullptr on success */ - virtual Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + virtual Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) = 0; + //! Sends files to the receiver + /*! + \param event_header - A stucture with the meta information (file name, size is ignored). + \param file name - A full path of the file to send + \return Error - Will be nullptr on success + */ + virtual Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) = 0; //! Set internal log level virtual void SetLogLevel(LogLevel level) = 0; diff --git a/producer/api/src/producer_impl.cpp b/producer/api/src/producer_impl.cpp index 80c09055b25e0296115a65e977994edaf39979d6..2f1a700d6c5ce2786d438088ba3851686cd190b5 100644 --- a/producer/api/src/producer_impl.cpp +++ b/producer/api/src/producer_impl.cpp @@ -45,23 +45,36 @@ Error CheckProducerRequest(size_t file_size, size_t filename_size) { return nullptr; } - -Error ProducerImpl::Send(const EventHeader& event_header, FileData data, +Error ProducerImpl::Send(const EventHeader& event_header, + FileData data, + std::string full_path, RequestCallback callback) { - auto err = CheckProducerRequest(event_header.file_size, event_header.file_name.size()); if (err) { log__->Error("error checking request - " + err->Explain()); return err; } - auto request_header = GenerateNextSendRequest(event_header.file_id, event_header.file_size, event_header.file_name); + return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, + std::move(data), std::move(full_path), callback} + }); + +} + - return request_pool__->AddRequest(std::unique_ptr<Request> {new Request{beamtime_id_, request_header, std::move(data), callback}}); +Error ProducerImpl::SendData(const EventHeader& event_header, FileData data, + RequestCallback callback) { + return Send(event_header, std::move(data), "", callback); } + +Error ProducerImpl::SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) { + return Send(event_header, nullptr, std::move(full_path), callback); +} + + void ProducerImpl::SetLogLevel(LogLevel level) { log__->SetLogLevel(level); } diff --git a/producer/api/src/producer_impl.h b/producer/api/src/producer_impl.h index 55d6e7bbbb72d04badbf3622d00db1024f58fc0c..da104f95c0ceca02e674bb573e2c13dfa603282e 100644 --- a/producer/api/src/producer_impl.h +++ b/producer/api/src/producer_impl.h @@ -28,12 +28,13 @@ class ProducerImpl : public Producer { void SetLogLevel(LogLevel level) override; void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; - Error Send(const EventHeader& event_header, FileData data, RequestCallback callback) override; + Error SendData(const EventHeader& event_header, FileData data, RequestCallback callback) override; + Error SendFile(const EventHeader& event_header, std::string full_path, RequestCallback callback) override; AbstractLogger* log__; std::unique_ptr<RequestPool> request_pool__; Error SetBeamtimeId(std::string beamtime_id) override; - private: + Error Send(const EventHeader& event_header, FileData data, std::string full_path, RequestCallback callback); GenericRequestHeader GenerateNextSendRequest(uint64_t file_id, uint64_t file_size, std::string file_name); std::string beamtime_id_; }; diff --git a/producer/api/src/request.cpp b/producer/api/src/request.cpp index 2b671dfc4fd7e04854bd0f1709b1e4ede22123c9..faae53e91368730a16b18706ff4199287319f7e6 100644 --- a/producer/api/src/request.cpp +++ b/producer/api/src/request.cpp @@ -1 +1,14 @@ #include "request.h" + +namespace asapo { + +Error Request::ReadDataFromFileIfNeeded(const IO* io) { + if (data != nullptr || original_filepath.empty()) { + return nullptr; + } + Error err; + data = io->GetDataFromFile(original_filepath, &header.data_size, &err); + return err; +} + +} \ No newline at end of file diff --git a/producer/api/src/request.h b/producer/api/src/request.h index b522385a32bae06f7551cb8ea1bdd4a68790c3c1..3b5fe21c1a9cc3e9707765a90f3332423f465b8d 100644 --- a/producer/api/src/request.h +++ b/producer/api/src/request.h @@ -4,6 +4,7 @@ #include "common/networking.h" #include "producer/common.h" #include "common/data_structs.h" +#include "io/io.h" namespace asapo { @@ -11,7 +12,9 @@ struct Request { std::string beamtime_id; GenericRequestHeader header; FileData data; + std::string original_filepath; RequestCallback callback; + Error ReadDataFromFileIfNeeded(const IO* io); }; } diff --git a/producer/api/src/request_handler.h b/producer/api/src/request_handler.h index e99bbd284e57c5ff3f5dab18d49778cc79e05d92..64b39ced568e954c08a9d44d42eb0e1b065a71a5 100644 --- a/producer/api/src/request_handler.h +++ b/producer/api/src/request_handler.h @@ -12,7 +12,7 @@ class RequestHandler { public: virtual void PrepareProcessingRequestLocked() = 0; virtual void TearDownProcessingRequestLocked(const Error& error_from_process) = 0; - virtual Error ProcessRequestUnlocked(const Request* request) = 0; + virtual Error ProcessRequestUnlocked(Request* request) = 0; virtual bool ReadyProcessRequest() = 0; virtual ~RequestHandler() = default; }; diff --git a/producer/api/src/request_handler_filesystem.cpp b/producer/api/src/request_handler_filesystem.cpp index e557c2645c9d1426cefc19f7a00132a53e2389e5..cca62163b7ad6ab746301f5e36598af57b5a5f14 100644 --- a/producer/api/src/request_handler_filesystem.cpp +++ b/producer/api/src/request_handler_filesystem.cpp @@ -13,9 +13,14 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde } -Error RequestHandlerFilesystem::ProcessRequestUnlocked(const Request* request) { - std::string fullpath = destination_folder_ + "/" + request->header.message + ".bin"; - auto err = io__->WriteDataToFile(fullpath, (uint8_t*)request->data.get(), request->header.data_size); +Error RequestHandlerFilesystem::ProcessRequestUnlocked(Request* request) { + auto err = request->ReadDataFromFileIfNeeded(io__.get()); + if (err) { + return err; + } + + err = io__->WriteDataToFile(destination_folder_, request->header.message, (uint8_t*)request->data.get(), + request->header.data_size, true); if (request->callback) { request->callback(request->header, std::move(err)); } diff --git a/producer/api/src/request_handler_filesystem.h b/producer/api/src/request_handler_filesystem.h index ba29a407ed2ef2414ffb60990e14e079122e608f..607816cf86a597df363ddac113da65c38d6e9d51 100644 --- a/producer/api/src/request_handler_filesystem.h +++ b/producer/api/src/request_handler_filesystem.h @@ -17,7 +17,7 @@ namespace asapo { class RequestHandlerFilesystem: public RequestHandler { public: explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id); - Error ProcessRequestUnlocked(const Request* request) override; + Error ProcessRequestUnlocked(Request* request) override; bool ReadyProcessRequest() override { return true; }; diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index e82af41d319bb6f132913065650da6cf8ac03348..f5e271da1b0fd1c6ac07a7272e45601dd3da90c3 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -158,10 +158,7 @@ bool RequestHandlerTcp::ServerError(const Error& err) { return err != nullptr && err != ProducerErrorTemplates::kFileIdAlreadyInUse; } -Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { - if (NeedRebalance()) { - CloseConnectionToPeformRebalance(); - } +Error RequestHandlerTcp::SendDataToOneOfTheReceivers(Request* request) { for (auto receiver_uri : receivers_list_) { if (Disconnected()) { auto err = ConnectToReceiver(request->beamtime_id, receiver_uri); @@ -184,6 +181,19 @@ Error RequestHandlerTcp::ProcessRequestUnlocked(const Request* request) { return ProducerErrorTemplates::kCannotSendDataToReceivers.Generate(); } + +Error RequestHandlerTcp::ProcessRequestUnlocked(Request* request) { + auto err = request->ReadDataFromFileIfNeeded(io__.get()); + if (err) { + return err; + } + + if (NeedRebalance()) { + CloseConnectionToPeformRebalance(); + } + return SendDataToOneOfTheReceivers(request); +} + bool RequestHandlerTcp::Connected() { return sd_ != kDisconnectedSocketDescriptor; } diff --git a/producer/api/src/request_handler_tcp.h b/producer/api/src/request_handler_tcp.h index c89b282a775351f50152468a9fe9b475fd1692e8..dec7f4d4ce678f150eb99b427c85b7b6c87b6aa7 100644 --- a/producer/api/src/request_handler_tcp.h +++ b/producer/api/src/request_handler_tcp.h @@ -19,7 +19,7 @@ namespace asapo { class RequestHandlerTcp: public RequestHandler { public: explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter); - Error ProcessRequestUnlocked(const Request* request) override; + Error ProcessRequestUnlocked(Request* request) override; bool ReadyProcessRequest() override; void PrepareProcessingRequestLocked() override; void TearDownProcessingRequestLocked(const Error& error_from_process) override; @@ -31,6 +31,7 @@ class RequestHandlerTcp: public RequestHandler { private: Error Authorize(const std::string& beamtime_id); Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address); + Error SendDataToOneOfTheReceivers(Request* request); Error SendHeaderAndData(const Request*); Error ReceiveResponse(); Error TrySendToReceiver(const Request* request); diff --git a/producer/api/unittests/mocking.h b/producer/api/unittests/mocking.h index 7d231cc2afa4b359d13f495f8b943071af1d2646..0ccb688c936a411f73e73399533c9831074a8f99 100644 --- a/producer/api/unittests/mocking.h +++ b/producer/api/unittests/mocking.h @@ -40,7 +40,7 @@ class MockRequestPull : public RequestPool { class MockRequestHandler : public RequestHandler { public: - Error ProcessRequestUnlocked(const Request* request) override { + Error ProcessRequestUnlocked(Request* request) override { return Error{ProcessRequestUnlocked_t(request)}; } void TearDownProcessingRequestLocked(const Error& error_from_process) override { diff --git a/producer/api/unittests/test_producer.cpp b/producer/api/unittests/test_producer.cpp index ee69cb8eb8476b6a0bce7606e639a260d424925b..fa99a9b790c8634a5bd7f305c8d72fe2777c918d 100644 --- a/producer/api/unittests/test_producer.cpp +++ b/producer/api/unittests/test_producer.cpp @@ -51,7 +51,7 @@ TEST(Producer, SimpleWorkflowWihoutConnection) { &err); asapo::EventHeader event_header{1, 1, ""}; - auto err_send = producer->Send(event_header, nullptr, nullptr); + auto err_send = producer->SendData(event_header, nullptr, nullptr); std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_THAT(producer, Ne(nullptr)); diff --git a/producer/api/unittests/test_producer_impl.cpp b/producer/api/unittests/test_producer_impl.cpp index 16824cf7e5d1e5cc29fd63d53ed2fbb06e561393..0d922f238f32dc0720eccbecc3bb68f1809eb012 100644 --- a/producer/api/unittests/test_producer_impl.cpp +++ b/producer/api/unittests/test_producer_impl.cpp @@ -34,7 +34,7 @@ MATCHER_P5(M_CheckSendDataRequest, op_code, beamtime_id, file_id, file_size, mes "Checks if a valid GenericRequestHeader was Send") { return ((asapo::GenericRequestHeader)(arg->header)).op_code == op_code && ((asapo::GenericRequestHeader)(arg->header)).data_id == file_id - && ((asapo::GenericRequestHeader)(arg->header)).data_size == file_size + && ((asapo::GenericRequestHeader)(arg->header)).data_size == uint(file_size) && arg->beamtime_id == beamtime_id && strcmp(((asapo::GenericRequestHeader)(arg->header)).message, message) == 0; } @@ -66,14 +66,14 @@ TEST_F(ProducerImplTests, SendReturnsError) { EXPECT_CALL(mock_pull, AddRequest_t(_)).WillOnce(Return( asapo::ProducerErrorTemplates::kRequestPoolIsFull.Generate().release())); asapo::EventHeader event_header{1, 1, ""}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kRequestPoolIsFull)); } TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { std::string long_string(asapo::kMaxMessageSize + 100, 'a'); asapo::EventHeader event_header{1, 1, long_string}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileNameTooLong)); } @@ -81,26 +81,47 @@ TEST_F(ProducerImplTests, ErrorIfFileNameTooLong) { TEST_F(ProducerImplTests, ErrorIfSizeTooLarge) { EXPECT_CALL(mock_logger, Error(testing::HasSubstr("error checking"))); asapo::EventHeader event_header{1, asapo::ProducerImpl::kMaxChunkSize + 1, ""}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kFileTooLarge)); } -TEST_F(ProducerImplTests, OKSendingRequest) { +TEST_F(ProducerImplTests, OKSendingSendDataRequest) { uint64_t expected_size = 100; uint64_t expected_id = 10; char expected_name[asapo::kMaxMessageSize] = "test_name"; std::string expected_beamtimeid = "beamtime_id"; producer.SetBeamtimeId(expected_beamtimeid); - Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, nullptr, nullptr}; + Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, expected_size, expected_name}, + nullptr, "", nullptr}; EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, expected_beamtimeid, expected_id, expected_size, expected_name))).WillOnce(Return( nullptr)); asapo::EventHeader event_header{expected_id, expected_size, expected_name}; - auto err = producer.Send(event_header, nullptr, nullptr); + auto err = producer.SendData(event_header, nullptr, nullptr); + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(ProducerImplTests, OKSendingSendFileRequest) { + uint64_t expected_id = 10; + char expected_name[asapo::kMaxMessageSize] = "test_name"; + std::string expected_beamtimeid = "beamtime_id"; + std::string expected_fullpath = "filename"; + + producer.SetBeamtimeId(expected_beamtimeid); + Request request{"", asapo::GenericRequestHeader{asapo::kOpcodeTransferData, expected_id, 0, expected_name}, + nullptr, "", nullptr}; + + EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendDataRequest(asapo::kOpcodeTransferData, + expected_beamtimeid, expected_id, 0, expected_name))).WillOnce(Return( + nullptr)); + + asapo::EventHeader event_header{expected_id, 0, expected_name}; + auto err = producer.SendFile(event_header, expected_fullpath, nullptr); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/producer/api/unittests/test_receiver_discovery_service.cpp b/producer/api/unittests/test_receiver_discovery_service.cpp index c31136152534e2d352c58a46d6642b73352d7ffa..ddf55532fccb3dde4cf93292b9c57d677457502f 100644 --- a/producer/api/unittests/test_receiver_discovery_service.cpp +++ b/producer/api/unittests/test_receiver_discovery_service.cpp @@ -115,7 +115,7 @@ TEST_F(ReceiversStatusTests, GetsReqestedInformation) { )); status.StartCollectingData(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); auto nc = status.MaxConnections(); ASSERT_THAT(nc, Eq(8)); diff --git a/producer/api/unittests/test_request_handler_filesystem.cpp b/producer/api/unittests/test_request_handler_filesystem.cpp index 6082d7dc93fd7c233e949dfb2282e9aeb86e64b1..a87a585063dbaa69326f8b9c2a9cd74d449dd211 100644 --- a/producer/api/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/unittests/test_request_handler_filesystem.cpp @@ -45,19 +45,23 @@ class RequestHandlerFilesystemTests : public testing::Test { std::string expected_file_name = "test_name"; uint64_t expected_thread_id = 2; std::string expected_destination = "destination"; - std::string expected_fullpath = expected_destination + "/" + expected_file_name + ".bin"; + std::string expected_fullpath = expected_destination + "/" + expected_file_name; + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; + asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{"", header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{"", header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{"", header, nullptr, nullptr}; + asapo::Request request_nocallback{"", header, nullptr, "", nullptr}; + asapo::Request request_filesend{"", header, nullptr, expected_origin_fullpath, nullptr}; + testing::NiceMock<asapo::MockLogger> mock_logger; asapo::RequestHandlerFilesystem request_handler{expected_destination, expected_thread_id}; @@ -84,7 +88,7 @@ MATCHER_P2(M_CheckSendDataRequest, file_id, file_size, } TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( Return( asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) @@ -99,10 +103,9 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) { } TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( - Return( - asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) + Return(nullptr) ); @@ -113,9 +116,41 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) { } +TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) { + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + +TEST_F(RequestHandlerFilesystemTests, FileRequestOK) { + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(nullptr), + Return(nullptr) + )); + + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) + .WillOnce( + Return(nullptr) + ); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(nullptr)); +} + + TEST_F(RequestHandlerFilesystemTests, TransferOK) { - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_fullpath, nullptr, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_destination, expected_file_name, nullptr, expected_file_size, true)) .WillOnce( Return( nullptr) diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 383dd0066f23b9747b1ba5e62c6acbba6fee94ca..e3df453b93a75726a43301284a50f80619102216 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -61,13 +61,17 @@ class RequestHandlerTcpTests : public testing::Test { asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; - asapo::Request request{expected_beamtime_id, header, nullptr, [this](asapo::GenericRequestHeader header, asapo::Error err) { + asapo::Request request{expected_beamtime_id, header, nullptr, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { called = true; callback_err = std::move(err); callback_header = header; }}; - asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, nullptr}; + std::string expected_origin_fullpath = std::string("origin/") + expected_file_name + ".bin"; + asapo::Request request_filesend{expected_beamtime_id, header, nullptr, expected_origin_fullpath, nullptr}; + + + asapo::Request request_nocallback{expected_beamtime_id, header, nullptr, "", nullptr}; testing::NiceMock<asapo::MockLogger> mock_logger; uint64_t n_connections{0}; asapo::RequestHandlerTcp request_handler{&mock_discovery_service, expected_thread_id, &n_connections}; @@ -594,6 +598,46 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) { ASSERT_THAT(called, Eq(false)); } +TEST_F(RequestHandlerTcpTests, FileRequestErrorOnReadData) { + + request_handler.PrepareProcessingRequestLocked(); + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(asapo::IOErrorTemplates::kUnknownIOError)); +} + + + +TEST_F(RequestHandlerTcpTests, FileRequestOK) { + ExpectOKConnect(true); + ExpectOKAuthorize(true); + ExpectOKSendHeader(true); + ExpectOKSendData(true); + ExpectOKReceive(); + + request_handler.PrepareProcessingRequestLocked(); + + EXPECT_CALL(mock_io, GetDataFromFile_t(expected_origin_fullpath, testing::Pointee(expected_file_size), _)) + .WillOnce( + DoAll( + testing::SetArgPointee<2>(nullptr), + Return(nullptr) + )); + + auto err = request_handler.ProcessRequestUnlocked(&request_filesend); + ASSERT_THAT(err, Eq(nullptr)); +} + + + + TEST_F(RequestHandlerTcpTests, SendOK) { ExpectOKConnect(true); ExpectOKAuthorize(true); diff --git a/producer/api/unittests/test_request_pool.cpp b/producer/api/unittests/test_request_pool.cpp index 6ec589c3d949d5fddc5a288b92c37cad683f8d04..aaca6d65bbb4a4939491adc7df11564d28e2eff6 100644 --- a/producer/api/unittests/test_request_pool.cpp +++ b/producer/api/unittests/test_request_pool.cpp @@ -62,7 +62,7 @@ class RequestPoolTests : public testing::Test { MockRequestHandlerFactory request_handler_factory{mock_request_handler}; const uint8_t nthreads = 1; asapo::RequestPool pool {nthreads, &request_handler_factory}; - std::unique_ptr<Request> request{new Request{"", GenericRequestHeader{}, nullptr, nullptr}}; + std::unique_ptr<Request> request{new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}}; void SetUp() override { pool.log__ = &mock_logger; } @@ -118,7 +118,7 @@ TEST_F(RequestPoolTests, AddRequestCallsSend) { TEST_F(RequestPoolTests, AddRequestCallsSendTwoRequests) { - Request* request2 = new Request{"", GenericRequestHeader{}, nullptr, nullptr}; + Request* request2 = new Request{"", GenericRequestHeader{}, nullptr, "", nullptr}; ExpectSend(mock_request_handler, 2); diff --git a/producer/event_monitor_producer/CMakeLists.txt b/producer/event_monitor_producer/CMakeLists.txt index 616cd34b0b88a776ca315b0640cf8441c54df18b..dd3ec147dc90ddb8732c1520169c9f3787da3417 100644 --- a/producer/event_monitor_producer/CMakeLists.txt +++ b/producer/event_monitor_producer/CMakeLists.txt @@ -32,7 +32,7 @@ target_link_libraries(${TARGET_NAME} producer-api) ################################ # Executable ################################ -add_executable(${TARGET_NAME}-bin src/eventmon_main.cpp) +add_executable(${TARGET_NAME}-bin src/main_eventmon.cpp) set_target_properties(${TARGET_NAME}-bin PROPERTIES OUTPUT_NAME asapo-eventmon-producer) target_link_libraries(${TARGET_NAME}-bin ${TARGET_NAME}) @@ -60,7 +60,7 @@ ENDIF(WIN32) set(TEST_LIBRARIES "${TARGET_NAME}") -gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/eventmon_main.cpp) +gtest(${TARGET_NAME} "${TEST_SOURCE_FILES}" "${TEST_LIBRARIES}" ${CMAKE_CURRENT_SOURCE_DIR}/src/main_eventmon.cpp) install(TARGETS ${TARGET_NAME}-bin DESTINATION bin) diff --git a/producer/event_monitor_producer/src/inotify_event.cpp b/producer/event_monitor_producer/src/inotify_event.cpp index b4fda4ebd9f730a12f23025bb049271499bb1c3f..bcc89e358b96023136195a50391a1d1cf6405cef 100644 --- a/producer/event_monitor_producer/src/inotify_event.cpp +++ b/producer/event_monitor_producer/src/inotify_event.cpp @@ -33,11 +33,37 @@ bool InotifyEvent::IsNewDirectoryInFolderEvent() const { bool InotifyEvent::IsDeleteDirectoryInFolderEvent() const { return IsDirectoryEvent() && ((GetMask() & IN_DELETE_SELF) || (GetMask() & IN_MOVED_FROM)); } -uint32_t InotifyEvent::NameLength() const { - return inotify_event_->len; -} + bool InotifyEvent::IsDeleteDirectoryInFolderEventByMove() const { return IsDeleteDirectoryInFolderEvent() && (GetMask() & IN_MOVED_FROM); } +void InotifyEvent::Print() const { + printf(" wd =%2d; ", inotify_event_->wd); + if (inotify_event_->cookie > 0) + printf("cookie =%4d; ", inotify_event_->cookie); + + printf("mask = "); + if (inotify_event_->mask & IN_ACCESS) printf("IN_ACCESS "); + if (inotify_event_->mask & IN_ATTRIB) printf("IN_ATTRIB "); + if (inotify_event_->mask & IN_CLOSE_NOWRITE) printf("IN_CLOSE_NOWRITE "); + if (inotify_event_->mask & IN_CLOSE_WRITE) printf("IN_CLOSE_WRITE "); + if (inotify_event_->mask & IN_CREATE) printf("IN_CREATE "); + if (inotify_event_->mask & IN_DELETE) printf("IN_DELETE "); + if (inotify_event_->mask & IN_DELETE_SELF) printf("IN_DELETE_SELF "); + if (inotify_event_->mask & IN_IGNORED) printf("IN_IGNORED "); + if (inotify_event_->mask & IN_ISDIR) printf("IN_ISDIR "); + if (inotify_event_->mask & IN_MODIFY) printf("IN_MODIFY "); + if (inotify_event_->mask & IN_MOVE_SELF) printf("IN_MOVE_SELF "); + if (inotify_event_->mask & IN_MOVED_FROM) printf("IN_MOVED_FROM "); + if (inotify_event_->mask & IN_MOVED_TO) printf("IN_MOVED_TO "); + if (inotify_event_->mask & IN_OPEN) printf("IN_OPEN "); + if (inotify_event_->mask & IN_Q_OVERFLOW) printf("IN_Q_OVERFLOW "); + if (inotify_event_->mask & IN_UNMOUNT) printf("IN_UNMOUNT "); + printf("\n"); + + if (inotify_event_->len > 0) + printf(" name = %s\n", inotify_event_->name); + +} } \ No newline at end of file diff --git a/producer/event_monitor_producer/src/inotify_event.h b/producer/event_monitor_producer/src/inotify_event.h index 472f0c7b7ce3d2e081bacd06ea4dbc2f163a3acf..a01d950c0c6748c5cdd6c70ed502e6f1330a2791 100644 --- a/producer/event_monitor_producer/src/inotify_event.h +++ b/producer/event_monitor_producer/src/inotify_event.h @@ -15,7 +15,6 @@ class InotifyEvent { public: InotifyEvent(const struct inotify_event* inotify_event, const std::map<int, std::string>& watched_folders_paths); uint32_t Length() const ; - uint32_t NameLength() const ; bool IsDirectoryEvent() const ; bool IsNewFileInFolderEvent() const; bool IsNewDirectoryInFolderEvent() const; @@ -24,6 +23,7 @@ class InotifyEvent { int Descriptor() const ; const char* Name() const ; uint32_t GetMask() const; + void Print() const; private: const struct inotify_event* inotify_event_; const std::map<int, std::string> watched_folders_paths_; diff --git a/producer/event_monitor_producer/src/eventmon_main.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp similarity index 93% rename from producer/event_monitor_producer/src/eventmon_main.cpp rename to producer/event_monitor_producer/src/main_eventmon.cpp index f1a1c78552a615cb9447c47e9a4a7175c63105e3..d3b329d6f5ed9accb1bdf7701ff60449c64c8dc8 100644 --- a/producer/event_monitor_producer/src/eventmon_main.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -12,6 +12,7 @@ #include "event_detector_factory.h" #include "eventmon_logger.h" #include "event_monitor_error.h" +#include "preprocessor/definitions.h" using asapo::Producer; using asapo::EventMonConfigFactory; @@ -99,7 +100,8 @@ int main (int argc, char* argv[]) { continue; } event_header.file_id = i++; - producer->Send(event_header, nullptr, ProcessAfterSend); + producer->SendFile(event_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator + + event_header.file_name, ProcessAfterSend); } logger->Info("Producer exit. Processed " + std::to_string(i) + " files"); diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp index 0faa622e9de0297ee51f1f6d7331c3091c470374..7b013693ceec1c3c5b697178d6f8df8e9b280c26 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp @@ -83,6 +83,7 @@ Error SystemFolderWatch::FindEventPath(const InotifyEvent& event, std::string* f } Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend* files) { + event.Print(); if (!event.IsNewFileInFolderEvent()) { return nullptr; } @@ -91,8 +92,9 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend if (err) { return err; } + GetDefaultEventMonLogger()->Debug(((event.GetMask() & IN_CLOSE_WRITE) ? "file closed: " : "file moved: ") + fname); + files->emplace_back(std::move(fname)); - GetDefaultEventMonLogger()->Debug((event.GetMask() & IN_CLOSE_WRITE) ? "file closed: " : "file moved: " + fname); return nullptr; } diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.h b/producer/event_monitor_producer/src/system_folder_watch_linux.h index c7c0c39504ed043818ad694cc8a7f28ee1874e94..0b9c2e7dc7e291fb76d822ab87e3bab6b99f0178 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.h +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.h @@ -48,14 +48,13 @@ class SystemFolderWatch { std::map<int, std::string>::iterator FindEventIterator(const InotifyEvent& event, Error* err); void RemoveFolderWithSubfoldersFromWatch(const std::string& path); std::map<int, std::string>::iterator RemoveFolderFromWatch(const std::map<int, std::string>::iterator& it); - + Error ReadInotifyEvents(int* bytes_read); + Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events); + Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root); private: std::unique_ptr<char[]> buffer_; std::map<int, std::string> watched_folders_paths_; int watch_fd_ = -1; - Error ReadInotifyEvents(int* bytes_read); - Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events); - Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root); std::string root_folder_; }; diff --git a/receiver/src/request_handler_file_write.cpp b/receiver/src/request_handler_file_write.cpp index 552393f76dbb4ad0cec55feb6435f163614a9c5e..93d7acb3ef9272b503dc0ded5709a0535b23a0ab 100644 --- a/receiver/src/request_handler_file_write.cpp +++ b/receiver/src/request_handler_file_write.cpp @@ -18,8 +18,8 @@ Error RequestHandlerFileWrite::ProcessRequest(Request* request) const { auto fname = request->GetFileName(); auto root_folder = GetReceiverConfig()->root_folder + kPathSeparator + request->GetBeamline() + kPathSeparator - + request->GetBeamtimeId() + kPathSeparator; - auto err = io__->WriteDataToFile(root_folder + fname, data, fsize); + + request->GetBeamtimeId(); + auto err = io__->WriteDataToFile(root_folder, fname, data, fsize, true); if (!err) { log__->Debug("saved file of size " + std::to_string(fsize) + " to " + root_folder + fname); } diff --git a/receiver/unittests/test_request_handler_file_write.cpp b/receiver/unittests/test_request_handler_file_write.cpp index 5a999e0681aecb1009088bcf92d60b842dec515e..6c5bdce40bbf1eef0cee9b7c79c5e72f17ddcc88 100644 --- a/receiver/unittests/test_request_handler_file_write.cpp +++ b/receiver/unittests/test_request_handler_file_write.cpp @@ -132,10 +132,9 @@ TEST_F(FileWriteHandlerTests, CallsWriteFile) { MockRequestData(); std::string expected_path = std::string("test_folder") + asapo::kPathSeparator + expected_beamline - + asapo::kPathSeparator + expected_beamtime_id - + asapo::kPathSeparator + expected_file_name; + + asapo::kPathSeparator + expected_beamtime_id; - EXPECT_CALL(mock_io, WriteDataToFile_t(expected_path.c_str(), _, expected_file_size)) + EXPECT_CALL(mock_io, WriteDataToFile_t(expected_path, expected_file_name, _, expected_file_size, true)) .WillOnce( Return(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()) ); @@ -150,7 +149,7 @@ TEST_F(FileWriteHandlerTests, WritesToLog) { MockRequestData(); - EXPECT_CALL(mock_io, WriteDataToFile_t(_, _, _)) + EXPECT_CALL(mock_io, WriteDataToFile_t(_, _, _, _, _)) .WillOnce(Return(nullptr)); EXPECT_CALL(mock_logger, Debug(AllOf(HasSubstr("saved file"), diff --git a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt index cdaa002d1ae26820bbb3e5db641ec566a693e573..09da2f2aba4c46f37334b6b978a8d1706d9fc715 100644 --- a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt +++ b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt @@ -3,4 +3,5 @@ set(TARGET_NAME file-monitor-producer) ################################ # Testing ################################ +configure_file(test.json test.json COPYONLY) add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin>" nomem) diff --git a/tests/automatic/producer/file_monitor_producer/check_linux.sh b/tests/automatic/producer/file_monitor_producer/check_linux.sh index e6ffdd3078f23755c9e5609590a151a3917bfe34..9428e35b291d441fb929077e8cac748cd0ee92e4 100644 --- a/tests/automatic/producer/file_monitor_producer/check_linux.sh +++ b/tests/automatic/producer/file_monitor_producer/check_linux.sh @@ -5,32 +5,32 @@ set -e trap Cleanup EXIT Cleanup() { + set +e echo cleanup - rm -rf test_in test_out #output + rm -rf /tmp/test_in /tmp/test_out output kill -9 $producer_id &>/dev/null } -mkdir -p test_in test_out +mkdir -p /tmp/test_in/test1 /tmp/test_in/test2 /tmp/test_out $1 test.json &> output & producer_id=`echo $!` -sleep 0.1 +sleep 0.5 -echo test1 > test_in/test1.dat -echo test2 > test_in/test2.tmp -#cp test_in/test2.tmp test_in/test4.dat -mkdir test_in/subdir -echo test3 > test_in/subdir/test3.dat +echo test1 > /tmp/test_in/test1/test1.dat +echo test2 > /tmp/test_in/test2/test2.tmp +mkdir -p /tmp/test_in/test2/subdir +echo test3 > /tmp/test_in/test2/subdir/test3.dat sleep 0.1 -cat test_out/test1.dat | grep test1 -cat test_out/subdir/test3.dat | grep test3 +cat /tmp/test_out/test1/test1.dat | grep test1 +cat /tmp/test_out/test2/subdir/test3.dat | grep test3 -test ! -e test_out/test2.tmp +test ! -e /tmp/test_out/test2/test2.tmp kill -s INT $producer_id sleep 0.5 cat output -cat output | grep processed 2 +cat output | grep "Processed 2" diff --git a/tests/automatic/producer/file_monitor_producer/test.json b/tests/automatic/producer/file_monitor_producer/test.json new file mode 100644 index 0000000000000000000000000000000000000000..fe468f96bfbc4f7a7656f4ca961644e50f1e1e49 --- /dev/null +++ b/tests/automatic/producer/file_monitor_producer/test.json @@ -0,0 +1,11 @@ +{ + "AsapoEndpoint":"/tmp/test_out", + "Tag":"test_tag", + "BeamtimeID":"asapo_test", + "Mode":"filesystem", + "NThreads":1, + "LogLevel":"debug", + "RootMonitoredFolder":"/tmp/test_in", + "MonitoredSubFolders":["test1","test2"], + "IgnoreExtentions":["tmp"] +} diff --git a/tests/automatic/system_io/read_file_content/CMakeLists.txt b/tests/automatic/system_io/read_file_content/CMakeLists.txt index c65eb453715599a2237d6097832c420af90bbad0..ca5a9e0de6b73fb53c6717999828fff356c03a2d 100644 --- a/tests/automatic/system_io/read_file_content/CMakeLists.txt +++ b/tests/automatic/system_io/read_file_content/CMakeLists.txt @@ -16,6 +16,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) add_test_setup_cleanup(${TARGET_NAME}) add_integration_test(${TARGET_NAME} readfile "test/1 123") +add_integration_test(${TARGET_NAME} readfile_unkown_size "test/2 unknown_size") add_integration_test(${TARGET_NAME} filenotfound "test_notexist Nosuchfileordirectory:test_notexist") add_integration_test(${TARGET_NAME} filenoaccess "file_noaccess Permissiondenied:file_noaccess") diff --git a/tests/automatic/system_io/read_file_content/read_file_content.cpp b/tests/automatic/system_io/read_file_content/read_file_content.cpp index ec5fdae9a0f04d2808b29be2f1d4c1722d99816d..8e495e7227040ba0f046a561e74bdb1c443182f7 100644 --- a/tests/automatic/system_io/read_file_content/read_file_content.cpp +++ b/tests/automatic/system_io/read_file_content/read_file_content.cpp @@ -14,7 +14,14 @@ int main(int argc, char* argv[]) { asapo::Error err; auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - auto data = io->GetDataFromFile(argv[1], expect.size(), &err); + asapo::FileData data; + uint64_t size = 0; + if (expect == "unknown_size") { + data = io->GetDataFromFile(argv[1], &size, &err); + } else { + size = expect.size(); + data = io->GetDataFromFile(argv[1], &size, &err); + } std::string result; diff --git a/tests/automatic/system_io/read_file_content/setup_linux.sh b/tests/automatic/system_io/read_file_content/setup_linux.sh index 032de0f55bfb851e6cbded0e4da93ec75012ec89..bc1a7a30632b1c5239e638c42e0be77c54150b24 100644 --- a/tests/automatic/system_io/read_file_content/setup_linux.sh +++ b/tests/automatic/system_io/read_file_content/setup_linux.sh @@ -2,6 +2,7 @@ mkdir -p test echo 123 > test/1 +echo unknown_size > test/2 touch file_noaccess chmod -rx file_noaccess diff --git a/tests/automatic/system_io/read_file_content/setup_windows.bat b/tests/automatic/system_io/read_file_content/setup_windows.bat index c28945b0ac1612ea881e8d53f3c493ba8100b5e7..0bceacea09c2184ea86bf4e7918b66cabf341d15 100644 --- a/tests/automatic/system_io/read_file_content/setup_windows.bat +++ b/tests/automatic/system_io/read_file_content/setup_windows.bat @@ -1,6 +1,7 @@ mkdir test echo 123 > test/1 +echo unknown_size > test/2 type nul > file_noaccess icacls file_noaccess /deny users:D diff --git a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt index 8a576706d71ad65f73eeb8257400307c4f1bdf43..dcb279d3679b5d41f38e366a4a43b2ad5b9930cc 100644 --- a/tests/automatic/system_io/write_data_to_file/CMakeLists.txt +++ b/tests/automatic/system_io/write_data_to_file/CMakeLists.txt @@ -15,6 +15,7 @@ set_target_properties(${TARGET_NAME} PROPERTIES LINKER_LANGUAGE CXX) ################################ add_test_setup_cleanup(${TARGET_NAME}) add_integration_test(${TARGET_NAME} writeok "test_file ok dummy" nomem) +add_integration_test(${TARGET_NAME} writewithfolder "folder/a/b/c/d/test_file ok dummy" nomem) add_integration_test(${TARGET_NAME} writetwice "test_file ok dummy" nomem) add_integration_test(${TARGET_NAME} dirnoaccess "test_noaccess/test_file error Permissiondenied:test_noaccess/test_file" nomem) diff --git a/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh b/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh index 1a50a82324730dc9630f63142d6328926b2ca9f4..d1cae7ea77f8ae0dc3b089df57f44ace16e6a71b 100644 --- a/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh +++ b/tests/automatic/system_io/write_data_to_file/cleanup_linux.sh @@ -2,4 +2,5 @@ rm -f test_file rmdir test_noaccess +rm -rf folder diff --git a/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat b/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat index 256da63ada32b6f56a0273cc6ea378675be066cf..40f45c7354e0abde094a89e58bbbd1947c035a19 100644 --- a/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat +++ b/tests/automatic/system_io/write_data_to_file/cleanup_windows.bat @@ -2,3 +2,4 @@ del test_file icacls test_noaccess /grant:r users:W rmdir /S /Q test_noaccess +rmdir /S /Q folder \ No newline at end of file diff --git a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp index add455cd3eb1a6ba79a7b6bf0e8ee5c8214119c3..b629bc158236772c8fdeccdb72a4d6f08439d31d 100644 --- a/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp +++ b/tests/automatic/system_io/write_data_to_file/write_data_to_file.cpp @@ -12,7 +12,7 @@ struct Params { std::string fname; std::string result; std::string message; - int length; + uint64_t length; }; Params GetParams(int argc, char* argv[]) { @@ -34,7 +34,8 @@ void AssertGoodResult(const std::unique_ptr<IO>& io, const Error& err, const Fil exit(EXIT_FAILURE); } Error read_err; - auto read_data = io->GetDataFromFile(params.fname, params.length, &read_err); + uint64_t size = params.length; + auto read_data = io->GetDataFromFile(params.fname, &size, &read_err); asapo::M_AssertContains(std::string(read_data.get(), read_data.get() + params.length), "123"); } @@ -51,13 +52,10 @@ int main(int argc, char* argv[]) { auto params = GetParams(argc, argv); auto io = std::unique_ptr<asapo::IO> {asapo::GenerateDefaultIO()}; - auto array = new uint8_t[params.length]; - array[0] = '1'; - array[1] = '2'; - array[2] = '3'; + auto array = new uint8_t[params.length] {'1', '2', '3'}; FileData data{array}; - auto err = io->WriteDataToFile(params.fname, data, params.length); + auto err = io->WriteDataToFile("", params.fname, data, params.length, true); if (params.result == "ok") { AssertGoodResult(io, err, data, params); diff --git a/worker/api/cpp/src/folder_data_broker.cpp b/worker/api/cpp/src/folder_data_broker.cpp index 35a4fb096f3dc4fc0bffb1805df536b68beb2075..8115ffaa91716464f9c4b22b7abf2bf6c7ce537d 100644 --- a/worker/api/cpp/src/folder_data_broker.cpp +++ b/worker/api/cpp/src/folder_data_broker.cpp @@ -55,7 +55,7 @@ Error FolderDataBroker::GetFileByIndex(uint64_t nfile_to_get, FileInfo* info, Fi } Error error; - *data = io__->GetDataFromFile(info->FullName(base_path_), info->size, &error); + *data = io__->GetDataFromFile(info->FullName(base_path_), &info->size, &error); return error; } diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index aff8b3dcf522e664d95f8883b8b2f6efba0d3ab6..c555473c404bcd65c92c66e40c621a389e5c12a5 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -173,7 +173,7 @@ Error ServerDataBroker::GetImageFromServer(GetImageServerOperation op, FileInfo* } Error error; - *data = io__->GetDataFromFile(info->FullName(""), info->size, &error); + *data = io__->GetDataFromFile(info->FullName(""), &info->size, &error); return error; } diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp index 208c0c23d531fab13fe4730b5797a13789e745ef..8823ba87ed4c10fc81f80be93981d9fe6dc667a9 100644 --- a/worker/api/cpp/unittests/test_folder_broker.cpp +++ b/worker/api/cpp/unittests/test_folder_broker.cpp @@ -82,7 +82,7 @@ class IOEmptyFolder: public FakeIO { class IOCannotOpenFile: public FakeIO { public: - FileData GetDataFromFile(const std::string& fname, uint64_t fsize, Error* err) const noexcept override { + FileData GetDataFromFile(const std::string& fname, uint64_t* fsize, Error* err) const noexcept override { *err = asapo::IOErrorTemplates::kPermissionDenied.Generate(); return {}; }; diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 14538967a72bb6fbedce751433ad0ccc9c8978b7..24e7e312acd9e822669702f7716f130f35923feb 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -300,7 +300,7 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFile) { auto json = to_send.Json(); MockGet(json); - EXPECT_CALL(mock_io, GetDataFromFile_t("name", 100, _)). + EXPECT_CALL(mock_io, GetDataFromFile_t("name", testing::Pointee(100), _)). WillOnce(DoAll(SetArgPointee<2>(new asapo::SimpleError{"s"}), testing::Return(nullptr))); FileData data;