Skip to content
Snippets Groups Projects
Commit 335e8967 authored by Sergey Yakubov's avatar Sergey Yakubov
Browse files

Merge pull request #54 in ASAPO/asapo from bugfix_producer-always-removes-files to develop

* commit '86da16ad':
  allow black and white lists coexist for snooper
  add whitelist to file snooper. Correct extension spelling in config file
  add stream to the file snupper
  fix always deleting files bug
parents f7bb3fc7 86da16ad
No related branches found
No related tags found
No related merge requests found
Showing
with 104 additions and 28 deletions
......@@ -39,13 +39,15 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
(err = parser.GetString("AsapoEndpoint", &config.asapo_endpoint)) ||
(err = parser.GetString("Tag", &config.tag)) ||
(err = parser.GetString("BeamtimeID", &config.beamtime_id)) ||
(err = parser.GetString("Stream", &config.stream)) ||
(err = parser.GetString("Mode", &config.mode_str)) ||
(err = parser.GetUInt64("NThreads", &config.nthreads)) ||
(err = parser.GetString("RootMonitoredFolder", &config.root_monitored_folder)) ||
(err = parser.GetString("LogLevel", &config.log_level_str)) ||
(err = parser.GetBool("RemoveAfterSend", &config.remove_after_send)) ||
(err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) ||
(err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions)) ||
(err = parser.GetArrayString("IgnoreExtensions", &config.ignored_extensions)) ||
(err = parser.GetArrayString("WhitelistExtensions", &config.whitelisted_extensions)) ||
(err = parser.Embedded("Subset").GetString("Mode", &subset_mode)) ||
(err = SubsetModeToEnum(subset_mode, &config.subset_mode));
if (err) {
......@@ -104,7 +106,6 @@ Error EventMonConfigFactory::CheckLogLevel() {
return err;
}
Error EventMonConfigFactory::CheckNThreads() {
if (config.nthreads == 0 || config.nthreads > kMaxProcessingThreads ) {
return TextError("NThreads should between 1 and " + std::to_string(kMaxProcessingThreads));
......
......@@ -24,12 +24,14 @@ struct EventMonConfig {
RequestHandlerType mode = RequestHandlerType::kTcp;
std::string root_monitored_folder;
std::vector<std::string> monitored_subfolders;
std::vector<std::string> ignored_extentions;
std::vector<std::string> ignored_extensions;
std::vector<std::string> whitelisted_extensions;
bool remove_after_send = false;
SubSetMode subset_mode = SubSetMode::kNone;
uint64_t subset_batch_size = 1;
uint64_t subset_multisource_nsources = 1;
uint64_t subset_multisource_sourceid = 1;
std::string stream;
private:
std::string log_level_str;
std::string mode_str;
......
......@@ -19,7 +19,6 @@ class EventMonConfigFactory {
Error CheckSubsets();
Error CheckNThreads();
Error CheckConfig();
};
}
......
......@@ -15,8 +15,8 @@ inline bool ends_with(std::string const& value, std::string const& ending) {
}
bool FolderEventDetector::IgnoreFile(const std::string& file) {
for (auto& ext : config_->ignored_extentions) {
bool FileInList(const std::vector<std::string>& list, const std::string& file) {
for (auto& ext : list) {
if (ends_with(file, ext)) {
return true;
}
......@@ -24,6 +24,17 @@ bool FolderEventDetector::IgnoreFile(const std::string& file) {
return false;
}
bool FolderEventDetector::IgnoreFile(const std::string& file) {
return FileInList(config_->ignored_extensions, file);
}
bool FolderEventDetector::FileInWhiteList(const std::string& file) {
if (config_->whitelisted_extensions.empty()) {
return true;
}
return FileInList(config_->whitelisted_extensions, file);
}
Error FolderEventDetector::UpdateEventsBuffer() {
Error err;
......@@ -37,7 +48,7 @@ Error FolderEventDetector::UpdateEventsBuffer() {
}
for (auto& file : files) {
if (!IgnoreFile(file)) {
if (!IgnoreFile(file) && FileInWhiteList(file) ) {
events_buffer_.emplace_back(EventHeader{0, 0, file});
}
}
......
......@@ -25,6 +25,7 @@ class FolderEventDetector : public AbstractEventDetector {
Error UpdateEventsBuffer();
Error GetHeaderFromBuffer(EventHeader* event_header);
bool IgnoreFile(const std::string& event);
bool FileInWhiteList(const std::string& file);
bool BufferIsEmpty() const;
};
......
......@@ -39,7 +39,7 @@ std::unique_ptr<Producer> CreateProducer() {
Error err;
auto producer = Producer::Create(config->asapo_endpoint, (uint8_t) config->nthreads,
config->mode, asapo::SourceCredentials{config->beamtime_id, "", ""}, &err);
config->mode, asapo::SourceCredentials{config->beamtime_id, config->stream, ""}, &err);
if(err) {
std::cerr << "cannot create producer: " << err << std::endl;
exit(EXIT_FAILURE);
......@@ -57,6 +57,9 @@ void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) {
return;
}
auto config = GetEventMonConfig();
if (!config->remove_after_send) {
return;
}
std::string fname = config->root_monitored_folder + asapo::kPathSeparator + header.message;
auto error = io->RemoveFile(fname);
if (error) {
......
......@@ -49,6 +49,7 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
config_string += "," + std::string("\"NThreads\":") + std::to_string(config.nthreads);
config_string += "," + std::string("\"LogLevel\":") + "\"" + log_level + "\"";
config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false");
config_string += "," + std::string("\"Stream\":") + "\"" + config.stream + "\"";
std::string subset_mode;
switch (config.subset_mode) {
......@@ -85,7 +86,7 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
}
std::string ignored_exts;
for (auto ext : config.ignored_extentions) {
for (auto ext : config.ignored_extensions) {
ignored_exts += "\"" + ext + "\"" + ",";
}
if (ignored_exts.size()) {
......@@ -93,9 +94,19 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
}
std::string whitelisted_exts;
for (auto ext : config.whitelisted_extensions) {
whitelisted_exts += "\"" + ext + "\"" + ",";
}
if (whitelisted_exts.size()) {
whitelisted_exts.pop_back();
}
config_string += "," + std::string("\"MonitoredSubFolders\":") + "[" + mon_folders + "]";
config_string += "," + std::string("\"RootMonitoredFolder\":") + "\"" + config.root_monitored_folder + "\"";
config_string += "," + std::string("\"IgnoreExtentions\":") + "[" + ignored_exts + "]";
config_string += "," + std::string("\"IgnoreExtensions\":") + "[" + ignored_exts + "]";
config_string += "," + std::string("\"WhitelistExtensions\":") + "[" + whitelisted_exts + "]";
config_string += "," + std::string("\"Tag\":") + "\"" + config.tag + "\"";
config_string += "," + std::string("\"AsapoEndpoint\":") + "\"" + config.asapo_endpoint + "\"";
......
......@@ -60,10 +60,13 @@ TEST_F(ConfigTests, ReadSettingsOK) {
test_config.mode = asapo::RequestHandlerType::kTcp;
test_config.root_monitored_folder = "tmp";
test_config.monitored_subfolders = {"test1", "test2"};
test_config.ignored_extentions = {"tmp", "test"};
test_config.ignored_extensions = {"tmp", "test"};
test_config.remove_after_send = true;
test_config.subset_mode = SubSetMode::kBatch;
test_config.subset_batch_size = 9;
test_config.stream = "stream";
test_config.whitelisted_extensions = {"bla"};
auto err = asapo::SetFolderMonConfig(test_config);
auto config = asapo::GetEventMonConfig();
......@@ -77,12 +80,25 @@ TEST_F(ConfigTests, ReadSettingsOK) {
ASSERT_THAT(config->mode, Eq(asapo::RequestHandlerType::kTcp));
ASSERT_THAT(config->monitored_subfolders, ElementsAre("test1", "test2"));
ASSERT_THAT(config->root_monitored_folder, Eq("tmp"));
ASSERT_THAT(config->ignored_extentions, ElementsAre("tmp", "test"));
ASSERT_THAT(config->ignored_extensions, ElementsAre("tmp", "test"));
ASSERT_THAT(config->remove_after_send, Eq(true));
ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kBatch));
ASSERT_THAT(config->subset_batch_size, Eq(9));
ASSERT_THAT(config->stream, Eq("stream"));
}
TEST_F(ConfigTests, ReadSettingsWhiteListOK) {
asapo::EventMonConfig test_config;
test_config.whitelisted_extensions = {"tmp", "test"};
test_config.ignored_extensions = {};
auto err = asapo::SetFolderMonConfig(test_config);
auto config = asapo::GetEventMonConfig();
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(config->whitelisted_extensions, ElementsAre("tmp", "test"));
}
TEST_F(ConfigTests, ReadSettingsMultiSourceOK) {
......
......@@ -65,7 +65,7 @@ class FolderEventDetectorTests : public testing::Test {
}
void MockStartMonitoring();
void MockGetEvents();
void InitiateAndReadSingleEvent();
asapo::EventHeader InitiateAndReadSingleEvent();
};
void FolderEventDetectorTests::MockStartMonitoring() {
......@@ -153,13 +153,14 @@ void FolderEventDetectorTests::MockGetEvents() {
));
}
void FolderEventDetectorTests::InitiateAndReadSingleEvent() {
asapo::EventHeader FolderEventDetectorTests::InitiateAndReadSingleEvent() {
MockStartMonitoring();
MockGetEvents();
detector.StartMonitoring();
asapo::EventHeader event_header;
detector.GetNextEvent(&event_header);
Mock::VerifyAndClearExpectations(&mock_system_folder_watch);
return event_header;
};
......@@ -210,7 +211,7 @@ TEST_F(FolderEventDetectorTests, GetNextEventDoesSystemCallIfListEmpty) {
}
TEST_F(FolderEventDetectorTests, GetNextIgnoresTmpFiles) {
test_config.ignored_extentions = {"tmp"};
test_config.ignored_extensions = {"tmp"};
InitiateAndReadSingleEvent();
asapo::EventHeader event_header;
err = detector.GetNextEvent(&event_header);
......@@ -222,6 +223,13 @@ TEST_F(FolderEventDetectorTests, GetNextIgnoresTmpFiles) {
ASSERT_THAT(err, Eq(asapo::EventMonitorErrorTemplates::kNoNewEvent));
}
TEST_F(FolderEventDetectorTests, GetNextRespectsWhiteList) {
test_config.whitelisted_extensions = {"tmp"};
auto event_header = InitiateAndReadSingleEvent();
ASSERT_THAT(err, Eq(nullptr));
ASSERT_THAT(event_header.file_name, Eq("test3.tmp"));
}
}
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -52,3 +52,7 @@ echo hello > /tmp/asapo/test_in/test1/file2
echo hello > /tmp/asapo/test_in/test2/file2
$2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 1000 1 | tee /dev/stderr | grep "Processed 3 file(s)"
test ! -f /tmp/asapo/test_in/test1/file1
test ! -f /tmp/asapo/test_in/test1/file2
test ! -f /tmp/asapo/test_in/test2/file2
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -56,3 +56,7 @@ $2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 2000 1 1 > out
cat out
cat out | grep "Processed 1 dataset(s)"
cat out | grep "with 3 file(s)"
test -f /tmp/asapo/test_in/test1/file1
test -f /tmp/asapo/test_in/test1/file2
test -f /tmp/asapo/test_in/test2/file2
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"],
"RemoveAfterSend":true,
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":false,
"Stream": "",
"Subset": {
"Mode":"batch",
"BatchSize":3
......
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test@ID@"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"multisource",
"SourceId":@ID@,
......
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -7,8 +7,10 @@
"LogLevel":"debug",
"RootMonitoredFolder":"@ROOT_PATH@test_in",
"MonitoredSubFolders":["test1","test2"],
"IgnoreExtentions":["tmp"],
"IgnoreExtensions":["tmp"],
"WhitelistExtensions":[],
"RemoveAfterSend":true,
"Stream": "",
"Subset": {
"Mode":"none"
}
......
......@@ -44,15 +44,15 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
data = np.arange(10,dtype=np.float64)
#send data from array
err = producer.send_data(4, stream+"/"+"file5",data,
producer.send_data(4, stream+"/"+"file5",data,
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
#send data from string
err = producer.send_data(5, stream+"/"+"file6",b"hello",
producer.send_data(5, stream+"/"+"file6",b"hello",
ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
#send metadata only
err = producer.send_data(6, stream+"/"+"file7",None,
producer.send_data(6, stream+"/"+"file7",None,
ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
......@@ -67,6 +67,6 @@ else:
time.sleep(5)
time.sleep(10)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment