diff --git a/producer/event_monitor_producer/CMakeLists.txt b/producer/event_monitor_producer/CMakeLists.txt index c817b0eafe480494e2161e050a3915b79ce45a82..993cff98d1f590232e53a9734dcaee8ed94cf380 100644 --- a/producer/event_monitor_producer/CMakeLists.txt +++ b/producer/event_monitor_producer/CMakeLists.txt @@ -10,6 +10,8 @@ IF(WIN32) set(SOURCE_FILES ${SOURCE_FILES} src/system_folder_watch_windows.cpp src/single_folder_watch_windows.cpp src/watch_io.cpp + src/shared_event_list.cpp + src/win_event.cpp ) ELSEIF(UNIX) set(SOURCE_FILES ${SOURCE_FILES} src/system_folder_watch_linux.cpp src/inotify_event.cpp src/inotify_linux.cpp) diff --git a/producer/event_monitor_producer/src/inotify_event.h b/producer/event_monitor_producer/src/inotify_event.h index a01d950c0c6748c5cdd6c70ed502e6f1330a2791..3ac932eafb727f680d566f4140edb2d274d1f14d 100644 --- a/producer/event_monitor_producer/src/inotify_event.h +++ b/producer/event_monitor_producer/src/inotify_event.h @@ -26,7 +26,7 @@ class InotifyEvent { void Print() const; private: const struct inotify_event* inotify_event_; - const std::map<int, std::string> watched_folders_paths_; + const std::map<int, std::string>& watched_folders_paths_; }; } diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0de09b9b1c45fb89a960918cfd344c0c6a5be004 --- /dev/null +++ b/producer/event_monitor_producer/src/shared_event_list.cpp @@ -0,0 +1,15 @@ +#include "shared_event_list.h" + +namespace asapo { + +FilesToSend SharedEventList::GetAndClearEvents() { + std::lock_guard<std::mutex> lock(mutex_); + FilesToSend events = std::move(events_); + events_.clear(); + return events; +} +void SharedEventList::AddEvent(std::string event) { + std::lock_guard<std::mutex> lock(mutex_); + events_.emplace_back(std::move(event)); +} +} diff --git a/producer/event_monitor_producer/src/shared_event_list.h b/producer/event_monitor_producer/src/shared_event_list.h new file mode 100644 index 0000000000000000000000000000000000000000..1fb3b2edf8f9966ce4709b2300f62fe4db104cf5 --- /dev/null +++ b/producer/event_monitor_producer/src/shared_event_list.h @@ -0,0 +1,23 @@ +#ifndef ASAPO_SHARED_EVENT_LIST_H +#define ASAPO_SHARED_EVENT_LIST_H + +#include <string> +#include <vector> +#include <mutex> + +#include "common.h" + +namespace asapo { + +class SharedEventList { + public: + FilesToSend GetAndClearEvents(); + void AddEvent(std::string event); + private: + std::mutex mutex_; + FilesToSend events_; +}; + +} + +#endif //ASAPO_SHARED_EVENT_LIST_H diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp index e0c3300f408cedbb1910e0f91333e541bcf1645c..a75b7ea86595d39a33d4cb981258c858756acf29 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp @@ -1,12 +1,20 @@ #include "single_folder_watch_windows.h" #include "eventmon_logger.h" + +#include <iostream> +#include <string> + namespace asapo { -SingleFolderWatch::SingleFolderWatch(std::string root_folder, std::string folder) : watch_io__{new WatchIO()}, +SingleFolderWatch::SingleFolderWatch(std::string root_folder, std::string folder,SharedEventList* event_list) : + watch_io__{new WatchIO()}, log__{GetDefaultEventMonLogger()}, root_folder_{std::move(root_folder)}, - folder_{std::move(folder)} + folder_{std::move(folder)}, + buffer_{new char[kBufLen]}, + event_list_{event_list} + { } @@ -22,9 +30,32 @@ Error SingleFolderWatch::Init() { } void SingleFolderWatch::Watch() { - if (!Init()) { + if (Init()!=nullptr) { return; } + + DWORD bytes_read = 0; + auto err =watch_io__->ReadDirectoryChanges(handle_,buffer_.get(),kBufLen,&bytes_read); + if (err == nullptr) { + ProcessEvents(bytes_read); + } +} +Error SingleFolderWatch::ProcessEvent(const WinEvent &event) { + event.Print(); + event_list_->AddEvent(event.FileName()); + return nullptr; +} +void SingleFolderWatch::ProcessEvents(DWORD bytes_to_read) { + for (char* p = buffer_.get(); p < buffer_.get() + bytes_to_read; ) { + WinEvent event{(FILE_NOTIFY_INFORMATION*) p}; + ProcessEvent(event); + p += event.Offset(); + if (event.Offset() == 0) { + break; + } + } } } + + diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.h b/producer/event_monitor_producer/src/single_folder_watch_windows.h index f2fef6262fc8868ecdaa4fda52989db29bb84491..1cd6443c331a5d5b8453a92255b33859deee955c 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.h +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.h @@ -2,15 +2,23 @@ #define ASAPO_SINGLE_FOLDER_MONITOR_H #include <string> +#include <windows.h> +#include <winnt.h> + #include "watch_io.h" #include "logger/logger.h" +#include "shared_event_list.h" +#include "win_event.h" namespace asapo { +const uint64_t kBufLen = 1000 * (sizeof(FILE_NOTIFY_INFORMATION) + FILENAME_MAX + 1); + + class SingleFolderWatch { public: - explicit SingleFolderWatch(std::string root_folder,std::string folder); + explicit SingleFolderWatch(std::string root_folder,std::string folder,SharedEventList* event_list); void Watch(); std::unique_ptr<WatchIO> watch_io__; const AbstractLogger* log__; @@ -19,6 +27,10 @@ class SingleFolderWatch { std::string folder_; Error Init(); HANDLE handle_; + SharedEventList* event_list_; + std::unique_ptr<char[]> buffer_; + Error ProcessEvent(const WinEvent& event); + void ProcessEvents(DWORD bytes_to_read); }; } diff --git a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp index 0157164d9a36273859261c42bdfd7cc14c746237..ed7efe0272f54b81f940e293d8461a0b00137813 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp @@ -11,14 +11,15 @@ namespace asapo { Error SystemFolderWatch::StartFolderMonitor(const std::string& root_folder, const std::vector<std::string>& monitored_folders) { for (auto& folder:monitored_folders ) { - auto thread = io__->NewThread([root_folder, folder] { - auto folder_watch = std::unique_ptr<SingleFolderWatch>(new SingleFolderWatch(root_folder, folder)); + auto thread = io__->NewThread([root_folder, folder,this] { + auto folder_watch = std::unique_ptr<SingleFolderWatch>(new SingleFolderWatch(root_folder, folder,&event_list_)); folder_watch->Watch(); }); - if (thread) { - thread->detach(); - } + threads_.emplace_back(std::move(thread)); +// if (thread) { +// thread->detach(); +// } } return nullptr; diff --git a/producer/event_monitor_producer/src/system_folder_watch_windows.h b/producer/event_monitor_producer/src/system_folder_watch_windows.h index a302b98a9d283967ad3780324343d8c73a53e9e5..d2d2f35f8c601f6012a9a61f23ff072ac97a78c9 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_windows.h +++ b/producer/event_monitor_producer/src/system_folder_watch_windows.h @@ -9,10 +9,12 @@ #include "asapo_producer.h" #include "common.h" #include "io/io.h" - +#include "shared_event_list.h" namespace asapo { + + class SystemFolderWatch { public: SystemFolderWatch(); @@ -21,7 +23,8 @@ class SystemFolderWatch { VIRTUAL FilesToSend GetFileList(Error* err); std::unique_ptr<IO> io__; private: - + SharedEventList event_list_; + std::vector<std::unique_ptr<std::thread>> threads_; }; } diff --git a/producer/event_monitor_producer/src/watch_io.cpp b/producer/event_monitor_producer/src/watch_io.cpp index d484a95b33cc2b606d6e8a70150b83753d39fe59..ba13c9e402e724de9dd1541c18ddc50d6f60a794 100644 --- a/producer/event_monitor_producer/src/watch_io.cpp +++ b/producer/event_monitor_producer/src/watch_io.cpp @@ -20,5 +20,18 @@ HANDLE WatchIO::Init(const char* folder, Error* err) { WatchIO::WatchIO() :io_{GenerateDefaultIO()}{ } +Error WatchIO::ReadDirectoryChanges(HANDLE handle, LPVOID buffer, DWORD buffer_length, LPDWORD bytes_returned) { + DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | + FILE_NOTIFY_CHANGE_LAST_WRITE; + auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr); + printf("after read changes\n"); + if (res) { + printf("after read changes ok\n"); + return nullptr; + } else { + printf("after read problem\n"); + return io_->GetLastError(); + } +} } \ No newline at end of file diff --git a/producer/event_monitor_producer/src/watch_io.h b/producer/event_monitor_producer/src/watch_io.h index aa60c7ee0d52b65ed50296975a7c4610423446d3..3c6f72538e3b9dfdeb949d844527a82b07693947 100644 --- a/producer/event_monitor_producer/src/watch_io.h +++ b/producer/event_monitor_producer/src/watch_io.h @@ -13,6 +13,7 @@ class WatchIO { public: explicit WatchIO(); VIRTUAL HANDLE Init(const char* folder, Error* err); + VIRTUAL Error ReadDirectoryChanges(HANDLE handle,LPVOID buffer, DWORD buffer_length,LPDWORD bytes_returned); private: std::unique_ptr<IO>io_; }; diff --git a/producer/event_monitor_producer/src/win_event.cpp b/producer/event_monitor_producer/src/win_event.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ce41c6f27a3a7efb43f66f7d97a044787f8ca245 --- /dev/null +++ b/producer/event_monitor_producer/src/win_event.cpp @@ -0,0 +1,39 @@ +#include "win_event.h" + +#include <vector> + + +namespace asapo { + +WinEvent::WinEvent(const FILE_NOTIFY_INFORMATION* win_event):win_event_{win_event} { + +} +std::string WinEvent::FileName() const { + std::size_t len = win_event_->FileNameLength/ sizeof(WCHAR); + std::vector<char> buffer(len + 1); + buffer[len]=0; +// std::locale loc(""); +// std::use_facet<std::ctype<wchar_t> >(loc).narrow(win_event_->FileName, win_event_->FileName + len, '_', &buffer[0]); + for (size_t i=0;i<len;i++) { + buffer[i] = (char)win_event_->FileName[i]; + } + return std::string(&buffer[0], &buffer[len]); +} + +size_t WinEvent::Offset()const { + return win_event_->NextEntryOffset; +} + +void WinEvent::Print() const{ + printf("\nNew Event: "); + if (win_event_->Action == FILE_ACTION_ADDED) printf("FILE_ACTION_ADDED "); + if (win_event_->Action == FILE_ACTION_REMOVED) printf("FILE_ACTION_REMOVED "); + if (win_event_->Action == FILE_ACTION_MODIFIED) printf("FILE_ACTION_MODIFIED "); + if (win_event_->Action == FILE_ACTION_RENAMED_OLD_NAME) printf("FILE_ACTION_RENAMED_OLD_NAME "); + if (win_event_->Action == FILE_ACTION_RENAMED_NEW_NAME) printf("FILE_ACTION_RENAMED_NEW_NAME "); + printf("\n"); + if (win_event_->FileNameLength > 0) + printf("Filename: %s\n",FileName().c_str()); + +} +} diff --git a/producer/event_monitor_producer/src/win_event.h b/producer/event_monitor_producer/src/win_event.h new file mode 100644 index 0000000000000000000000000000000000000000..6f23f27435b355ca2d5abba015f14e511d3819b9 --- /dev/null +++ b/producer/event_monitor_producer/src/win_event.h @@ -0,0 +1,23 @@ +#ifndef ASAPO_WIN_EVENT_H +#define ASAPO_WIN_EVENT_H + +#include <windows.h> +#include <winnt.h> +#include <string> + +namespace asapo { + +class WinEvent { + public: + WinEvent(const FILE_NOTIFY_INFORMATION* win_event); + size_t Offset() const; + void Print() const; + std::string FileName() const ; + private: + const FILE_NOTIFY_INFORMATION* win_event_; + +}; + +} + +#endif //ASAPO_WIN_EVENT_H diff --git a/producer/event_monitor_producer/unittests/mock_watch_io.h b/producer/event_monitor_producer/unittests/mock_watch_io.h index dd08c057abed64d45678c782935d45f54087bf1d..12e0667730f2d100c007db05f702851cf8239a7f 100644 --- a/producer/event_monitor_producer/unittests/mock_watch_io.h +++ b/producer/event_monitor_producer/unittests/mock_watch_io.h @@ -18,6 +18,13 @@ class MockWatchIO : public WatchIO { } MOCK_METHOD2(Init_t, HANDLE (const char* folder, ErrorInterface** err)); + + Error ReadDirectoryChanges(HANDLE handle,LPVOID buffer, DWORD buffer_length,LPDWORD bytes_returned) override { + return Error{ReadDirectoryChanges_t(handle,buffer,buffer_length,bytes_returned)}; + } + + MOCK_METHOD4(ReadDirectoryChanges_t, ErrorInterface* (HANDLE handle,LPVOID buffer, DWORD buffer_length,LPDWORD bytes_returned)); + }; } diff --git a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp index f7887c6f78c9ff9d68ba36caa2da65ca56aff727..e8d04bb1bc7995dc14ad215f7851e2ff17b7e563 100644 --- a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp @@ -1,5 +1,7 @@ #include <gtest/gtest.h> #include <gmock/gmock.h> +#include <chrono> + #include "../src/single_folder_watch_windows.h" #include "../src/event_monitor_error.h" @@ -36,7 +38,7 @@ namespace { TEST(SingleFolderWatch, Constructor) { - SingleFolderWatch watch{"",""}; + SingleFolderWatch watch{"","",nullptr}; ASSERT_THAT(dynamic_cast<asapo::WatchIO*>(watch.watch_io__.get()), Ne(nullptr)); } @@ -60,18 +62,58 @@ class SingleFolderWatchTests : public testing::Test { std::string expected_root_folder = "c:\\tmp"; std::string expected_folder{"test1"}; HANDLE expected_handle = HANDLE(1); - SingleFolderWatch watch{expected_root_folder,expected_folder}; + asapo::SharedEventList event_list; + SingleFolderWatch watch{expected_root_folder,expected_folder,&event_list}; + char* buffer; + DWORD cur_buffer_pointer = 0; void SetUp() override { watch.watch_io__ = std::unique_ptr<asapo::WatchIO> {&mock_watch_io}; watch.log__ = &mock_logger; + buffer = new (char[asapo::kBufLen]); } void TearDown() override { watch.watch_io__.release(); + delete[] buffer; + } + void ExpectInit(); + void ExpectRead(); + DWORD AddEventToBuffer(std::string filename, DWORD action); + + }; + +DWORD SingleFolderWatchTests::AddEventToBuffer(std::string filename, DWORD action) { + size_t filename_size = filename.size(); + DWORD size = sizeof(FILE_NOTIFY_INFORMATION) + filename_size*sizeof(WCHAR); + char* buf = (char*) malloc(size); + FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*) buf; + event->NextEntryOffset = size; + event->Action = action; + for (size_t i=0;i<filename_size;i++) { + event->FileName[i] = filename[i]; } -}; + event->FileNameLength = filename_size* sizeof(WCHAR); + memcpy(buffer + cur_buffer_pointer, event, size); + cur_buffer_pointer += size; + free(buf); + return size; +} +ACTION_P(A_CopyBuf, buffer) { + memcpy(arg1, buffer, asapo::kBufLen); +} -TEST_F(SingleFolderWatchTests, InitWatchOnWatch) { + +void SingleFolderWatchTests::ExpectRead() { + EXPECT_CALL(mock_watch_io, ReadDirectoryChanges_t(expected_handle, _, asapo::kBufLen,_)) + .WillOnce(DoAll( + A_CopyBuf(buffer), + SetArgPointee<3>(cur_buffer_pointer), + Return(nullptr)) + ); +} + + +void SingleFolderWatchTests::ExpectInit() { EXPECT_CALL(mock_watch_io, Init_t(StrEq(expected_root_folder+asapo::kPathSeparator+expected_folder),_)). WillOnce(DoAll( SetArgPointee<1>(nullptr), @@ -79,6 +121,11 @@ TEST_F(SingleFolderWatchTests, InitWatchOnWatch) { ) ); + +} + +TEST_F(SingleFolderWatchTests, InitWatchOnWatch) { + ExpectInit(); watch.Watch(); } @@ -101,6 +148,23 @@ TEST_F(SingleFolderWatchTests, InitErrorOnWatch) { watch.Watch(); } +TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEvents) { + ExpectInit(); + AddEventToBuffer("test",FILE_ACTION_ADDED); + AddEventToBuffer("test2",FILE_ACTION_MODIFIED); + + ExpectRead(); + watch.Watch(); + + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + auto files = event_list.GetAndClearEvents(); + + ASSERT_THAT(files.size(), Eq(2)); + ASSERT_THAT(files[0], StrEq("test")); + ASSERT_THAT(files[1], StrEq("test2")); + + +} diff --git a/producer/event_monitor_producer/unittests/test_system_folder_watch_windows.cpp b/producer/event_monitor_producer/unittests/test_system_folder_watch_windows.cpp index 300f6f8f1882eeb8f2cd2c81cf8de3bdc94dc20c..088eac984b89e013ff45cc24a31e80b40ebbe3c2 100644 --- a/producer/event_monitor_producer/unittests/test_system_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/unittests/test_system_folder_watch_windows.cpp @@ -43,7 +43,7 @@ FileInfos CreateTestFileInfos() { fi.size = 100; fi.name = "file1"; file_infos.push_back(fi); - fi.name = "subfolder/file2"; + fi.name = "subfolder\\file2"; file_infos.push_back(fi); return file_infos; } @@ -54,9 +54,9 @@ class SystemFolderWatchTests : public testing::Test { Error err; ::testing::NiceMock<asapo::MockIO> mock_io; SystemFolderWatch watch{}; - std::string expected_root_folder = "/tmp"; + std::string expected_root_folder = "c:\\tmp"; std::vector<std::string> expected_folders{"test1", "test2"}; - void SetUp() override { + void SetUp() override { watch.io__ = std::unique_ptr<asapo::IO> {&mock_io}; } void TearDown() override { @@ -64,7 +64,7 @@ class SystemFolderWatchTests : public testing::Test { } }; -TEST_F(SystemFolderWatchTests, ErrorInitInotifyStartMonitoring) { +TEST_F(SystemFolderWatchTests,StartMonitoring) { EXPECT_CALL(mock_io, NewThread_t(_)).Times(expected_folders.size()).