From 15e944989d364e7a7fa963c6ced06197679fb036 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Thu, 13 Sep 2018 17:18:29 +0200
Subject: [PATCH] continue windows folder monitor

---
 .../src/shared_event_list.cpp                 | 26 +++++++++--
 .../src/shared_event_list.h                   | 11 ++++-
 .../src/single_folder_watch_windows.cpp       |  9 +++-
 .../src/single_folder_watch_windows.h         |  2 +-
 .../event_monitor_producer/src/watch_io.cpp   |  2 +-
 .../event_monitor_producer/src/win_event.h    |  1 -
 .../test_single_folder_watch_windows.cpp      | 44 ++++++++++++++++++-
 7 files changed, 83 insertions(+), 12 deletions(-)

diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp
index 0de09b9b1..c1c548edc 100644
--- a/producer/event_monitor_producer/src/shared_event_list.cpp
+++ b/producer/event_monitor_producer/src/shared_event_list.cpp
@@ -1,15 +1,35 @@
 #include "shared_event_list.h"
 
+#include <algorithm>
+
+using std::chrono::high_resolution_clock;
+
 namespace asapo {
 
 FilesToSend SharedEventList::GetAndClearEvents() {
     std::lock_guard<std::mutex> lock(mutex_);
-    FilesToSend events = std::move(events_);
-    events_.clear();
+    FilesToSend events;
+
+    for (auto it = events_.begin(); it != events_.end(); /* NOTHING */) {
+        uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() -
+            it->time).count();
+        if (elapsed_ms > 1000) {
+            events.push_back(it->file_name);
+            it = events_.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
     return events;
 }
 void SharedEventList::AddEvent(std::string event) {
     std::lock_guard<std::mutex> lock(mutex_);
-    events_.emplace_back(std::move(event));
+    auto findIter = std::find_if(events_.begin(), events_.end(),  [&]( const SingleEvent& e ){ return e.file_name == event;});
+    if ( events_.end() == findIter ) {
+        events_.emplace_back(SingleEvent{std::move(event),high_resolution_clock::now()});
+    } else {
+        findIter->time = high_resolution_clock::now();
+    }
 }
 }
diff --git a/producer/event_monitor_producer/src/shared_event_list.h b/producer/event_monitor_producer/src/shared_event_list.h
index 1fb3b2edf..2ad9a8312 100644
--- a/producer/event_monitor_producer/src/shared_event_list.h
+++ b/producer/event_monitor_producer/src/shared_event_list.h
@@ -3,19 +3,26 @@
 
 #include <string>
 #include <vector>
+#include <list>
 #include <mutex>
-
+#include <windows.h>
+#include <chrono>
 #include "common.h"
 
 namespace asapo {
 
+struct SingleEvent {
+  std::string file_name;
+  std::chrono::high_resolution_clock::time_point time;
+};
+
 class SharedEventList {
  public:
   FilesToSend GetAndClearEvents();
   void AddEvent(std::string event);
  private:
   std::mutex mutex_;
-  FilesToSend events_;
+  std::list<SingleEvent> events_;
 };
 
 }
diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
index a75b7ea86..7bef111f8 100644
--- a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
+++ b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp
@@ -19,6 +19,9 @@ SingleFolderWatch::SingleFolderWatch(std::string root_folder, std::string folder
 }
 
 Error SingleFolderWatch::Init()  {
+    if (handle_) {
+        return nullptr;
+    }
     std::string full_path = this->root_folder_ + kPathSeparator + this->folder_;
     Error err;
     handle_ = this->watch_io__->Init(full_path.c_str(), &err);
@@ -33,16 +36,18 @@ void SingleFolderWatch::Watch() {
     if (Init()!=nullptr) {
         return;
     }
-
     DWORD bytes_read = 0;
     auto err =watch_io__->ReadDirectoryChanges(handle_,buffer_.get(),kBufLen,&bytes_read);
     if (err == nullptr) {
         ProcessEvents(bytes_read);
     }
+
 }
+
 Error SingleFolderWatch::ProcessEvent(const WinEvent &event) {
     event.Print();
-    event_list_->AddEvent(event.FileName());
+    std::string fname = event.FileName();
+    event_list_->AddEvent(fname);
     return nullptr;
 }
 void SingleFolderWatch::ProcessEvents(DWORD bytes_to_read) {
diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.h b/producer/event_monitor_producer/src/single_folder_watch_windows.h
index 1cd6443c3..ad8968b97 100644
--- a/producer/event_monitor_producer/src/single_folder_watch_windows.h
+++ b/producer/event_monitor_producer/src/single_folder_watch_windows.h
@@ -26,7 +26,7 @@ class SingleFolderWatch {
   std::string root_folder_;
   std::string folder_;
   Error Init();
-  HANDLE handle_;
+  HANDLE handle_{nullptr};
   SharedEventList* event_list_;
   std::unique_ptr<char[]> buffer_;
   Error ProcessEvent(const WinEvent& event);
diff --git a/producer/event_monitor_producer/src/watch_io.cpp b/producer/event_monitor_producer/src/watch_io.cpp
index ba13c9e40..001ebd3ae 100644
--- a/producer/event_monitor_producer/src/watch_io.cpp
+++ b/producer/event_monitor_producer/src/watch_io.cpp
@@ -23,7 +23,7 @@ WatchIO::WatchIO() :io_{GenerateDefaultIO()}{
 Error WatchIO::ReadDirectoryChanges(HANDLE handle, LPVOID buffer, DWORD buffer_length, LPDWORD bytes_returned) {
     DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME |
         FILE_NOTIFY_CHANGE_LAST_WRITE;
-    auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr);
+    auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr );
     printf("after read changes\n");
     if (res) {
         printf("after read changes ok\n");
diff --git a/producer/event_monitor_producer/src/win_event.h b/producer/event_monitor_producer/src/win_event.h
index 6f23f2743..2ac9d0a32 100644
--- a/producer/event_monitor_producer/src/win_event.h
+++ b/producer/event_monitor_producer/src/win_event.h
@@ -15,7 +15,6 @@ class WinEvent {
   std::string FileName() const ;
  private:
   const FILE_NOTIFY_INFORMATION* win_event_;
-
 };
 
 }
diff --git a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp
index e8d04bb1b..d4446e2b8 100644
--- a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp
+++ b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp
@@ -148,10 +148,9 @@ TEST_F(SingleFolderWatchTests, InitErrorOnWatch) {
     watch.Watch();
 }
 
-TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEvents) {
+TEST_F(SingleFolderWatchTests, WatchWaitsBeforeEventIsAvailable) {
     ExpectInit();
     AddEventToBuffer("test",FILE_ACTION_ADDED);
-    AddEventToBuffer("test2",FILE_ACTION_MODIFIED);
 
     ExpectRead();
     watch.Watch();
@@ -159,6 +158,47 @@ TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEvents) {
     std::this_thread::sleep_for(std::chrono::milliseconds(30));
     auto files = event_list.GetAndClearEvents();
 
+    ASSERT_THAT(files.size(), Eq(0));
+}
+
+TEST_F(SingleFolderWatchTests, NewEventClearsTimeoutCounter) {
+    ExpectInit();
+    AddEventToBuffer("test",FILE_ACTION_ADDED);
+    AddEventToBuffer("test2",FILE_ACTION_MODIFIED);
+
+    ExpectRead();
+    watch.Watch();
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    Mock::VerifyAndClearExpectations(&mock_watch_io);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(800));
+    cur_buffer_pointer = 0;
+    AddEventToBuffer("test2",FILE_ACTION_MODIFIED);
+    ExpectRead();
+    watch.Watch();
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
+    auto files = event_list.GetAndClearEvents();
+
+    ASSERT_THAT(files.size(), Eq(1));
+    ASSERT_THAT(files[0], StrEq("test"));
+}
+
+
+
+TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEventsAfterTimeout) {
+    ExpectInit();
+    AddEventToBuffer("test",FILE_ACTION_ADDED);
+    AddEventToBuffer("test2",FILE_ACTION_MODIFIED);
+    AddEventToBuffer("test2",FILE_ACTION_MODIFIED);
+
+    ExpectRead();
+    watch.Watch();
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(1100));
+    auto files = event_list.GetAndClearEvents();
+
     ASSERT_THAT(files.size(), Eq(2));
     ASSERT_THAT(files[0], StrEq("test"));
     ASSERT_THAT(files[1], StrEq("test2"));
-- 
GitLab