diff --git a/producer/event_monitor_producer/CMakeLists.txt b/producer/event_monitor_producer/CMakeLists.txt index 993cff98d1f590232e53a9734dcaee8ed94cf380..fb44c774f3654b14d4dda1d2b7da2c14c966f7a0 100644 --- a/producer/event_monitor_producer/CMakeLists.txt +++ b/producer/event_monitor_producer/CMakeLists.txt @@ -32,8 +32,6 @@ target_link_libraries(${TARGET_NAME} ${ASAPO_COMMON_IO_LIBRARIES}) target_link_libraries(${TARGET_NAME} producer-api) - - ################################ # Executable ################################ diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp index e2ff71129fafbd6bf93abb2be94cbd1421a48ccd..09fc559ed3aaaed8f913055025af39cdcf7aed2f 100644 --- a/producer/event_monitor_producer/src/main_eventmon.cpp +++ b/producer/event_monitor_producer/src/main_eventmon.cpp @@ -105,7 +105,7 @@ int main (int argc, char* argv[]) { asapo::EventHeader event_header; auto err = event_detector->GetNextEvent(&event_header); if (stop_signal) { - break; // we check it here because signal can interrupt system call (ready by inotify and result n incomplete event data) + break; // we check it here because signal can interrupt system call (ready by inotify and result in incomplete event data) } if (err) { if (err != asapo::EventMonitorErrorTemplates::kNoNewEvent) { diff --git a/producer/event_monitor_producer/src/shared_event_list.cpp b/producer/event_monitor_producer/src/shared_event_list.cpp index c1c548edcafb30122bdd9c87fcebd34fdaf862c0..7ea80afcdf27756d06b83a41b4eb417079dcbd46 100644 --- a/producer/event_monitor_producer/src/shared_event_list.cpp +++ b/producer/event_monitor_producer/src/shared_event_list.cpp @@ -9,25 +9,23 @@ namespace asapo { FilesToSend SharedEventList::GetAndClearEvents() { std::lock_guard<std::mutex> lock(mutex_); FilesToSend events; - for (auto it = events_.begin(); it != events_.end(); /* NOTHING */) { uint64_t elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>( high_resolution_clock::now() - it->time).count(); - if (elapsed_ms > 1000) { + if (!it->apply_delay || elapsed_ms > kFileDelayMs) { events.push_back(it->file_name); it = events_.erase(it); } else { ++it; } } - return events; } -void SharedEventList::AddEvent(std::string event) { +void SharedEventList::AddEvent(std::string event,bool apply_delay) { std::lock_guard<std::mutex> lock(mutex_); auto findIter = std::find_if(events_.begin(), events_.end(), [&]( const SingleEvent& e ){ return e.file_name == event;}); if ( events_.end() == findIter ) { - events_.emplace_back(SingleEvent{std::move(event),high_resolution_clock::now()}); + events_.emplace_back(SingleEvent{std::move(event),high_resolution_clock::now(),apply_delay}); } else { findIter->time = high_resolution_clock::now(); } diff --git a/producer/event_monitor_producer/src/shared_event_list.h b/producer/event_monitor_producer/src/shared_event_list.h index 2ad9a83121187366125d66cdc4090950cef88651..737ba5803f59536286fc9f13358c04c362f2fa82 100644 --- a/producer/event_monitor_producer/src/shared_event_list.h +++ b/producer/event_monitor_producer/src/shared_event_list.h @@ -11,15 +11,18 @@ namespace asapo { +const uint64_t kFileDelayMs = 500; + struct SingleEvent { std::string file_name; std::chrono::high_resolution_clock::time_point time; + bool apply_delay; }; class SharedEventList { public: FilesToSend GetAndClearEvents(); - void AddEvent(std::string event); + void AddEvent(std::string event,bool apply_delay); private: std::mutex mutex_; std::list<SingleEvent> events_; diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp index 7bef111f8fc1625ae090f3596509148f6cd81a14..ecb4283d241e8e2e82e484ac2b0b86daa0da59a5 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.cpp @@ -32,22 +32,33 @@ Error SingleFolderWatch::Init() { return nullptr; } -void SingleFolderWatch::Watch() { - if (Init()!=nullptr) { - return; +Error SingleFolderWatch::Watch() { + auto err = Init(); + if (err) { + return err; } DWORD bytes_read = 0; - auto err =watch_io__->ReadDirectoryChanges(handle_,buffer_.get(),kBufLen,&bytes_read); + err =watch_io__->ReadDirectoryChanges(handle_,buffer_.get(),kBufLen,&bytes_read); if (err == nullptr) { ProcessEvents(bytes_read); } + return err; } Error SingleFolderWatch::ProcessEvent(const WinEvent &event) { + + if (!event.ShouldInitiateTransfer()) { + return nullptr; + } + + std::string fname = folder_+kPathSeparator + event.FileName(); + if (watch_io__->IsDirectory(root_folder_+kPathSeparator+fname)) { + return nullptr; + } + event.Print(); - std::string fname = event.FileName(); - event_list_->AddEvent(fname); + event_list_->AddEvent(fname,event.ShouldBeProcessedAfterDelay()); return nullptr; } void SingleFolderWatch::ProcessEvents(DWORD bytes_to_read) { diff --git a/producer/event_monitor_producer/src/single_folder_watch_windows.h b/producer/event_monitor_producer/src/single_folder_watch_windows.h index ad8968b97235f686dff33633010c48587097616e..2ffe25764a9d7458952252a4573f7496761c9a06 100644 --- a/producer/event_monitor_producer/src/single_folder_watch_windows.h +++ b/producer/event_monitor_producer/src/single_folder_watch_windows.h @@ -19,7 +19,7 @@ const uint64_t kBufLen = 1000 * (sizeof(FILE_NOTIFY_INFORMATION) + FILENAME_MAX class SingleFolderWatch { public: explicit SingleFolderWatch(std::string root_folder,std::string folder,SharedEventList* event_list); - void Watch(); + Error Watch(); std::unique_ptr<WatchIO> watch_io__; const AbstractLogger* log__; private: diff --git a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp index ed7efe0272f54b81f940e293d8461a0b00137813..b878c26cd24e07961622ce43d8f08e8f4ac8aad3 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp @@ -13,23 +13,25 @@ Error SystemFolderWatch::StartFolderMonitor(const std::string& root_folder, for (auto& folder:monitored_folders ) { auto thread = io__->NewThread([root_folder, folder,this] { auto folder_watch = std::unique_ptr<SingleFolderWatch>(new SingleFolderWatch(root_folder, folder,&event_list_)); - folder_watch->Watch(); + while (true) { + auto err = folder_watch->Watch(); + if (err) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } }); threads_.emplace_back(std::move(thread)); -// if (thread) { -// thread->detach(); -// } } return nullptr; } FilesToSend SystemFolderWatch::GetFileList(Error* err) { - FilesToSend events; *err = nullptr; - return events; + return event_list_.GetAndClearEvents(); } + SystemFolderWatch::SystemFolderWatch() :io__{GenerateDefaultIO()}{ } diff --git a/producer/event_monitor_producer/src/watch_io.cpp b/producer/event_monitor_producer/src/watch_io.cpp index 001ebd3aefecbcf78dbd4188a86948eae36eb1dc..4f3e865b5475ce4c16be7d3dc8d7c839b06b4b2d 100644 --- a/producer/event_monitor_producer/src/watch_io.cpp +++ b/producer/event_monitor_producer/src/watch_io.cpp @@ -1,5 +1,6 @@ #include "watch_io.h" #include "io/io_factory.h" + namespace asapo { HANDLE WatchIO::Init(const char* folder, Error* err) { @@ -24,14 +25,16 @@ Error WatchIO::ReadDirectoryChanges(HANDLE handle, LPVOID buffer, DWORD buffer_l DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE; auto res = ReadDirectoryChangesW(handle,buffer,buffer_length,true,filter,bytes_returned,nullptr,nullptr ); - printf("after read changes\n"); if (res) { - printf("after read changes ok\n"); return nullptr; } else { - printf("after read problem\n"); return io_->GetLastError(); } } +bool WatchIO::IsDirectory(const std::string &path) { + auto attr = GetFileAttributesA(path.c_str()); + return (attr & FILE_ATTRIBUTE_DIRECTORY) > 0; +} + } \ No newline at end of file diff --git a/producer/event_monitor_producer/src/watch_io.h b/producer/event_monitor_producer/src/watch_io.h index 3c6f72538e3b9dfdeb949d844527a82b07693947..037f6ada9233670cd13245eba35a8b2e38c08c78 100644 --- a/producer/event_monitor_producer/src/watch_io.h +++ b/producer/event_monitor_producer/src/watch_io.h @@ -14,6 +14,7 @@ class WatchIO { explicit WatchIO(); VIRTUAL HANDLE Init(const char* folder, Error* err); VIRTUAL Error ReadDirectoryChanges(HANDLE handle,LPVOID buffer, DWORD buffer_length,LPDWORD bytes_returned); + VIRTUAL bool IsDirectory(const std::string& path); private: std::unique_ptr<IO>io_; }; diff --git a/producer/event_monitor_producer/src/win_event.cpp b/producer/event_monitor_producer/src/win_event.cpp index ce41c6f27a3a7efb43f66f7d97a044787f8ca245..b4a7623bc4d880a23b2df5d77f77cd4e087547bb 100644 --- a/producer/event_monitor_producer/src/win_event.cpp +++ b/producer/event_monitor_producer/src/win_event.cpp @@ -36,4 +36,16 @@ void WinEvent::Print() const{ printf("Filename: %s\n",FileName().c_str()); } +bool WinEvent::IsFileModifiedEvent() const { + return win_event_->Action == FILE_ACTION_MODIFIED; +} +bool WinEvent::IsFileMovedEvent() const { + return win_event_->Action == FILE_ACTION_RENAMED_NEW_NAME; +} +bool WinEvent::ShouldInitiateTransfer() const { + return IsFileModifiedEvent() || IsFileMovedEvent(); +} +bool WinEvent::ShouldBeProcessedAfterDelay() const { + return !IsFileMovedEvent(); +} } diff --git a/producer/event_monitor_producer/src/win_event.h b/producer/event_monitor_producer/src/win_event.h index 2ac9d0a32a5f67816481bf10e94dfd1c1abb5f53..b0b1e4907ae9d70c8249a33495f445d11b8b62a9 100644 --- a/producer/event_monitor_producer/src/win_event.h +++ b/producer/event_monitor_producer/src/win_event.h @@ -12,7 +12,11 @@ class WinEvent { WinEvent(const FILE_NOTIFY_INFORMATION* win_event); size_t Offset() const; void Print() const; - std::string FileName() const ; + std::string FileName() const; + bool IsFileModifiedEvent() const; + bool IsFileMovedEvent() const ; + bool ShouldInitiateTransfer() const ; + bool ShouldBeProcessedAfterDelay() const ; private: const FILE_NOTIFY_INFORMATION* win_event_; }; diff --git a/producer/event_monitor_producer/unittests/mock_watch_io.h b/producer/event_monitor_producer/unittests/mock_watch_io.h index 12e0667730f2d100c007db05f702851cf8239a7f..a6b380df3d1fdba4e12f2c90235667f125bb6696 100644 --- a/producer/event_monitor_producer/unittests/mock_watch_io.h +++ b/producer/event_monitor_producer/unittests/mock_watch_io.h @@ -25,7 +25,10 @@ class MockWatchIO : public WatchIO { MOCK_METHOD4(ReadDirectoryChanges_t, ErrorInterface* (HANDLE handle,LPVOID buffer, DWORD buffer_length,LPDWORD bytes_returned)); -}; + MOCK_METHOD1(IsDirectory, bool (const std::string&)); + + + }; } diff --git a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp index d4446e2b810375e8d814b11a1a90bfae3a2a056d..5f01512353baf849ae766d061393dfbe2fe00aed 100644 --- a/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/unittests/test_single_folder_watch_windows.cpp @@ -42,18 +42,6 @@ TEST(SingleFolderWatch, Constructor) { ASSERT_THAT(dynamic_cast<asapo::WatchIO*>(watch.watch_io__.get()), Ne(nullptr)); } -FileInfos CreateTestFileInfos() { - FileInfos file_infos; - FileInfo fi; - fi.size = 100; - fi.name = "file1"; - file_infos.push_back(fi); - fi.name = "subfolder/file2"; - file_infos.push_back(fi); - return file_infos; -} - - class SingleFolderWatchTests : public testing::Test { public: Error err; @@ -77,6 +65,7 @@ class SingleFolderWatchTests : public testing::Test { } void ExpectInit(); void ExpectRead(); + void ExpectDirectory(bool yes); DWORD AddEventToBuffer(std::string filename, DWORD action); }; @@ -122,6 +111,12 @@ void SingleFolderWatchTests::ExpectInit() { ); +} +void SingleFolderWatchTests::ExpectDirectory(bool yes) { + EXPECT_CALL(mock_watch_io, IsDirectory(_)). + WillRepeatedly(Return(yes) + ); + } TEST_F(SingleFolderWatchTests, InitWatchOnWatch) { @@ -150,8 +145,8 @@ TEST_F(SingleFolderWatchTests, InitErrorOnWatch) { TEST_F(SingleFolderWatchTests, WatchWaitsBeforeEventIsAvailable) { ExpectInit(); - AddEventToBuffer("test",FILE_ACTION_ADDED); - + AddEventToBuffer("test",FILE_ACTION_MODIFIED); + ExpectDirectory(false); ExpectRead(); watch.Watch(); @@ -163,9 +158,9 @@ TEST_F(SingleFolderWatchTests, WatchWaitsBeforeEventIsAvailable) { TEST_F(SingleFolderWatchTests, NewEventClearsTimeoutCounter) { ExpectInit(); - AddEventToBuffer("test",FILE_ACTION_ADDED); + AddEventToBuffer("test",FILE_ACTION_MODIFIED); AddEventToBuffer("test2",FILE_ACTION_MODIFIED); - + ExpectDirectory(false); ExpectRead(); watch.Watch(); std::this_thread::sleep_for(std::chrono::milliseconds(30)); @@ -182,28 +177,71 @@ TEST_F(SingleFolderWatchTests, NewEventClearsTimeoutCounter) { auto files = event_list.GetAndClearEvents(); ASSERT_THAT(files.size(), Eq(1)); - ASSERT_THAT(files[0], StrEq("test")); + ASSERT_THAT(files[0], StrEq(expected_folder+"\\test")); } TEST_F(SingleFolderWatchTests, WatchReadsDirectoryEventsAfterTimeout) { ExpectInit(); - AddEventToBuffer("test",FILE_ACTION_ADDED); + AddEventToBuffer("test",FILE_ACTION_MODIFIED); AddEventToBuffer("test2",FILE_ACTION_MODIFIED); AddEventToBuffer("test2",FILE_ACTION_MODIFIED); - + ExpectDirectory(false); ExpectRead(); watch.Watch(); - std::this_thread::sleep_for(std::chrono::milliseconds(1100)); + std::this_thread::sleep_for(std::chrono::milliseconds(asapo::kFileDelayMs+10)); auto files = event_list.GetAndClearEvents(); ASSERT_THAT(files.size(), Eq(2)); - ASSERT_THAT(files[0], StrEq("test")); - ASSERT_THAT(files[1], StrEq("test2")); + ASSERT_THAT(files[0], StrEq(expected_folder+"\\test")); + ASSERT_THAT(files[1], StrEq(expected_folder+"\\test2")); + + +} + + +TEST_F(SingleFolderWatchTests, DirectoriesAreIgnored) { + ExpectInit(); + AddEventToBuffer("test",FILE_ACTION_MODIFIED); + ExpectDirectory(true); + ExpectRead(); + watch.Watch(); + + std::this_thread::sleep_for(std::chrono::milliseconds(asapo::kFileDelayMs+10)); + auto files = event_list.GetAndClearEvents(); + ASSERT_THAT(files.size(), Eq(0)); +} + + +TEST_F(SingleFolderWatchTests, OtherEventTypesAreIgnored) { + ExpectInit(); + AddEventToBuffer("test1",FILE_ACTION_ADDED); + AddEventToBuffer("test2",FILE_ACTION_REMOVED); + AddEventToBuffer("test3",FILE_ACTION_RENAMED_OLD_NAME); + ExpectDirectory(false); + ExpectRead(); + watch.Watch(); + std::this_thread::sleep_for(std::chrono::milliseconds(asapo::kFileDelayMs+10)); + auto files = event_list.GetAndClearEvents(); + + ASSERT_THAT(files.size(), Eq(0)); +} + +TEST_F(SingleFolderWatchTests, NoWaitOnRenameEvent) { + ExpectInit(); + AddEventToBuffer("test",FILE_ACTION_RENAMED_NEW_NAME); + ExpectDirectory(false); + ExpectRead(); + watch.Watch(); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + auto files = event_list.GetAndClearEvents(); + + ASSERT_THAT(files.size(), Eq(1)); } diff --git a/tests/automatic/producer/CMakeLists.txt b/tests/automatic/producer/CMakeLists.txt index d0a774182a12521b9632c16ec0b6dcec025fc7a3..eaebbcdc3a681ee3bf8edf49017fc69b6b66073e 100644 --- a/tests/automatic/producer/CMakeLists.txt +++ b/tests/automatic/producer/CMakeLists.txt @@ -1,4 +1 @@ -#todo: add windows -if (UNIX) - add_subdirectory(file_monitor_producer) -endif() +add_subdirectory(file_monitor_producer) diff --git a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt index 09da2f2aba4c46f37334b6b978a8d1706d9fc715..b9c4f876f8ffb2ca9093b3177245e8e8aca1d9fd 100644 --- a/tests/automatic/producer/file_monitor_producer/CMakeLists.txt +++ b/tests/automatic/producer/file_monitor_producer/CMakeLists.txt @@ -3,5 +3,11 @@ set(TARGET_NAME file-monitor-producer) ################################ # Testing ################################ -configure_file(test.json test.json COPYONLY) +if (UNIX) + set (ROOT_PATH "/tmp/asapo/") +else() + set (ROOT_PATH "c:\\\\tmp\\\\asapo\\\\") +endif() + +configure_file(test.json.in test.json @ONLY) add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin>" nomem) diff --git a/tests/automatic/producer/file_monitor_producer/check_windows.bat b/tests/automatic/producer/file_monitor_producer/check_windows.bat new file mode 100644 index 0000000000000000000000000000000000000000..db53ef2d3337bf251771837a671c0f0e52ee7120 --- /dev/null +++ b/tests/automatic/producer/file_monitor_producer/check_windows.bat @@ -0,0 +1,51 @@ +SET root_folder=c:\tmp\asapo +set short_name="%~nx1" + + +ping 1.0.0.0 -n 1 -w 100 > nul + +mkdir %root_folder%\test_in\test1 +mkdir %root_folder%\test_in\test2 +mkdir %root_folder%\test_out + +start /B "" "%1" test.json > output +ping 1.0.0.0 -n 2 -w 100 > nul + +echo test1 > %root_folder%\test_in\test1\test1.dat +echo test2 > %root_folder%\test_in\test2\test1.tmp + +mkdir %root_folder%\test_in\test2\subdir +echo test3 > %root_folder%\test_in\test2\subdir\test3.dat + +ping 1.0.0.0 -n 2 -w 100 > nul + +type %root_folder%\test_out\test1\test1.dat | findstr /c:"test1" || goto :error +type %root_folder%\test_out\test2\subdir\test3.dat | findstr /c:"test3" || goto :error + +if exist "%root_folder%\test_out\test2\test2.tmp" ( +goto :error +) + +if exist "%root_folder%\test_in\test1\test1.dat" ( +goto :error +) + +if exist "%root_folder%\test_in\test2\subdir\test3.dat" ( +goto :error +) + +rem type output +rem type output | findstr /c:"Processed 2" || goto :error + +goto :clean + +:error +call :clean +exit /b 1 + +:clean +Taskkill /IM "%short_name%" /F +rmdir /S /Q %root_folder%\test_in +rmdir /S /Q %root_folder%\test_out + + diff --git a/tests/automatic/producer/file_monitor_producer/test.json b/tests/automatic/producer/file_monitor_producer/test.json.in similarity index 70% rename from tests/automatic/producer/file_monitor_producer/test.json rename to tests/automatic/producer/file_monitor_producer/test.json.in index ec29bce26c88568f87c8d37307629a8694cc1110..ca77f2d906a2136ff200ae42ad8a6b0feec1c7c5 100644 --- a/tests/automatic/producer/file_monitor_producer/test.json +++ b/tests/automatic/producer/file_monitor_producer/test.json.in @@ -1,11 +1,11 @@ { - "AsapoEndpoint":"/tmp/test_out", + "AsapoEndpoint":"@ROOT_PATH@test_out", "Tag":"test_tag", "BeamtimeID":"asapo_test", "Mode":"filesystem", "NThreads":1, "LogLevel":"debug", - "RootMonitoredFolder":"/tmp/test_in", + "RootMonitoredFolder":"@ROOT_PATH@test_in", "MonitoredSubFolders":["test1","test2"], "IgnoreExtentions":["tmp"], "RemoveAfterSend":true