Skip to content
Snippets Groups Projects
Commit 088215e4 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

delete file after send option

parent 1f8cf553
No related branches found
No related tags found
No related merge requests found
Showing
with 136 additions and 31 deletions
......@@ -90,7 +90,7 @@ class IO {
virtual size_t Read (FileDescriptor fd, void* buf, size_t length, Error* err) const = 0;
virtual size_t Write (FileDescriptor fd, const void* buf, size_t length, Error* err) const = 0;
virtual Error DeleteFile(const std::string& fname) const = 0;
virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const FileData& data,
size_t length, bool create_directories) const = 0;
virtual Error WriteDataToFile (const std::string& root_folder, const std::string& fname, const uint8_t* data,
......
......@@ -179,12 +179,21 @@ class MockIO : public IO {
}
MOCK_CONST_METHOD1(DeleteFile_t, ErrorInterface * (const std::string& fname));
Error WriteDataToFile(const std::string& root_folder, const std::string& fname, const uint8_t* data,
size_t length, bool create_directories) const override {
return Error{WriteDataToFile_t(root_folder, fname, data, length, create_directories)};
}
Error DeleteFile(const std::string& fname) const override {
return Error{DeleteFile_t(fname)};
}
MOCK_CONST_METHOD5(WriteDataToFile_t, ErrorInterface * (const std::string& root_folder, const std::string& fname,
const uint8_t* data, size_t fsize, bool create_directories));
......
......@@ -601,4 +601,12 @@ Error SystemIO::CreateDirectoryWithParents(const std::string& root_path, const s
return nullptr;
}
Error SystemIO::DeleteFile(const std::string& fname) const {
if(remove(fname.c_str()) == 0) {
return nullptr;;
} else {
return GetLastError();
}
}
}
......@@ -110,6 +110,7 @@ class SystemIO final : public IO {
size_t length, bool create_directories) const override;
SubDirList GetSubDirectories(const std::string& path, Error* err) const override;
std::string ReadFileToString(const std::string& fname, Error* err) const override;
Error DeleteFile(const std::string& fname) const override;
};
}
......
......@@ -21,6 +21,7 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
(err = parser.GetUInt64("NThreads", &config.nthreads)) ||
(err = parser.GetString("RootMonitoredFolder", &config.root_monitored_folder)) ||
(err = parser.GetString("LogLevel", &config.log_level_str)) ||
(err = parser.GetBool("RemoveAfterSend", &config.remove_after_send)) ||
(err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) ||
(err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions));
......
......@@ -19,6 +19,7 @@ struct EventMonConfig {
std::string root_monitored_folder;
std::vector<std::string> monitored_subfolders;
std::vector<std::string> ignored_extentions;
bool remove_after_send = false;
private:
std::string log_level_str;
std::string mode_str;
......
......@@ -14,11 +14,15 @@
#include "event_monitor_error.h"
#include "preprocessor/definitions.h"
#include "io/io_factory.h"
using asapo::Producer;
using asapo::EventMonConfigFactory;
using asapo::Error;
using asapo::GetEventMonConfig;
auto io = asapo::GenerateDefaultIO();
Error ReadConfigFile(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <config file>" << std::endl;
......@@ -47,10 +51,18 @@ std::unique_ptr<Producer> CreateProducer() {
void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) {
if (err) {
const auto& logger = asapo::GetDefaultEventMonLogger();
const auto logger = asapo::GetDefaultEventMonLogger();
logger->Error("data was not successfully send: " + err->Explain());
return;
}
auto config = GetEventMonConfig();
std::string fname = config->root_monitored_folder + asapo::kPathSeparator + header.message;
auto error = io->DeleteFile(fname);
if (error) {
const auto logger = asapo::GetDefaultEventMonLogger();
logger->Error("cannot delete file: " + fname + "" + error->Explain());
return;
}
}
volatile sig_atomic_t stop_signal;
......@@ -72,7 +84,7 @@ int main (int argc, char* argv[]) {
std::signal(SIGTERM, SignalHandler);
siginterrupt(SIGINT, 1);
const auto& logger = asapo::GetDefaultEventMonLogger();
const auto logger = asapo::GetDefaultEventMonLogger();
logger->SetLogLevel(GetEventMonConfig()->log_level);
......
......@@ -67,16 +67,17 @@ std::map<int, std::string>::iterator SystemFolderWatch::FindEventIterator(const
Error SystemFolderWatch::FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root) {
Error SystemFolderWatch::FindEventPaths(const InotifyEvent& event, std::string* full_path, std::string* relative_path) {
Error err;
auto it = FindEventIterator(event, &err);
if (err) {
return err;
}
if (add_root) {
*folder = root_folder_ + "/" + it->second + "/" + event.Name();
} else {
*folder = it->second + "/" + event.Name();
if (full_path) {
*full_path = root_folder_ + "/" + it->second + "/" + event.Name();
}
if (relative_path) {
*relative_path = it->second + "/" + event.Name();
}
return nullptr;
......@@ -88,7 +89,7 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend
return nullptr;
}
std::string fname;
auto err = FindEventPath(event, &fname, false);
auto err = FindEventPaths(event, nullptr, &fname);
if (err) {
return err;
}
......@@ -98,15 +99,38 @@ Error SystemFolderWatch::ProcessFileEvent(const InotifyEvent& event, FilesToSend
return nullptr;
}
Error SystemFolderWatch::ProcessNewDirectoryInFolderEvent(const InotifyEvent& event) {
std::string newpath;
auto err = FindEventPath(event, &newpath, true);
Error SystemFolderWatch::AddExistingFilesToEvents(const std::string& full_path, const std::string& rel_path,
FilesToSend* file_events) {
Error err;
auto files = io__->FilesInFolder(full_path, &err);
if (err) {
return err;
}
return AddFolderAndSubfoldersToWatch(newpath);
for (auto& file : files) {
std::string fname = rel_path + kPathSeparator + std::move(file.name);
file_events->emplace_back(fname);
GetDefaultEventMonLogger()->Warning("manually added file to events, double send possible : " + fname);
}
return nullptr;
}
Error SystemFolderWatch::ProcessNewDirectoryInFolderEvent(const InotifyEvent& event, FilesToSend* file_events) {
std::string new_full_path, new_relative_path;
auto err = FindEventPaths(event, &new_full_path, &new_relative_path);
if (err) {
return err;
}
err = AddFolderAndSubfoldersToWatch(new_full_path);
if (err) {
return err;
}
return AddExistingFilesToEvents(new_full_path, new_relative_path, file_events);
}
std::map<int, std::string>::iterator SystemFolderWatch::RemoveFolderFromWatch(const
std::map<int, std::string>::iterator& it) {
inotify__->DeleteWatch(it->first, watch_fd_);
......@@ -143,9 +167,9 @@ Error SystemFolderWatch::ProcessDeleteDirectoryInFolderEvent(const InotifyEvent&
}
Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event) {
Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event, FilesToSend* file_events) {
if (event.IsNewDirectoryInFolderEvent()) {
return ProcessNewDirectoryInFolderEvent(event);
return ProcessNewDirectoryInFolderEvent(event, file_events);
}
if (event.IsDeleteDirectoryInFolderEvent()) {
......@@ -157,7 +181,7 @@ Error SystemFolderWatch::ProcessDirectoryEvent(const InotifyEvent& event) {
Error SystemFolderWatch::ProcessInotifyEvent(const InotifyEvent& event, FilesToSend* file_events) {
if (event.IsDirectoryEvent()) {
return ProcessDirectoryEvent(event);
return ProcessDirectoryEvent(event, file_events);
} else {
return ProcessFileEvent(event, file_events);
}
......
......@@ -42,15 +42,16 @@ class SystemFolderWatch {
Error AddFolderToWatch(std::string folder);
Error ProcessInotifyEvent(const InotifyEvent& event, FilesToSend* file_events);
Error ProcessFileEvent(const InotifyEvent& event, FilesToSend* files);
Error ProcessDirectoryEvent(const InotifyEvent& event);
Error ProcessNewDirectoryInFolderEvent(const InotifyEvent& event);
Error ProcessDirectoryEvent(const InotifyEvent& event, FilesToSend* file_events);
Error ProcessNewDirectoryInFolderEvent(const InotifyEvent& event, FilesToSend* file_events);
Error AddExistingFilesToEvents(const std::string& full_path, const std::string& rel_path, FilesToSend* file_events);
Error ProcessDeleteDirectoryInFolderEvent(const InotifyEvent& event);
std::map<int, std::string>::iterator FindEventIterator(const InotifyEvent& event, Error* err);
void RemoveFolderWithSubfoldersFromWatch(const std::string& path);
std::map<int, std::string>::iterator RemoveFolderFromWatch(const std::map<int, std::string>::iterator& it);
Error ReadInotifyEvents(int* bytes_read);
Error ProcessInotifyEvents(int bytes_in_buffer, FilesToSend* events);
Error FindEventPath(const InotifyEvent& event, std::string* folder, bool add_root);
Error FindEventPaths(const InotifyEvent& event, std::string* full_path, std::string* relative_path);
private:
std::unique_ptr<char[]> buffer_;
std::map<int, std::string> watched_folders_paths_;
......
......@@ -48,6 +48,8 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
config_string += "," + std::string("\"Mode\":") + "\"" + mode + "\"";
config_string += "," + std::string("\"NThreads\":") + std::to_string(config.nthreads);
config_string += "," + std::string("\"LogLevel\":") + "\"" + log_level + "\"";
config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false");
std::string mon_folders;
for (auto folder : config.monitored_subfolders) {
mon_folders += "\"" + folder + "\"" + ",";
......
......@@ -59,6 +59,7 @@ TEST_F(ConfigTests, ReadSettingsOK) {
test_config.root_monitored_folder = "tmp";
test_config.monitored_subfolders = {"test1", "test2"};
test_config.ignored_extentions = {"tmp", "test"};
test_config.remove_after_send = true;
auto err = asapo::SetFolderMonConfig(test_config);
auto config = asapo::GetEventMonConfig();
......@@ -73,6 +74,7 @@ TEST_F(ConfigTests, ReadSettingsOK) {
ASSERT_THAT(config->monitored_subfolders, ElementsAre("test1", "test2"));
ASSERT_THAT(config->root_monitored_folder, Eq("tmp"));
ASSERT_THAT(config->ignored_extentions, ElementsAre("tmp", "test"));
ASSERT_THAT(config->remove_after_send, Eq(true));
}
......
......@@ -27,6 +27,8 @@ using ::asapo::Error;
using ::asapo::ErrorInterface;
using asapo::FilesToSend;
using asapo::SystemFolderWatch;
using asapo::FileInfos;
using asapo::FileInfo;
namespace {
......@@ -37,6 +39,16 @@ TEST(SystemFolderWatch, Constructor) {
ASSERT_THAT(dynamic_cast<asapo::Inotify*>(watch.inotify__.get()), Ne(nullptr));
}
FileInfos CreateTestFileInfos() {
FileInfos file_infos;
FileInfo fi;
fi.size = 100;
fi.name = "file1";
file_infos.push_back(fi);
fi.name = "subfolder/file2";
file_infos.push_back(fi);
return file_infos;
}
class SystemFolderWatchTests : public testing::Test {
......@@ -52,7 +64,7 @@ class SystemFolderWatchTests : public testing::Test {
std::vector<std::string> expected_watches{"/tmp/test1", "/tmp/test2", "/tmp/test1/sub11", "/tmp/test2/sub21", "/tmp/test2/sub22", "/tmp/test2/sub21/sub211"};
std::string expected_filename1{"file1"};
std::string expected_filename2{"file2"};
FileInfos expected_fileinfos = CreateTestFileInfos();
int expected_wd = 10;
std::vector<int>expected_fds = {1, 2, 3, 4, 5, 6};
void MockStartMonitoring();
......@@ -69,7 +81,7 @@ class SystemFolderWatchTests : public testing::Test {
}
ssize_t AddEventToBuffer(std::string filename, uint32_t mask, int fd);
void ExpectRead();
void ExpectCreateFolder(std::string folder);
void ExpectCreateFolder(std::string folder, bool with_files);
};
void SystemFolderWatchTests::MockStartMonitoring() {
......@@ -108,9 +120,9 @@ ACTION_P(A_CopyBuf, buffer) {
ssize_t SystemFolderWatchTests::AddEventToBuffer(std::string filename, uint32_t mask, int fd) {
ssize_t size = sizeof(struct inotify_event) + filename.size()+1;
ssize_t size = sizeof(struct inotify_event) + filename.size() + 1;
char* buf = (char*) malloc(size);
struct inotify_event* event=(struct inotify_event*) buf;
struct inotify_event* event = (struct inotify_event*) buf;
event->mask = mask;
event->wd = fd;
strcpy(event->name, filename.c_str());
......@@ -257,7 +269,7 @@ TEST_F(SystemFolderWatchTests, ProcessDeleteFolder) {
ASSERT_THAT(err, Eq(nullptr));
}
void SystemFolderWatchTests::ExpectCreateFolder(std::string folder) {
void SystemFolderWatchTests::ExpectCreateFolder(std::string folder, bool with_files) {
std::string newfolder = expected_root_folder + "/" + expected_folders[0] + "/" + folder;
EXPECT_CALL(mock_io, GetSubDirectories_t(newfolder, _))
.WillOnce(
......@@ -271,34 +283,61 @@ void SystemFolderWatchTests::ExpectCreateFolder(std::string folder) {
.WillOnce(
Return(1)
);
if (with_files) {
ON_CALL(mock_io, FilesInFolder_t(newfolder, _)).
WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr),
testing::Return(expected_fileinfos)));
} else {
ON_CALL(mock_io, FilesInFolder_t(newfolder, _)).
WillByDefault(DoAll(testing::SetArgPointee<1>(nullptr),
testing::Return(FileInfos{})));
}
}
TEST_F(SystemFolderWatchTests, ProcessCreateFolder) {
TEST_F(SystemFolderWatchTests, ProcessCreateFolderWithFilesInIt) {
MockStartMonitoring();
AddEventToBuffer("folder", IN_ISDIR | IN_CREATE, expected_fds[0]);
ExpectRead();
ExpectCreateFolder("folder");
ExpectCreateFolder("folder", true);
Error err;
auto events = watch.GetFileList(&err);
ASSERT_THAT(events.size(), Eq(2));
ASSERT_THAT(events[0].c_str(), StrEq("test1/folder/file1"));
ASSERT_THAT(events[1].c_str(), StrEq("test1/folder/subfolder/file2"));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(SystemFolderWatchTests, ProcessCreateFolderWithoutFilesInIt) {
MockStartMonitoring();
AddEventToBuffer("folder", IN_ISDIR | IN_CREATE, expected_fds[0]);
ExpectRead();
ExpectCreateFolder("folder", false);
Error err;
auto events = watch.GetFileList(&err);
ASSERT_THAT(events.size(), Eq(0));
ASSERT_THAT(err, Eq(nullptr));
}
TEST_F(SystemFolderWatchTests, ProcessMoveFolder) {
MockStartMonitoring();
AddEventToBuffer("sub21", IN_ISDIR | IN_MOVED_TO, expected_fds[0]);
AddEventToBuffer("sub21", IN_ISDIR | IN_MOVED_FROM, expected_fds[1]);
ExpectRead();
ExpectCreateFolder("sub21");
ExpectCreateFolder("sub21", false);
EXPECT_CALL(mock_inotify, DeleteWatch(expected_fds[3], expected_wd));
EXPECT_CALL(mock_inotify, DeleteWatch(expected_fds[5], expected_wd));
Error err;
auto events = watch.GetFileList(&err);
ASSERT_THAT(events.size(), Eq(0));
ASSERT_THAT(err, Eq(nullptr));
}
......
......@@ -7,7 +7,7 @@ trap Cleanup EXIT
Cleanup() {
set +e
echo cleanup
rm -rf /tmp/test_in /tmp/test_out output
rm -rf /tmp/test_in /tmp/test_out #output
kill -9 $producer_id &>/dev/null
}
......@@ -29,6 +29,10 @@ cat /tmp/test_out/test2/subdir/test3.dat | grep test3
test ! -e /tmp/test_out/test2/test2.tmp
test ! -e /tmp/test_in/test1/test1.dat
test ! -e /tmp/test_in/test2/subdir/test3.dat
kill -s INT $producer_id
sleep 0.5
cat output
......
......@@ -7,5 +7,6 @@
"LogLevel":"debug",
"RootMonitoredFolder":"/tmp/test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"]
"IgnoreExtentions":["tmp"],
"RemoveAfterSend":true
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment