From 088215e4e3c7559110d6d265bf72127250d614de Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 3 Sep 2018 12:53:22 +0200 Subject: [PATCH] delete file after send option --- common/cpp/include/io/io.h | 2 +- common/cpp/include/unittests/MockIO.h | 9 +++ common/cpp/src/system_io/system_io.cpp | 8 +++ common/cpp/src/system_io/system_io.h | 1 + .../src/eventmon_config.cpp | 1 + .../src/eventmon_config.h | 1 + .../src/main_eventmon.cpp | 16 ++++- .../src/system_folder_watch_linux.cpp | 50 ++++++++++++---- .../src/system_folder_watch_linux.h | 7 ++- .../unittests/mock_eventmon_config.cpp | 2 + .../unittests/test_eventmon_config.cpp | 2 + .../test_system_folder_watch_linux.cpp | 59 +++++++++++++++---- .../file_monitor_producer/check_linux.sh | 6 +- .../producer/file_monitor_producer/test.json | 3 +- 14 files changed, 136 insertions(+), 31 deletions(-) diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index 4a0ee3ee5..5e281bedb 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -90,7 +90,7 @@ class IO { virtual size_t Read (FileDescriptor fd, void* buf, size_t length, Error* err) const = 0; virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0; - + virtual Error DeleteFile(const std::string& fname) const = 0; virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data, size_t length, bool create_directories) const = 0; virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const uint8_t* data, diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index b88faa5e3..a4b590bbf 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -179,12 +179,21 @@ class MockIO : public IO { } + MOCK_CONST_METHOD1(DeleteFile_t, ErrorInterface * (const std::string& fname)); + Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data, size_t length, bool create_directories) const override { return Error{WriteDataToFile_t(root_folder, fname, data, length, create_directories)}; } + Error DeleteFile(const std::string& fname) const override { + return Error{DeleteFile_t(fname)}; + } + + + + MOCK_CONST_METHOD5(WriteDataToFile_t, ErrorInterface * (const std::string& root_folder, const std::string& fname, const uint8_t* data, size_t fsize, bool create_directories)); diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index 306451a16..97cef0e86 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -601,4 +601,12 @@ Error SystemIO::CreateDirectoryWithParents(const std::string& root_path, const s return nullptr; } +Error SystemIO::DeleteFile(const std::string& fname) const { + if(remove(fname.c_str()) == 0) { + return nullptr;; + } else { + return GetLastError(); + } +} + } diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index b368c43c1..ffcdbc305 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -110,6 +110,7 @@ class SystemIO final : public IO { size_t length, bool create_directories) const override; SubDirList GetSubDirectories(const std::string& path, Error* err) const override; std::string ReadFileToString(const std::string& fname, Error* err) const override; + Error DeleteFile(const std::string& fname) const override; }; } diff --git a/producer/event_monitor_producer/src/eventmon_config.cpp b/producer/event_monitor_producer/src/eventmon_config.cpp index 0cf4549f6..ca1a07c9e 100644 --- a/producer/event_monitor_producer/src/eventmon_config.cpp +++ b/producer/event_monitor_producer/src/eventmon_config.cpp @@ -21,6 +21,7 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) { (err = parser.GetUInt64("NThreads", &config.nthreads)) || (err = parser.GetString("RootMonitoredFolder", &config.root_monitored_folder)) || (err = parser.GetString("LogLevel", &config.log_level_str)) || + (err = parser.GetBool("RemoveAfterSend", &config.remove_after_send)) || (err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) || (err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions)); diff --git a/producer/event_monitor_producer/src/eventmon_config.h b/producer/event_monitor_producer/src/eventmon_config.h index e6f0cc0ec..494eae438 100644 --- a/producer/event_monitor_producer/src/eventmon_config.h +++ b/producer/event_monitor_producer/src/eventmon_config.h @@ -19,6 +19,7 @@ struct EventMonConfig { std::string root_monitored_folder; std::vector<std::string> monitored_subfolders; std::vector<std::string> ignored_extentions; + bool remove_after_send = false; private: std::string log_level_str; std::string mode_str; diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index d3b329d6f..6948e3174 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -14,11 +14,15 @@ #include "event_monitor_error.h" #include "preprocessor/definitions.h" +#include "io/io_factory.h" + using asapo::Producer; using asapo::EventMonConfigFactory; using asapo::Error; using asapo::GetEventMonConfig; +auto io = asapo::GenerateDefaultIO(); + Error ReadConfigFile(int argc, char* argv[]) { if (argc != 2) { std::cerr << "Usage: " << argv[0] << " <config file>" << std::endl; @@ -47,10 +51,18 @@ std::unique_ptr<Producer> CreateProducer() { void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) { if (err) { - const auto& logger = asapo::GetDefaultEventMonLogger(); + const auto logger = asapo::GetDefaultEventMonLogger(); logger->Error("data was not successfully send: " + err->Explain()); return; } + auto config = GetEventMonConfig(); + std::string fname = config->root_monitored_folder + asapo::kPathSeparator + header.message; + auto error = io->DeleteFile(fname); + if (error) { + const auto logger = asapo::GetDefaultEventMonLogger(); + logger->Error("cannot delete file: " + fname + "" + error->Explain()); + return; + } } volatile sig_atomic_t stop_signal; @@ -72,7 +84,7 @@ int main (int argc, char* argv[]) { std::signal(SIGTERM, SignalHandler); siginterrupt(SIGINT, 1); - const auto& logger = asapo::GetDefaultEventMonLogger(); + const auto logger = asapo::GetDefaultEventMonLogger(); logger->SetLogLevel(GetEventMonConfig()->log_level); diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp index 7b013693c..3762257a3 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.cpp @@ -67,16 +67,17 @@ std::map<int, std::string>::iterator SystemFolderWatch::FindEventIterator(const -Error SystemFolderWatch::FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root) { +Error SystemFolderWatch::FindEventPaths(const InotifyEvent& event, std::string* full_path, std::string* relative_path) { Error err; auto it = FindEventIterator(event, &err); if (err) { return err; } - if (add_root) { - *folder = root_folder_ + "/" + it->second + "/" + event.Name(); - } else { - *folder = it->second + "/" + event.Name(); + if (full_path) { + *full_path = root_folder_ + "/" + it->second + "/" + event.Name(); + } + if (relative_path) { + *relative_path = it->second + "/" + event.Name(); } return nullptr; @@ -88,7 +89,7 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend return nullptr; } std::string fname; - auto err = FindEventPath(event, &fname, false); + auto err = FindEventPaths(event, nullptr, &fname); if (err) { return err; } @@ -98,15 +99,38 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend return nullptr; } -Error SystemFolderWatch::ProcessNewDirectoryInFolderEvent(const InotifyEvent& event) { - std::string newpath; - auto err = FindEventPath(event, &newpath, true); + +Error SystemFolderWatch::AddExistingFilesToEvents(const std::string& full_path, const std::string& rel_path, + FilesToSend* file_events) { + Error err; + auto files = io__->FilesInFolder(full_path, &err); if (err) { return err; } - return AddFolderAndSubfoldersToWatch(newpath); + for (auto& file : files) { + std::string fname = rel_path + kPathSeparator + std::move(file.name); + file_events->emplace_back(fname); + GetDefaultEventMonLogger()->Warning("manually added file to events, double send possible : " + fname); + } + + return nullptr; } + +Error SystemFolderWatch::ProcessNewDirectoryInFolderEvent(const InotifyEvent& event, FilesToSend* file_events) { + std::string new_full_path, new_relative_path; + auto err = FindEventPaths(event, &new_full_path, &new_relative_path); + if (err) { + return err; + } + err = AddFolderAndSubfoldersToWatch(new_full_path); + if (err) { + return err; + } + return AddExistingFilesToEvents(new_full_path, new_relative_path, file_events); +} + + std::map<int, std::string>::iterator SystemFolderWatch::RemoveFolderFromWatch(const std::map<int, std::string>::iterator& it) { inotify__->DeleteWatch(it->first, watch_fd_); @@ -143,9 +167,9 @@ Error SystemFolderWatch::ProcessDeleteDirectoryInFolderEvent(const InotifyEvent& } -Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event) { +Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event, FilesToSend* file_events) { if (event.IsNewDirectoryInFolderEvent()) { - return ProcessNewDirectoryInFolderEvent(event); + return ProcessNewDirectoryInFolderEvent(event, file_events); } if (event.IsDeleteDirectoryInFolderEvent()) { @@ -157,7 +181,7 @@ Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event) { Error SystemFolderWatch::ProcessInotifyEvent(const InotifyEvent& event, FilesToSend* file_events) { if (event.IsDirectoryEvent()) { - return ProcessDirectoryEvent(event); + return ProcessDirectoryEvent(event, file_events); } else { return ProcessFileEvent(event, file_events); } diff --git a/producer/event_monitor_producer/src/system_folder_watch_linux.h b/producer/event_monitor_producer/src/system_folder_watch_linux.h index 0b9c2e7dc..ffa2cb219 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_linux.h +++ b/producer/event_monitor_producer/src/system_folder_watch_linux.h @@ -42,15 +42,16 @@ class SystemFolderWatch { Error AddFolderToWatch(std::string folder); Error ProcessInotifyEvent(const InotifyEvent& event, FilesToSend* file_events); Error ProcessFileEvent(const InotifyEvent& event, FilesToSend* files); - Error ProcessDirectoryEvent(const InotifyEvent& event); - Error ProcessNewDirectoryInFolderEvent(const InotifyEvent& event); + Error ProcessDirectoryEvent(const InotifyEvent& event, FilesToSend* file_events); + Error ProcessNewDirectoryInFolderEvent(const InotifyEvent& event, FilesToSend* file_events); + Error AddExistingFilesToEvents(const std::string& full_path, const std::string& rel_path, FilesToSend* file_events); Error ProcessDeleteDirectoryInFolderEvent(const InotifyEvent& event); std::map<int, std::string>::iterator FindEventIterator(const InotifyEvent& event, Error* err); void RemoveFolderWithSubfoldersFromWatch(const std::string& path); std::map<int, std::string>::iterator RemoveFolderFromWatch(const std::map<int, std::string>::iterator& it); Error ReadInotifyEvents(int* bytes_read); Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events); - Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root); + Error FindEventPaths(const InotifyEvent& event, std::string* full_path, std::string* relative_path); private: std::unique_ptr<char[]> buffer_; std::map<int, std::string> watched_folders_paths_; diff --git a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp index 2ff7a98bc..6cabb6da0 100644 --- a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp +++ b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp @@ -48,6 +48,8 @@ Error SetFolderMonConfig (const EventMonConfig& config) { config_string += "," + std::string("\"Mode\":") + "\"" + mode + "\""; config_string += "," + std::string("\"NThreads\":") + std::to_string(config.nthreads); config_string += "," + std::string("\"LogLevel\":") + "\"" + log_level + "\""; + config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false"); + std::string mon_folders; for (auto folder : config.monitored_subfolders) { mon_folders += "\"" + folder + "\"" + ","; diff --git a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp index fdaaa055b..71a392883 100644 --- a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp +++ b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp @@ -59,6 +59,7 @@ TEST_F(ConfigTests, ReadSettingsOK) { test_config.root_monitored_folder = "tmp"; test_config.monitored_subfolders = {"test1", "test2"}; test_config.ignored_extentions = {"tmp", "test"}; + test_config.remove_after_send = true; auto err = asapo::SetFolderMonConfig(test_config); auto config = asapo::GetEventMonConfig(); @@ -73,6 +74,7 @@ TEST_F(ConfigTests, ReadSettingsOK) { ASSERT_THAT(config->monitored_subfolders, ElementsAre("test1", "test2")); ASSERT_THAT(config->root_monitored_folder, Eq("tmp")); ASSERT_THAT(config->ignored_extentions, ElementsAre("tmp", "test")); + ASSERT_THAT(config->remove_after_send, Eq(true)); } diff --git a/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp b/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp index a5d22d6c3..72d24fc89 100644 --- a/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp +++ b/producer/event_monitor_producer/unittests/test_system_folder_watch_linux.cpp @@ -27,6 +27,8 @@ using ::asapo::Error; using ::asapo::ErrorInterface; using asapo::FilesToSend; using asapo::SystemFolderWatch; +using asapo::FileInfos; +using asapo::FileInfo; namespace { @@ -37,6 +39,16 @@ TEST(SystemFolderWatch, Constructor) { ASSERT_THAT(dynamic_cast<asapo::Inotify*>(watch.inotify__.get()), Ne(nullptr)); } +FileInfos CreateTestFileInfos() { + FileInfos file_infos; + FileInfo fi; + fi.size = 100; + fi.name = "file1"; + file_infos.push_back(fi); + fi.name = "subfolder/file2"; + file_infos.push_back(fi); + return file_infos; +} class SystemFolderWatchTests : public testing::Test { @@ -52,7 +64,7 @@ class SystemFolderWatchTests : public testing::Test { std::vector<std::string> expected_watches{"/tmp/test1", "/tmp/test2", "/tmp/test1/sub11", "/tmp/test2/sub21", "/tmp/test2/sub22", "/tmp/test2/sub21/sub211"}; std::string expected_filename1{"file1"}; std::string expected_filename2{"file2"}; - + FileInfos expected_fileinfos = CreateTestFileInfos(); int expected_wd = 10; std::vector<int>expected_fds = {1, 2, 3, 4, 5, 6}; void MockStartMonitoring(); @@ -69,7 +81,7 @@ class SystemFolderWatchTests : public testing::Test { } ssize_t AddEventToBuffer(std::string filename, uint32_t mask, int fd); void ExpectRead(); - void ExpectCreateFolder(std::string folder); + void ExpectCreateFolder(std::string folder, bool with_files); }; void SystemFolderWatchTests::MockStartMonitoring() { @@ -108,9 +120,9 @@ ACTION_P(A_CopyBuf, buffer) { ssize_t SystemFolderWatchTests::AddEventToBuffer(std::string filename, uint32_t mask, int fd) { - ssize_t size = sizeof(struct inotify_event) + filename.size()+1; + ssize_t size = sizeof(struct inotify_event) + filename.size() + 1; char* buf = (char*) malloc(size); - struct inotify_event* event=(struct inotify_event*) buf; + struct inotify_event* event = (struct inotify_event*) buf; event->mask = mask; event->wd = fd; strcpy(event->name, filename.c_str()); @@ -257,7 +269,7 @@ TEST_F(SystemFolderWatchTests, ProcessDeleteFolder) { ASSERT_THAT(err, Eq(nullptr)); } -void SystemFolderWatchTests::ExpectCreateFolder(std::string folder) { +void SystemFolderWatchTests::ExpectCreateFolder(std::string folder, bool with_files) { std::string newfolder = expected_root_folder + "/" + expected_folders[0] + "/" + folder; EXPECT_CALL(mock_io, GetSubDirectories_t(newfolder, _)) .WillOnce( @@ -271,34 +283,61 @@ void SystemFolderWatchTests::ExpectCreateFolder(std::string folder) { .WillOnce( Return(1) ); + + if (with_files) { + ON_CALL(mock_io, FilesInFolder_t(newfolder, _)). + WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr), + testing::Return(expected_fileinfos))); + } else { + ON_CALL(mock_io, FilesInFolder_t(newfolder, _)). + WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr), + testing::Return(FileInfos{}))); + } } -TEST_F(SystemFolderWatchTests, ProcessCreateFolder) { +TEST_F(SystemFolderWatchTests, ProcessCreateFolderWithFilesInIt) { MockStartMonitoring(); AddEventToBuffer("folder", IN_ISDIR | IN_CREATE, expected_fds[0]); ExpectRead(); - ExpectCreateFolder("folder"); + ExpectCreateFolder("folder", true); + + Error err; + auto events = watch.GetFileList(&err); + + ASSERT_THAT(events.size(), Eq(2)); + ASSERT_THAT(events[0].c_str(), StrEq("test1/folder/file1")); + ASSERT_THAT(events[1].c_str(), StrEq("test1/folder/subfolder/file2")); + + + ASSERT_THAT(err, Eq(nullptr)); +} + +TEST_F(SystemFolderWatchTests, ProcessCreateFolderWithoutFilesInIt) { + MockStartMonitoring(); + AddEventToBuffer("folder", IN_ISDIR | IN_CREATE, expected_fds[0]); + ExpectRead(); + ExpectCreateFolder("folder", false); Error err; auto events = watch.GetFileList(&err); ASSERT_THAT(events.size(), Eq(0)); + ASSERT_THAT(err, Eq(nullptr)); } + TEST_F(SystemFolderWatchTests, ProcessMoveFolder) { MockStartMonitoring(); AddEventToBuffer("sub21", IN_ISDIR | IN_MOVED_TO, expected_fds[0]); AddEventToBuffer("sub21", IN_ISDIR | IN_MOVED_FROM, expected_fds[1]); ExpectRead(); - ExpectCreateFolder("sub21"); + ExpectCreateFolder("sub21", false); EXPECT_CALL(mock_inotify, DeleteWatch(expected_fds[3], expected_wd)); EXPECT_CALL(mock_inotify, DeleteWatch(expected_fds[5], expected_wd)); Error err; auto events = watch.GetFileList(&err); - - ASSERT_THAT(events.size(), Eq(0)); ASSERT_THAT(err, Eq(nullptr)); } diff --git a/tests/automatic/producer/file_monitor_producer/check_linux.sh b/tests/automatic/producer/file_monitor_producer/check_linux.sh index 9428e35b2..e70b22513 100644 --- a/tests/automatic/producer/file_monitor_producer/check_linux.sh +++ b/tests/automatic/producer/file_monitor_producer/check_linux.sh @@ -7,7 +7,7 @@ trap Cleanup EXIT Cleanup() { set +e echo cleanup - rm -rf /tmp/test_in /tmp/test_out output + rm -rf /tmp/test_in /tmp/test_out #output kill -9 $producer_id &>/dev/null } @@ -29,6 +29,10 @@ cat /tmp/test_out/test2/subdir/test3.dat | grep test3 test ! -e /tmp/test_out/test2/test2.tmp +test ! -e /tmp/test_in/test1/test1.dat +test ! -e /tmp/test_in/test2/subdir/test3.dat + + kill -s INT $producer_id sleep 0.5 cat output diff --git a/tests/automatic/producer/file_monitor_producer/test.json b/tests/automatic/producer/file_monitor_producer/test.json index fe468f96b..ec29bce26 100644 --- a/tests/automatic/producer/file_monitor_producer/test.json +++ b/tests/automatic/producer/file_monitor_producer/test.json @@ -7,5 +7,6 @@ "LogLevel":"debug", "RootMonitoredFolder":"/tmp/test_in", "MonitoredSubFolders":["test1","test2"], - "IgnoreExtentions":["tmp"] + "IgnoreExtentions":["tmp"], + "RemoveAfterSend":true } -- GitLab