diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp index 0de09b9b1c45fb89a960918cfd344c0c6a5be004..c1c548edcafb30122bdd9c87fcebd34fdaf862c0 100644 --- a/producer/event_monitor_producer/src/shared_event_list.cpp +++ b/producer/event_monitor_producer/src/shared_event_list.cpp @@ -1,15 +1,35 @@ #include "shared_event_list.h" +#include <algorithm> + +using std::chrono::high_resolution_clock; + namespace asapo { FilesToSend SharedEventList::GetAndClearEvents() { std::lock_guard<std::mutex> lock(mutex_); - FilesToSend events = std::move(events_); - events_.clear(); + FilesToSend events; + + for (auto it = events_.begin(); it != events_.end(); /* NOTHING */) { + uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - + it->time).count(); + if (elapsed_ms > 1000) { + events.push_back(it->file_name); + it = events_.erase(it); + } else { + ++it; + } + } + return events; } void SharedEventList::AddEvent(std::string event) { std::lock_guard<std::mutex> lock(mutex_); - events_.emplace_back(std::move(event)); + auto findIter = std::find_if(events_.begin(), events_.end(), [&]( const SingleEvent& e ){ return e.file_name == event;}); + if ( events_.end() == findIter ) { + events_.emplace_back(SingleEvent{std::move(event),high_resolution_clock::now()}); + } else { + findIter->time = high_resolution_clock::now(); + } } } diff --git a/producer/event_monitor_producer/src/shared_event_list.h b/producer/event_monitor_producer/src/shared_event_list.h index 1fb3b2edf8f9966ce4709b2300f62fe4db104cf5..2ad9a83121187366125d66cdc4090950cef88651 100644 --- a/producer/event_monitor_producer/src/shared_event_list.h +++ b/producer/event_monitor_producer/src/shared_event_list.h @@ -3,19 +3,26 @@ #include <string> #include <vector> +#include <list> #include <mutex> - +#include <windows.h> +#include <chrono> #include "common.h" namespace asapo { +struct SingleEvent { + std::string file_name; + std::chrono::high_resolution_clock::time_point time; +}; + class SharedEventList { public: FilesToSend GetAndClearEvents(); void AddEvent(std::string event); private: std::mutex mutex_; - FilesToSend events_; + std::list<SingleEvent> events_; }; } diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp index a75b7ea86595d39a33d4cb981258c858756acf29..7bef111f8fc1625ae090f3596509148f6cd81a14 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp @@ -19,6 +19,9 @@ SingleFolderWatch::SingleFolderWatch(std::string root_folder, std::string folder } Error SingleFolderWatch::Init() { + if (handle_) { + return nullptr; + } std::string full_path = this->root_folder_ + kPathSeparator + this->folder_; Error err; handle_ = this->watch_io__->Init(full_path.c_str(), &err); @@ -33,16 +36,18 @@ void SingleFolderWatch::Watch() { if (Init()!=nullptr) { return; } - DWORD bytes_read = 0; auto err =watch_io__->ReadDirectoryChanges(handle_,buffer_.get(),kBufLen,&bytes_read); if (err == nullptr) { ProcessEvents(bytes_read); } + } + Error SingleFolderWatch::ProcessEvent(const WinEvent &event) { event.Print(); - event_list_->AddEvent(event.FileName()); + std::string fname = event.FileName(); + event_list_->AddEvent(fname); return nullptr; } void SingleFolderWatch::ProcessEvents(DWORD bytes_to_read) { diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.h b/producer/event_monitor_producer/src/single_folder_watch_windows.h index 1cd6443c331a5d5b8453a92255b33859deee955c..ad8968b97235f686dff33633010c48587097616e 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.h +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.h @@ -26,7 +26,7 @@ class SingleFolderWatch { std::string root_folder_; std::string folder_; Error Init(); - HANDLE handle_; + HANDLE handle_{nullptr}; SharedEventList* event_list_; std::unique_ptr<char[]> buffer_; Error ProcessEvent(const WinEvent& event); diff --git a/producer/event_monitor_producer/src/watch_io.cpp b/producer/event_monitor_producer/src/watch_io.cpp index ba13c9e402e724de9dd1541c18ddc50d6f60a794..001ebd3aefecbcf78dbd4188a86948eae36eb1dc 100644 --- a/producer/event_monitor_producer/src/watch_io.cpp +++ b/producer/event_monitor_producer/src/watch_io.cpp @@ -23,7 +23,7 @@ WatchIO::WatchIO() :io_{GenerateDefaultIO()}{ Error WatchIO::ReadDirectoryChanges(HANDLE handle, LPVOID buffer, DWORD buffer_length, LPDWORD bytes_returned) { DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE; - auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr); + auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr ); printf("after read changes\n"); if (res) { printf("after read changes ok\n"); diff --git a/producer/event_monitor_producer/src/win_event.h b/producer/event_monitor_producer/src/win_event.h index 6f23f27435b355ca2d5abba015f14e511d3819b9..2ac9d0a32a5f67816481bf10e94dfd1c1abb5f53 100644 --- a/producer/event_monitor_producer/src/win_event.h +++ b/producer/event_monitor_producer/src/win_event.h @@ -15,7 +15,6 @@ class WinEvent { std::string FileName() const ; private: const FILE_NOTIFY_INFORMATION* win_event_; - }; } diff --git a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp index e8d04bb1bc7995dc14ad215f7851e2ff17b7e563..d4446e2b810375e8d814b11a1a90bfae3a2a056d 100644 --- a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp @@ -148,10 +148,9 @@ TEST_F(SingleFolderWatchTests, InitErrorOnWatch) { watch.Watch(); } -TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEvents) { +TEST_F(SingleFolderWatchTests, WatchWaitsBeforeEventIsAvailable) { ExpectInit(); AddEventToBuffer("test",FILE_ACTION_ADDED); - AddEventToBuffer("test2",FILE_ACTION_MODIFIED); ExpectRead(); watch.Watch(); @@ -159,6 +158,47 @@ TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEvents) { std::this_thread::sleep_for(std::chrono::milliseconds(30)); auto files = event_list.GetAndClearEvents(); + ASSERT_THAT(files.size(), Eq(0)); +} + +TEST_F(SingleFolderWatchTests, NewEventClearsTimeoutCounter) { + ExpectInit(); + AddEventToBuffer("test",FILE_ACTION_ADDED); + AddEventToBuffer("test2",FILE_ACTION_MODIFIED); + + ExpectRead(); + watch.Watch(); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + Mock::VerifyAndClearExpectations(&mock_watch_io); + + std::this_thread::sleep_for(std::chrono::milliseconds(800)); + cur_buffer_pointer = 0; + AddEventToBuffer("test2",FILE_ACTION_MODIFIED); + ExpectRead(); + watch.Watch(); + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + auto files = event_list.GetAndClearEvents(); + + ASSERT_THAT(files.size(), Eq(1)); + ASSERT_THAT(files[0], StrEq("test")); +} + + + +TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEventsAfterTimeout) { + ExpectInit(); + AddEventToBuffer("test",FILE_ACTION_ADDED); + AddEventToBuffer("test2",FILE_ACTION_MODIFIED); + AddEventToBuffer("test2",FILE_ACTION_MODIFIED); + + ExpectRead(); + watch.Watch(); + + std::this_thread::sleep_for(std::chrono::milliseconds(1100)); + auto files = event_list.GetAndClearEvents(); + ASSERT_THAT(files.size(), Eq(2)); ASSERT_THAT(files[0], StrEq("test")); ASSERT_THAT(files[1], StrEq("test2"));