From 18573a7f8c439ca3791750ac3f2d2ba173e0b137 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 31 Jul 2019 12:06:21 +0200
Subject: [PATCH] filemon producer works with batch datasets

---
 .../src/eventmon_config.cpp                   | 35 +++++++++-
 .../src/eventmon_config.h                     |  7 ++
 .../src/eventmon_config_factory.h             |  1 +
 .../src/main_eventmon.cpp                     | 14 ++++
 .../unittests/mock_eventmon_config.cpp        | 19 +++++-
 .../unittests/test_eventmon_config.cpp        | 14 ++++
 .../producer_send_after_restart/test.json.in  |  6 +-
 .../bug_fixes/receiver_cpu_usage/test.json.in |  5 +-
 tests/automatic/full_chain/CMakeLists.txt     |  1 +
 .../simple_chain_filegen/test.json.in         |  6 +-
 .../CMakeLists.txt                            | 15 ++++
 .../check_linux.sh                            | 58 ++++++++++++++++
 .../check_windows.bat                         | 68 +++++++++++++++++++
 .../simple_chain_filegen_batches/test.json.in | 16 +++++
 .../test.json.in                              |  6 +-
 .../test.json.in                              |  6 +-
 .../file_monitor_producer/test.json.in        |  6 +-
 .../worker/worker_api/worker_api.cpp          |  8 +--
 ...apo-test_filemon_producer_tolocal.nomad.in | 11 ++-
 ...-test_filemon_producer_toreceiver.nomad.in | 11 ++-
 worker/api/cpp/src/server_data_broker.cpp     |  2 +-
 .../api/cpp/unittests/test_folder_broker.cpp  |  4 +-
 22 files changed, 300 insertions(+), 19 deletions(-)
 create mode 100644 tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt
 create mode 100644 tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh
 create mode 100644 tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat
 create mode 100644 tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in

diff --git a/producer/event_monitor_producer/src/eventmon_config.cpp b/producer/event_monitor_producer/src/eventmon_config.cpp
index ca1a07c9e..bb2c5ce10 100644
--- a/producer/event_monitor_producer/src/eventmon_config.cpp
+++ b/producer/event_monitor_producer/src/eventmon_config.cpp
@@ -11,9 +11,25 @@ EventMonConfigFactory::EventMonConfigFactory() : io__{GenerateDefaultIO()} {
 
 }
 
+Error SubsetModeToEnum(const std::string& mode_str, SubSetMode* mode) {
+    if (mode_str == "batch") {
+        *mode = SubSetMode::kBatch;
+        return nullptr;
+    }
+
+    if (mode_str == "none") {
+        *mode = SubSetMode::kNone;
+        return nullptr;
+    }
+
+    return TextError("Wrone subset mode:" + mode_str);
+}
+
 Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
     JsonFileParser parser(file_name, &io__);
     Error err = nullptr;
+    std::string subset_mode;
+
     (err = parser.GetString("AsapoEndpoint", &config.asapo_endpoint)) ||
     (err = parser.GetString("Tag", &config.tag)) ||
     (err = parser.GetString("BeamtimeID", &config.beamtime_id)) ||
@@ -23,7 +39,16 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
     (err = parser.GetString("LogLevel", &config.log_level_str)) ||
     (err = parser.GetBool("RemoveAfterSend", &config.remove_after_send)) ||
     (err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) ||
-    (err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions));
+    (err = parser.GetArrayString("IgnoreExtentions", &config.ignored_extentions)) ||
+    (err = parser.Embedded("Subset").GetString("Mode", &subset_mode)) ||
+    (err = SubsetModeToEnum(subset_mode, &config.subset_mode));
+    if (err) {
+        return err;
+    }
+
+    if (config.subset_mode == SubSetMode::kBatch) {
+        err = parser.Embedded("Subset").GetUInt64("BatchSize", &config.subset_batch_size);
+    }
 
     return err;
 }
@@ -34,6 +59,8 @@ Error EventMonConfigFactory::CheckConfig() {
     (err = CheckMode()) ||
     (err = CheckLogLevel()) ||
     (err = CheckNThreads());
+    (err = CheckSubsets());
+
 //todo: check monitored folders exist?
     return err;
 }
@@ -73,7 +100,13 @@ Error EventMonConfigFactory::CheckNThreads() {
     return nullptr;
 }
 
+Error EventMonConfigFactory::CheckSubsets() {
+    if (config.subset_mode == SubSetMode::kBatch && config.subset_batch_size < 1) {
+        return  TextError("Batch size should > 1");
+    }
 
+    return nullptr;
+}
 
 const EventMonConfig*  GetEventMonConfig() {
     return &config;
diff --git a/producer/event_monitor_producer/src/eventmon_config.h b/producer/event_monitor_producer/src/eventmon_config.h
index 494eae438..c48ce487c 100644
--- a/producer/event_monitor_producer/src/eventmon_config.h
+++ b/producer/event_monitor_producer/src/eventmon_config.h
@@ -9,6 +9,11 @@
 
 namespace asapo {
 
+enum class SubSetMode {
+    kNone,
+    kBatch
+};
+
 struct EventMonConfig {
     std::string asapo_endpoint;
     LogLevel log_level = LogLevel::Info;
@@ -20,6 +25,8 @@ struct EventMonConfig {
     std::vector<std::string> monitored_subfolders;
     std::vector<std::string> ignored_extentions;
     bool remove_after_send = false;
+    SubSetMode subset_mode;
+    uint64_t subset_batch_size = 1;
   private:
     std::string log_level_str;
     std::string mode_str;
diff --git a/producer/event_monitor_producer/src/eventmon_config_factory.h b/producer/event_monitor_producer/src/eventmon_config_factory.h
index eb3735158..933feb2b9 100644
--- a/producer/event_monitor_producer/src/eventmon_config_factory.h
+++ b/producer/event_monitor_producer/src/eventmon_config_factory.h
@@ -16,6 +16,7 @@ class EventMonConfigFactory {
     Error ParseConfigFile(std::string file_name);
     Error CheckMode();
     Error CheckLogLevel();
+    Error CheckSubsets();
     Error CheckNThreads();
     Error CheckConfig();
 
diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp
index 31268ff43..4d64a9f1c 100644
--- a/producer/event_monitor_producer/src/main_eventmon.cpp
+++ b/producer/event_monitor_producer/src/main_eventmon.cpp
@@ -73,6 +73,19 @@ void SignalHandler(int signal) {
 }
 
 
+void HandleSubsets(asapo::EventHeader* header) {
+    switch (GetEventMonConfig()->subset_mode) {
+    case asapo::SubSetMode::kNone:
+        return;
+    case asapo::SubSetMode::kBatch:
+        header->subset_size = GetEventMonConfig()->subset_batch_size;
+        header->subset_id = (header->file_id - 1) / header->subset_size + 1;
+        break;
+    }
+
+
+}
+
 int main (int argc, char* argv[]) {
     asapo::ExitAfterPrintVersionIfNeeded("ASAPO Event Monitor", argc, argv);
 
@@ -117,6 +130,7 @@ int main (int argc, char* argv[]) {
             continue;
         }
         event_header.file_id = ++i;
+        HandleSubsets(&event_header);
         producer->SendFile(event_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
                            event_header.file_name, ProcessAfterSend);
     }
diff --git a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
index 6cabb6da0..966d185e5 100644
--- a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
+++ b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
@@ -50,6 +50,24 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
     config_string += "," + std::string("\"LogLevel\":") + "\"" + log_level + "\"";
     config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false");
 
+    if (config.subset_mode != SubSetMode::kNone) {
+        std::string subset_mode;
+        switch (config.subset_mode) {
+        case SubSetMode::kBatch:
+            subset_mode = "batch";
+            break;
+        case SubSetMode::kNone:
+            subset_mode = "none";
+            break;
+        }
+        config_string += "," + std::string("\"Subset\":{");
+        config_string += std::string("\"Mode\":") + "\"" + subset_mode + "\"";
+        if (config.subset_mode == SubSetMode::kBatch) {
+            config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.subset_batch_size);
+        }
+        config_string += "}";
+    }
+
     std::string mon_folders;
     for (auto folder : config.monitored_subfolders) {
         mon_folders += "\"" + folder + "\"" + ",";
@@ -75,7 +93,6 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
 
     config_string += "}";
 
-
     EXPECT_CALL(mock_io, ReadFileToString_t("fname", _)).WillOnce(
         testing::Return(config_string)
     );
diff --git a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
index 71a392883..4d433a054 100644
--- a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
+++ b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
@@ -31,6 +31,8 @@ using ::asapo::MockIO;
 using ::asapo::EventMonConfigFactory;
 using asapo::EventMonConfig;
 
+using asapo::SubSetMode;
+
 namespace {
 
 
@@ -60,6 +62,8 @@ TEST_F(ConfigTests, ReadSettingsOK) {
     test_config.monitored_subfolders = {"test1", "test2"};
     test_config.ignored_extentions = {"tmp", "test"};
     test_config.remove_after_send = true;
+    test_config.subset_mode = SubSetMode::kBatch;
+    test_config.subset_batch_size = 9;
     auto err = asapo::SetFolderMonConfig(test_config);
 
     auto config = asapo::GetEventMonConfig();
@@ -75,6 +79,9 @@ TEST_F(ConfigTests, ReadSettingsOK) {
     ASSERT_THAT(config->root_monitored_folder, Eq("tmp"));
     ASSERT_THAT(config->ignored_extentions, ElementsAre("tmp", "test"));
     ASSERT_THAT(config->remove_after_send, Eq(true));
+    ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kBatch));
+    ASSERT_THAT(config->subset_batch_size, Eq(9));
+
 
 }
 
@@ -92,6 +99,13 @@ TEST_F(ConfigTests, ReadSettingsChecksNthreads) {
 
 }
 
+TEST_F(ConfigTests, ReadSettingsChecksSubsets) {
+    asapo::EventMonConfig test_config;
+    test_config.subset_batch_size = 0;
+
+    auto err = asapo::SetFolderMonConfig(test_config);
+    ASSERT_THAT(err, Ne(nullptr));
+}
 
 TEST_F(ConfigTests, ReadSettingsChecksMode) {
     asapo::EventMonConfig test_config;
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
index 22b699c17..995984533 100644
--- a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
@@ -8,5 +8,9 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
index 22b699c17..4b7d3fc60 100644
--- a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
@@ -8,5 +8,8 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
 }
diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt
index 0424006c8..45e1af265 100644
--- a/tests/automatic/full_chain/CMakeLists.txt
+++ b/tests/automatic/full_chain/CMakeLists.txt
@@ -5,6 +5,7 @@ endif()
 add_subdirectory(simple_chain_metadata)
 add_subdirectory(two_beamlines)
 add_subdirectory(simple_chain_filegen)
+add_subdirectory(simple_chain_filegen_batches)
 add_subdirectory(simple_chain_filegen_readdata_cache)
 add_subdirectory(simple_chain_filegen_readdata_file)
 add_subdirectory(simple_chain_dataset)
\ No newline at end of file
diff --git a/tests/automatic/full_chain/simple_chain_filegen/test.json.in b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
index f072140a4..758d184a8 100644
--- a/tests/automatic/full_chain/simple_chain_filegen/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
@@ -8,5 +8,9 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1","test2"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+  "Subset": {
+   	"Mode":"none"
+  }
+
 }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt b/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt
new file mode 100644
index 000000000..c319a11ec
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/CMakeLists.txt
@@ -0,0 +1,15 @@
+set(TARGET_NAME full_chain_simple_chain_filegen_producer_batches)
+
+################################
+# Testing
+################################
+prepare_asapo()
+if (UNIX)
+    set (ROOT_PATH "/tmp/asapo/")
+else()
+    set (ROOT_PATH "c:\\\\tmp\\\\asapo\\\\")
+endif()
+
+configure_file(test.json.in test.json @ONLY)
+
+add_script_test("${TARGET_NAME}" "$<TARGET_FILE:event-monitor-producer-bin> $<TARGET_FILE:getnext_broker> $<TARGET_PROPERTY:asapo,EXENAME>" nomem)
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh b/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh
new file mode 100644
index 000000000..8e2be578c
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/check_linux.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+#set -e
+
+trap Cleanup EXIT
+
+beamtime_id=asapo_test
+token=`$3 token -secret broker_secret.key $beamtime_id`
+
+monitor_database_name=db_test
+proxy_address=127.0.0.1:8400
+
+beamline=test
+receiver_root_folder=/tmp/asapo/receiver/files
+receiver_folder=${receiver_root_folder}/${beamline}/${beamtime_id}
+
+mkdir -p /tmp/asapo/test_in/test1/
+mkdir -p /tmp/asapo/test_in/test2/
+
+Cleanup() {
+    echo cleanup
+    kill $producerid
+    rm -rf /tmp/asapo/test_in/test1
+    rm -rf /tmp/asapo/test_in/test2
+    nomad stop nginx
+    nomad stop receiver
+    nomad stop discovery
+    nomad stop broker
+    nomad stop authorizer
+    echo "db.dropDatabase()" | mongo ${beamtime_id}
+    rm -rf out
+}
+
+echo "db.${beamtime_id}.insert({dummy:1})" | mongo ${beamtime_id}
+
+nomad run nginx.nmd
+nomad run authorizer.nmd
+nomad run receiver.nmd
+nomad run discovery.nmd
+nomad run broker.nmd
+
+sleep 1
+
+#producer
+mkdir -p ${receiver_folder}
+$1 test.json &
+producerid=`echo $!`
+
+sleep 1
+
+echo hello > /tmp/asapo/test_in/test1/file1
+echo hello > /tmp/asapo/test_in/test1/file2
+echo hello > /tmp/asapo/test_in/test2/file2
+
+$2 ${proxy_address} ${receiver_folder} ${beamtime_id} 2 $token 2000 1 1 > out
+cat out
+cat out   | grep "Processed 1 dataset(s)"
+cat out   | grep "with 3 file(s)"
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat b/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat
new file mode 100644
index 000000000..41a436e0f
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/check_windows.bat
@@ -0,0 +1,68 @@
+
+
+SET mongo_exe="c:\Program Files\MongoDB\Server\3.6\bin\mongo.exe"
+SET beamtime_id=asapo_test
+SET beamline=test
+SET receiver_root_folder=c:\tmp\asapo\receiver\files
+SET receiver_folder="%receiver_root_folder%\%beamline%\%beamtime_id%"
+
+set producer_short_name="%~nx1"
+
+
+"%3" token -secret broker_secret.key %beamtime_id% > token
+set /P token=< token
+
+set proxy_address="127.0.0.1:8400"
+
+echo db.%beamtime_id%.insert({dummy:1}) | %mongo_exe% %beamtime_id%
+
+c:\opt\consul\nomad run receiver.nmd
+c:\opt\consul\nomad run authorizer.nmd
+c:\opt\consul\nomad run discovery.nmd
+c:\opt\consul\nomad run broker.nmd
+c:\opt\consul\nomad run nginx.nmd
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+REM producer
+mkdir %receiver_folder%
+mkdir  c:\tmp\asapo\test_in\test1
+mkdir  c:\tmp\asapo\test_in\test2
+start /B "" "%1" test.json
+
+ping 1.0.0.0 -n 3 -w 100 > nul
+
+echo hello > c:\tmp\asapo\test_in\test1\file1
+echo hello > c:\tmp\asapo\test_in\test1\file2
+echo hello > c:\tmp\asapo\test_in\test2\file2
+
+ping 1.0.0.0 -n 10 -w 100 > nul
+
+
+REM worker
+"%2" %proxy_address% %receiver_folder% %beamtime_id% 2 %token% 1000 1 1 > out.txt
+type out.txt
+findstr /i /l /c:"Processed 1 dataset(s)"  out.txt || goto :error
+findstr /i /l /c:"with 3 file(s)"  out.txt || goto :error
+
+goto :clean
+
+:error
+call :clean
+exit /b 1
+
+:clean
+c:\opt\consul\nomad stop receiver
+c:\opt\consul\nomad stop discovery
+c:\opt\consul\nomad stop broker
+c:\opt\consul\nomad stop authorizer
+c:\opt\consul\nomad stop nginx
+rmdir /S /Q %receiver_root_folder%
+rmdir /S /Q c:\tmp\asapo\test_in\test1
+rmdir /S /Q c:\tmp\asapo\test_in\test2
+Taskkill /IM "%producer_short_name%" /F
+
+del /f token
+echo db.dropDatabase() | %mongo_exe% %beamtime_id%
+
+
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
new file mode 100644
index 000000000..3f25b85fe
--- /dev/null
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
@@ -0,0 +1,16 @@
+{
+ "AsapoEndpoint":"localhost:8400",
+ "Tag":"test_tag",
+ "BeamtimeID":"asapo_test",
+ "Mode":"tcp",
+ "NThreads":1,
+ "LogLevel":"debug",
+ "RootMonitoredFolder":"@ROOT_PATH@test_in",
+ "MonitoredSubFolders":["test1","test2"],
+ "IgnoreExtentions":["tmp"],
+ "RemoveAfterSend":true,
+ "Subset": {
+ 	"Mode":"batch",
+  	"BatchSize":3
+ }
+}
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
index f072140a4..b7b1f6760 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
@@ -8,5 +8,9 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1","test2"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
index f072140a4..b7b1f6760 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
@@ -8,5 +8,9 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1","test2"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
diff --git a/tests/automatic/producer/file_monitor_producer/test.json.in b/tests/automatic/producer/file_monitor_producer/test.json.in
index ca77f2d90..088681a0d 100644
--- a/tests/automatic/producer/file_monitor_producer/test.json.in
+++ b/tests/automatic/producer/file_monitor_producer/test.json.in
@@ -8,5 +8,9 @@
  "RootMonitoredFolder":"@ROOT_PATH@test_in",
  "MonitoredSubFolders":["test1","test2"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
diff --git a/tests/automatic/worker/worker_api/worker_api.cpp b/tests/automatic/worker/worker_api/worker_api.cpp
index 3d873d266..e2f56e101 100644
--- a/tests/automatic/worker/worker_api/worker_api.cpp
+++ b/tests/automatic/worker/worker_api/worker_api.cpp
@@ -43,9 +43,9 @@ void TestSingle(const std::unique_ptr<asapo::DataBroker>& broker, const std::str
     M_AssertTrue(fi.metadata == "{\"test\":10}", "GetNext metadata");
 
     asapo::FileData data;
-    err = broker->RetrieveData(&fi,&data);
+    err = broker->RetrieveData(&fi, &data);
     M_AssertTrue(err == nullptr, "RetrieveData no error");
-    M_AssertEq("hello1",std::string(data.get(),data.get()+fi.size));
+    M_AssertEq("hello1", std::string(data.get(), data.get() + fi.size));
 
 
     err = broker->GetLast(&fi, group_id, nullptr);
@@ -122,9 +122,9 @@ void TestDataset(const std::unique_ptr<asapo::DataBroker>& broker, const std::st
     M_AssertTrue(dataset.content[0].metadata == "{\"test\":10}", "GetNext metadata");
 
     asapo::FileData data;
-    err = broker->RetrieveData(&dataset.content[0],&data);
+    err = broker->RetrieveData(&dataset.content[0], &data);
     M_AssertTrue(err == nullptr, "RetrieveData no error");
-    M_AssertEq("hello1",std::string(data.get(),data.get()+dataset.content[0].size));
+    M_AssertEq("hello1", std::string(data.get(), data.get() + dataset.content[0].size));
 
 
     dataset = broker->GetLastDataset(group_id, &err);
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
index ed3c4f477..e40fe1738 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
@@ -39,7 +39,10 @@ job "asapo-produceronly" {
  "RootMonitoredFolder":"c:\\tmp\\asapo\\test_in",
  "MonitoredSubFolders":["test_folder"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+  "Subset": {
+   	"Mode":"none"
+  }
 }
         EOH
         destination = "local/test.json"
@@ -95,7 +98,11 @@ job "asapo-produceronly" {
  "RootMonitoredFolder":"/tmp/asapo/test_in",
  "MonitoredSubFolders":["test_folder"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
         EOH
         destination = "local/test.json"
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in
index a716b6384..b9c374bcd 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_toreceiver.nomad.in
@@ -39,7 +39,10 @@ job "asapo-filemon-producer" {
  "RootMonitoredFolder":"u:\\asapo",
  "MonitoredSubFolders":["test_folder"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
 }
         EOH
         destination = "local/test.json"
@@ -95,7 +98,11 @@ job "asapo-filemon-producer" {
  "RootMonitoredFolder":"/run/user",
  "MonitoredSubFolders":["data"],
  "IgnoreExtentions":["tmp"],
- "RemoveAfterSend":true
+ "RemoveAfterSend":true,
+ "Subset": {
+  	"Mode":"none"
+ }
+
 }
         EOH
         destination = "local/test.json"
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index 4b69c6cbf..8afacb830 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -215,7 +215,7 @@ Error ServerDataBroker::GetDataIfNeeded(FileInfo* info, FileData* data) {
         return nullptr;
     }
 
-    return RetrieveData(info,data);
+    return RetrieveData(info, data);
 
 }
 
diff --git a/worker/api/cpp/unittests/test_folder_broker.cpp b/worker/api/cpp/unittests/test_folder_broker.cpp
index 668e53dbb..d72d46166 100644
--- a/worker/api/cpp/unittests/test_folder_broker.cpp
+++ b/worker/api/cpp/unittests/test_folder_broker.cpp
@@ -284,8 +284,8 @@ TEST_F(GetDataFromFileTests, RetrieveDataCallsReadsFile) {
     fi.name = "test";
 
 
-    EXPECT_CALL(mock, GetDataFromFile_t(expected_base_path+asapo::kPathSeparator+"test", _, _)).
-        WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'})));
+    EXPECT_CALL(mock, GetDataFromFile_t(expected_base_path + asapo::kPathSeparator + "test", _, _)).
+    WillOnce(DoAll(testing::SetArgPointee<2>(nullptr), testing::Return(new uint8_t[1] {'1'})));
 
     auto err = data_broker->RetrieveData(&fi, &data);
 
-- 
GitLab