From 26c9adbbcc2dedeb809b45b187af6464370c48ad Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 22 Dec 2020 21:16:06 +0100
Subject: [PATCH] rename subset to dataset/dataset_substream, SendFromFile to SendFile

---
 CHANGELOG.md                                  |  3 +-
 .../cpp/include/asapo/common/data_structs.h   |  1 +
 common/cpp/include/asapo/common/networking.h  |  2 +-
 common/cpp/include/asapo/database/database.h  |  4 +-
 .../include/asapo/unittests/MockDatabase.h    |  8 ++--
 common/cpp/src/data_structs/data_structs.cpp  |  5 ++-
 common/cpp/src/database/mongodb_client.cpp    | 13 +++---
 common/cpp/src/database/mongodb_client.h      |  2 +-
 .../data_structs/test_data_structs.cpp        |  7 +++-
 .../api/cpp/include/asapo/consumer/consumer.h |  2 +-
 .../pipeline/in_to_out_python/in_to_out.py    |  2 +-
 .../dummy_data_producer.cpp                   |  4 +-
 .../api/cpp/include/asapo/producer/common.h   | 12 +++---
 .../api/cpp/include/asapo/producer/producer.h |  4 +-
 producer/api/cpp/src/producer_impl.cpp        | 18 ++++-----
 producer/api/cpp/src/producer_impl.h          |  4 +-
 producer/api/cpp/src/producer_request.cpp     |  2 +-
 .../api/cpp/unittests/test_producer_impl.cpp  | 40 +++++++++----------
 .../unittests/test_request_handler_tcp.cpp    | 18 ++++-----
 producer/api/python/asapo_producer.pxd        |  6 +--
 producer/api/python/asapo_producer.pyx.in     | 40 +++++++++----------
 .../src/eventmon_config.cpp                   | 34 ++++++++--------
 .../src/eventmon_config.h                     | 10 ++---
 .../src/eventmon_config_factory.h             |  2 +-
 .../src/main_eventmon.cpp                     | 24 +++++------
 .../unittests/mock_eventmon_config.cpp        | 30 +++++++-------
 .../unittests/test_eventmon_config.cpp        | 36 ++++++++---------
 .../src/request_handler/request_factory.cpp   |  2 +-
 .../request_handler_db_check_request.cpp      |  2 +-
 .../request_handler_db_write.cpp              | 11 +++--
 .../request_handler/test_request_factory.cpp  |  4 +-
 .../test_request_handler_db_check_request.cpp | 12 +++---
 .../test_request_handler_db_writer.cpp        | 26 +++++++-----
 .../bugfix_callback.py                        |  2 +-
 .../producer_send_after_restart/test.json.in  |  2 +-
 .../bug_fixes/receiver_cpu_usage/test.json.in |  2 +-
 .../send_recv_streams.py                      |  2 +-
 .../simple_chain_filegen/test.json.in         |  2 +-
 .../simple_chain_filegen_batches/test.json.in |  2 +-
 .../test.json.in                              |  2 +-
 .../test.json.in                              |  2 +-
 .../test.json.in                              |  2 +-
 .../insert_retrieve_dataset_mongodb.cpp       | 16 ++++----
 .../file_monitor_producer/test.json.in        |  2 +-
 .../producer/python_api/producer_api.py       | 26 ++++++------
 tests/manual/python_tests/producer/test.py    | 16 ++++----
 .../producer_wait_bug_mongo/test.py           | 16 ++++----
 .../producer_wait_threads/producer_api.py     | 12 +++---
 .../producer_wait_threads/test.py             | 12 +++---
 .../asapo-test_filemon.nomad.in               |  4 +-
 .../asapo-test_filemon_batch.nomad.in         |  4 +-
 .../asapo-test_filemon_multisource.nomad.in   |  4 +-
 ...apo-test_filemon_producer_tolocal.nomad.in |  4 +-
 53 files changed, 265 insertions(+), 259 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e1ec9010..fb5002215 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,7 +21,8 @@ BREAKING CHANGES
 * use the term `message` for the blob of information we send around; rename related structs, parameters, ...  
     ####  renaming - Producer API
 * SendData/send_data -> Send/send    
-* SendFile/send_file -> SendFromFile/send_from_file
+* id_in_subset -> dataset_substream
+* subset_size -> dataset_size (and, in general, subset is renamed to dataset)
     ####  renaming - Consumer API
 * broker -> consumer
 
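For reviewers, a hedged sketch of the renamed producer API (names as introduced by
this patch; producer creation, error handling and real values are elided):

    #include "asapo/asapo_producer.h"

    void send_example(asapo::Producer* producer) {
        // message 5 is dataset substream 2 of a 3-substream dataset
        // (formerly id_in_subset = 2, subset_size = 3)
        asapo::MessageHeader header{5, 0, "run1/file5", "", 2, 3};
        // formerly SendFromFile/send_from_file
        producer->SendFile(header, "/data/run1/file5", asapo::kDefaultIngestMode, nullptr);
    }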
diff --git a/common/cpp/include/asapo/common/data_structs.h b/common/cpp/include/asapo/common/data_structs.h
index 457e4756f..ffbc2d681 100644
--- a/common/cpp/include/asapo/common/data_structs.h
+++ b/common/cpp/include/asapo/common/data_structs.h
@@ -31,6 +31,7 @@ class MessageMeta {
     std::string source;
     std::string metadata;
     uint64_t buf_id{0};
+    uint64_t dataset_substream{0};
     std::string Json() const;
     bool SetFromJson(const std::string& json_string);
     std::string FullName(const std::string& base_path) const;
diff --git a/common/cpp/include/asapo/common/networking.h b/common/cpp/include/asapo/common/networking.h
index 7a479c2a7..bd7f5379f 100644
--- a/common/cpp/include/asapo/common/networking.h
+++ b/common/cpp/include/asapo/common/networking.h
@@ -21,7 +21,7 @@ enum class NetworkConnectionType : uint32_t {
 enum Opcode : uint8_t {
     kOpcodeUnknownOp = 1,
     kOpcodeTransferData,
-    kOpcodeTransferSubsetData,
+    kOpcodeTransferDatasetData,
     kOpcodeStreamInfo,
     kOpcodeLastStream,
     kOpcodeGetBufferData,
diff --git a/common/cpp/include/asapo/database/database.h b/common/cpp/include/asapo/database/database.h
index 289a4c8c0..d36b95322 100644
--- a/common/cpp/include/asapo/database/database.h
+++ b/common/cpp/include/asapo/database/database.h
@@ -17,8 +17,8 @@ class Database {
     virtual Error Connect(const std::string& address, const std::string& database) = 0;
     virtual Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const = 0;
     virtual Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const = 0;
-    virtual Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id,
-                                 uint64_t subset_size,
+    virtual Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file,
+                                 uint64_t dataset_size,
                                  bool ignore_duplicates) const = 0;
 
     virtual Error GetById(const std::string& collection, uint64_t id, MessageMeta* file) const = 0;
diff --git a/common/cpp/include/asapo/unittests/MockDatabase.h b/common/cpp/include/asapo/unittests/MockDatabase.h
index f3c47933d..691e39af8 100644
--- a/common/cpp/include/asapo/unittests/MockDatabase.h
+++ b/common/cpp/include/asapo/unittests/MockDatabase.h
@@ -19,9 +19,9 @@ class MockDatabase : public Database {
         return Error{Insert_t(collection, file, ignore_duplicates)};
     }
 
-    Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id,
-                         uint64_t subset_size, bool ignore_duplicates) const override {
-        return Error{InsertAsSubset_t(collection, file, subset_id, subset_size, ignore_duplicates)};
+    Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file,
+                         uint64_t dataset_size, bool ignore_duplicates) const override {
+        return Error{InsertAsDatasetMessage_t(collection, file, dataset_size, ignore_duplicates)};
     }
 
 
@@ -29,7 +29,7 @@ class MockDatabase : public Database {
     MOCK_CONST_METHOD3(Insert_t, ErrorInterface * (const std::string&, const MessageMeta&, bool));
 
 
-    MOCK_CONST_METHOD5(InsertAsSubset_t, ErrorInterface * (const std::string&, const MessageMeta&, uint64_t, uint64_t, bool));
+    MOCK_CONST_METHOD4(InsertAsDatasetMessage_t, ErrorInterface * (const std::string&, const MessageMeta&, uint64_t, bool));
 
 
     Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override {
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 2c9b7deed..fc7882c29 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -62,8 +62,8 @@ std::string MessageMeta::Json() const {
                                                                                                                 "\"timestamp\":"
         + std::to_string(nanoseconds_from_epoch) + ","
                                                    "\"source\":\"" + source + "\","
-                                                                              "\"buf_id\":" + std::to_string(buf_id_int)
-        + ","
+                                                                              "\"buf_id\":" + std::to_string(buf_id_int) + ","
+         "\"dataset_substream\":" + std::to_string(dataset_substream) + ","
           "\"meta\":" + (metadata.size() == 0 ? std::string("{}") : metadata)
         + "}";
     return s;
@@ -119,6 +119,7 @@ bool MessageMeta::SetFromJson(const std::string &json_string) {
         parser.GetString("name", &name) ||
         parser.GetString("source", &source) ||
         parser.GetUInt64("buf_id", &buf_id) ||
+        parser.GetUInt64("dataset_substream", &dataset_substream) ||
         parser.Embedded("meta").GetRawString(&metadata) ||
         !TimeFromJson(parser, "timestamp", &timestamp)) {
         *this = old;
diff --git a/common/cpp/src/database/mongodb_client.cpp b/common/cpp/src/database/mongodb_client.cpp
index 5abe197d2..442d20b1d 100644
--- a/common/cpp/src/database/mongodb_client.cpp
+++ b/common/cpp/src/database/mongodb_client.cpp
@@ -237,9 +237,8 @@ Error MongoDBClient::AddBsonDocumentToArray(bson_t* query, bson_t* update, bool
     return err;
 }
 
-Error MongoDBClient::InsertAsSubset(const std::string &collection, const MessageMeta &file,
-                                    uint64_t subset_id,
-                                    uint64_t subset_size,
+Error MongoDBClient::InsertAsDatasetMessage(const std::string &collection, const MessageMeta &file,
+                                    uint64_t dataset_size,
                                     bool ignore_duplicates) const {
     if (!connected_) {
         return DBErrorTemplates::kNotConnected.Generate();
@@ -252,10 +251,10 @@ Error MongoDBClient::InsertAsSubset(const std::string &collection, const Message
     if (err) {
         return err;
     }
-    auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(subset_id), "}", "{", "messages._id", "{", "$ne",
-                           BCON_INT64(file.id), "}", "}", "]");
+    auto query = BCON_NEW ("$and", "[", "{", "_id", BCON_INT64(file.id), "}", "{", "messages.dataset_substream", "{", "$ne",
+                           BCON_INT64(file.dataset_substream), "}", "}", "]");
     auto update = BCON_NEW ("$setOnInsert", "{",
-                            "size", BCON_INT64(subset_size),
+                            "size", BCON_INT64(dataset_size),
                             "timestamp", BCON_INT64((int64_t) NanosecsEpochFromTimePoint(file.timestamp)),
                             "}",
                             "$addToSet", "{",
@@ -348,7 +347,7 @@ Error MongoDBClient::GetDataSetById(const std::string &collection, uint64_t id_i
     }
 
     for (const auto &message_meta : dataset.content) {
-        if (message_meta.id == id_in_set) {
+        if (message_meta.dataset_substream == id_in_set) {
             *file = message_meta;
             return nullptr;
         }
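A note on the storage model implied by the new query and update: a dataset message
becomes a single MongoDB document keyed by the message id (_id = file.id), size
holds dataset_size, and each call adds one entry per dataset substream. A rough
sketch, assuming the $addToSet target is the messages array referenced in the query:

    {
      "_id": 5,
      "size": 3,
      "timestamp": 1000000,
      "messages": [
        { "dataset_substream": 1, ... },
        { "dataset_substream": 2, ... }
      ]
    }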
diff --git a/common/cpp/src/database/mongodb_client.h b/common/cpp/src/database/mongodb_client.h
index ad7801117..a1b9bb5ef 100644
--- a/common/cpp/src/database/mongodb_client.h
+++ b/common/cpp/src/database/mongodb_client.h
@@ -44,7 +44,7 @@ class MongoDBClient final : public Database {
     MongoDBClient();
     Error Connect(const std::string& address, const std::string& database) override;
     Error Insert(const std::string& collection, const MessageMeta& file, bool ignore_duplicates) const override;
-    Error InsertAsSubset(const std::string& collection, const MessageMeta& file, uint64_t subset_id, uint64_t subset_size,
+    Error InsertAsDatasetMessage(const std::string& collection, const MessageMeta& file, uint64_t dataset_size,
                          bool ignore_duplicates) const override;
     Error Upsert(const std::string& collection, uint64_t id, const uint8_t* data, uint64_t size) const override;
     Error GetById(const std::string& collection, uint64_t id, MessageMeta* file) const override;
diff --git a/common/cpp/unittests/data_structs/test_data_structs.cpp b/common/cpp/unittests/data_structs/test_data_structs.cpp
index 49ba236df..c801a2324 100644
--- a/common/cpp/unittests/data_structs/test_data_structs.cpp
+++ b/common/cpp/unittests/data_structs/test_data_structs.cpp
@@ -29,6 +29,7 @@ MessageMeta PrepareMessageMeta() {
     MessageMeta message_meta;
     message_meta.size = 100;
     message_meta.id = 1;
+    message_meta.dataset_substream = 3;
     message_meta.name = std::string("folder") + asapo::kPathSeparator + "test";
     message_meta.source = "host:1234";
     message_meta.buf_id = big_uint;
@@ -42,6 +43,7 @@ TEST(MessageMetaTests, Defaults) {
 
     ASSERT_THAT(message_meta.buf_id, Eq(0));
     ASSERT_THAT(message_meta.id, Eq(0));
+    ASSERT_THAT(message_meta.dataset_substream, Eq(0));
 }
 
 
@@ -50,10 +52,10 @@ TEST(MessageMetaTests, CorrectConvertToJson) {
     std::string json = message_meta.Json();
     if (asapo::kPathSeparator == '/') {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"meta":{"bla":10}})"));
+                        R"({"_id":1,"size":100,"name":"folder/test","timestamp":1000000,"source":"host:1234","buf_id":-1,"dataset_substream":3,"meta":{"bla":10}})"));
     } else {
         ASSERT_THAT(json, Eq(
-                        R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,"meta":{"bla":10}})"));
+                        R"({"_id":1,"size":100,"name":"folder\\test","timestamp":1000000,"source":"host:1234","buf_id":-1,,"dataset_substream":3,"meta":{"bla":10}})"));
     }
 }
 
@@ -103,6 +105,7 @@ TEST(MessageMetaTests, CorrectConvertFromJson) {
     ASSERT_THAT(result.buf_id, Eq(message_meta.buf_id));
     ASSERT_THAT(result.source, Eq(message_meta.source));
     ASSERT_THAT(result.metadata, Eq(message_meta.metadata));
+    ASSERT_THAT(result.dataset_substream, Eq(message_meta.dataset_substream));
 
 }
 
diff --git a/consumer/api/cpp/include/asapo/consumer/consumer.h b/consumer/api/cpp/include/asapo/consumer/consumer.h
index 913246ab2..bc343dfbc 100644
--- a/consumer/api/cpp/include/asapo/consumer/consumer.h
+++ b/consumer/api/cpp/include/asapo/consumer/consumer.h
@@ -181,7 +181,7 @@ class Consumer {
 
     //! Get all messages matching the query.
     /*!
-      \param sql_query -  query string in SQL format. Limit subset is supported
+      \param sql_query -  query string in SQL format. Limit dataset is supported
       \param err - will be set in case of error, nullptr otherwise
      \return vector of message metadata matching the specified query. Empty if nothing found or on error
     */
diff --git a/examples/pipeline/in_to_out_python/in_to_out.py b/examples/pipeline/in_to_out_python/in_to_out.py
index 45943a99b..0e58c1b0a 100644
--- a/examples/pipeline/in_to_out_python/in_to_out.py
+++ b/examples/pipeline/in_to_out_python/in_to_out.py
@@ -44,7 +44,7 @@ while True:
         data, meta = consumer.get_next(group_id, meta_only=not transfer_data)
         print ("received: ",meta)
         n_recv = n_recv + 1
-        producer.send_data(meta['_id'],meta['name']+"_"+stream_out ,data,
+        producer.send(meta['_id'],meta['name']+"_"+stream_out ,data,
                              ingest_mode = ingest_mode, callback = callback)
     except  asapo_consumer.AsapoEndOfStreamError:
         break
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index b44eae6a8..a23c2ed08 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -163,8 +163,8 @@ bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t it
         } else {
             for (uint64_t id = 0; id < messages_in_set; id++) {
                 auto buffer = CreateMemoryBuffer(number_of_byte);
-                message_header.id_in_subset = id + 1;
-                message_header.subset_size = messages_in_set;
+                message_header.dataset_substream = id + 1;
+                message_header.dataset_size = messages_in_set;
                 message_header.message_id = i + 1;
                 message_header.file_name = std::to_string(i + 1) + "_" + std::to_string(id + 1);
                 if (!data_source.empty()) {
diff --git a/producer/api/cpp/include/asapo/producer/common.h b/producer/api/cpp/include/asapo/producer/common.h
index 07dd2a110..5b2da89b0 100644
--- a/producer/api/cpp/include/asapo/producer/common.h
+++ b/producer/api/cpp/include/asapo/producer/common.h
@@ -30,19 +30,19 @@ struct MessageHeader {
     MessageHeader() {};
     MessageHeader(uint64_t message_id_i, uint64_t data_size_i, std::string file_name_i,
                   std::string user_metadata_i = "",
-                  uint64_t id_in_subset_i = 0,
-                  uint64_t subset_size_i = 0 ):
+                  uint64_t dataset_substream_i = 0,
+                  uint64_t dataset_size_i = 0 ):
         message_id{message_id_i}, data_size{data_size_i},
         file_name{std::move(file_name_i)},
         user_metadata{std::move(user_metadata_i)},
-        id_in_subset{id_in_subset_i},
-        subset_size{subset_size_i} {};
+        dataset_substream{dataset_substream_i},
+        dataset_size{dataset_size_i} {};
     uint64_t message_id = 0;
     uint64_t data_size = 0;
     std::string file_name;
     std::string user_metadata;
-    uint64_t id_in_subset = 0;
-    uint64_t subset_size = 0;
+    uint64_t dataset_substream = 0;
+    uint64_t dataset_size = 0;
 };
 
 }
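As the producer_impl.cpp changes below show, dataset_substream == 0 (the default)
marks a standalone message, while a non-zero value switches the request to
kOpcodeTransferDatasetData and requires dataset_size > 0. A minimal sketch:

    // standalone message: dataset fields keep their zero defaults
    asapo::MessageHeader plain{1, 100, "file1"};

    // dataset message: substream 2 of 3; a non-zero substream with
    // dataset_size == 0 is rejected with a "dataset dimensions" error
    asapo::MessageHeader in_dataset{1, 100, "file1", "", 2, 3};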
diff --git a/producer/api/cpp/include/asapo/producer/producer.h b/producer/api/cpp/include/asapo/producer/producer.h
index db73edf62..66b0a5b15 100644
--- a/producer/api/cpp/include/asapo/producer/producer.h
+++ b/producer/api/cpp/include/asapo/producer/producer.h
@@ -80,7 +80,7 @@ class Producer {
       \param full_path - A full path of the file to send
       \return Error - Will be nullptr on success
     */
-    virtual Error SendFromFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode,
+    virtual Error SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode,
                                RequestCallback callback) = 0;
 
     //! Sends message from a file to a stream
@@ -89,7 +89,7 @@ class Producer {
       \param full_path - A full path of the file to send
       \return Error - Will be nullptr on success
     */
-    virtual Error SendFromFile(const MessageHeader& message_header, std::string stream, std::string full_path,
+    virtual Error SendFile(const MessageHeader& message_header, std::string stream, std::string full_path,
                                uint64_t ingest_mode,
                                RequestCallback callback) = 0;
 
diff --git a/producer/api/cpp/src/producer_impl.cpp b/producer/api/cpp/src/producer_impl.cpp
index 72f2ee4a3..76bf9b4b2 100644
--- a/producer/api/cpp/src/producer_impl.cpp
+++ b/producer/api/cpp/src/producer_impl.cpp
@@ -38,10 +38,10 @@ GenericRequestHeader ProducerImpl::GenerateNextSendRequest(const MessageHeader&
                                                            uint64_t ingest_mode) {
     GenericRequestHeader request{kOpcodeTransferData, message_header.message_id, message_header.data_size,
                                  message_header.user_metadata.size(), message_header.file_name, stream};
-    if (message_header.id_in_subset != 0) {
-        request.op_code = kOpcodeTransferSubsetData;
-        request.custom_data[kPosDataSetId] = message_header.id_in_subset;
-        request.custom_data[kPosDataSetSize] = message_header.subset_size;
+    if (message_header.dataset_substream != 0) {
+        request.op_code = kOpcodeTransferDatasetData;
+        request.custom_data[kPosDataSetId] = message_header.dataset_substream;
+        request.custom_data[kPosDataSetSize] = message_header.dataset_size;
     }
     request.custom_data[kPosIngestMode] = ingest_mode;
     return request;
@@ -80,8 +80,8 @@ Error CheckProducerRequest(const MessageHeader& message_header, uint64_t ingest_
         return ProducerErrorTemplates::kWrongInput.Generate("empty filename");
     }
 
-    if (message_header.id_in_subset > 0 && message_header.subset_size == 0) {
-        return ProducerErrorTemplates::kWrongInput.Generate("subset dimensions");
+    if (message_header.dataset_substream > 0 && message_header.dataset_size == 0) {
+        return ProducerErrorTemplates::kWrongInput.Generate("dataset dimensions");
     }
 
     if (message_header.message_id == 0) {
@@ -161,9 +161,9 @@ Error ProducerImpl::SendStreamFinishedFlag(std::string stream, uint64_t last_id,
     return Send(message_header, std::move(stream), nullptr, "", IngestModeFlags::kTransferMetaDataOnly, callback, true);
 }
 
-Error ProducerImpl::SendFromFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode,
+Error ProducerImpl::SendFile(const MessageHeader& message_header, std::string full_path, uint64_t ingest_mode,
                                  RequestCallback callback) {
-    return SendFromFile(message_header, kDefaultStream, std::move(full_path), ingest_mode, callback);
+    return SendFile(message_header, kDefaultStream, std::move(full_path), ingest_mode, callback);
 }
 
 
@@ -262,7 +262,7 @@ Error ProducerImpl::WaitRequestsFinished(uint64_t timeout_ms) {
 void ProducerImpl::StopThreads__() {
     request_pool__->StopThreads();
 }
-Error ProducerImpl::SendFromFile(const MessageHeader& message_header,
+Error ProducerImpl::SendFile(const MessageHeader& message_header,
                                         std::string stream,
                                         std::string full_path,
                                         uint64_t ingest_mode,
diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h
index 18e952f70..3e04b58e2 100644
--- a/producer/api/cpp/src/producer_impl.h
+++ b/producer/api/cpp/src/producer_impl.h
@@ -50,9 +50,9 @@ class ProducerImpl : public Producer {
   Error Send__(const MessageHeader &message_header, std::string stream, void* data, uint64_t ingest_mode,
                RequestCallback callback) override;
   void StopThreads__() override;
-  Error SendFromFile(const MessageHeader &message_header, std::string full_path, uint64_t ingest_mode,
+  Error SendFile(const MessageHeader &message_header, std::string full_path, uint64_t ingest_mode,
                      RequestCallback callback) override;
-  Error SendFromFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode,
+  Error SendFile(const MessageHeader &message_header, std::string stream, std::string full_path, uint64_t ingest_mode,
                      RequestCallback callback) override;
 
   Error SendStreamFinishedFlag(std::string stream, uint64_t last_id, std::string next_stream,
diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp
index 7cc63b4f1..73a64c2a6 100644
--- a/producer/api/cpp/src/producer_request.cpp
+++ b/producer/api/cpp/src/producer_request.cpp
@@ -27,7 +27,7 @@ ProducerRequest::ProducerRequest(std::string source_credentials,
 }
 
 bool ProducerRequest::NeedSend() const {
-    if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferSubsetData) {
+    if (header.op_code == kOpcodeTransferData || header.op_code == kOpcodeTransferDatasetData) {
         return header.custom_data[kPosIngestMode] & IngestModeFlags::kTransferData;
     }
     return true;
diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp
index 09f6545f3..089dae54b 100644
--- a/producer/api/cpp/unittests/test_producer_impl.cpp
+++ b/producer/api/cpp/unittests/test_producer_impl.cpp
@@ -32,8 +32,8 @@ using asapo::ProducerRequest;
 
 MATCHER_P10(M_CheckSendRequest, op_code, source_credentials, metadata, file_id, file_size, message, stream,
             ingest_mode,
-            subset_id,
-            subset_size,
+            dataset_id,
+            dataset_size,
             "Checks if a valid GenericRequestHeader was Send") {
     auto request = static_cast<ProducerRequest*>(arg);
     return ((asapo::GenericRequestHeader) (arg->header)).op_code == op_code
@@ -42,10 +42,10 @@ MATCHER_P10(M_CheckSendRequest, op_code, source_credentials, metadata, file_id,
         && request->manage_data_memory == true
         && request->source_credentials == source_credentials
         && request->metadata == metadata
-        && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1]
-            == uint64_t(subset_id) : true)
-        && (op_code == asapo::kOpcodeTransferSubsetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2]
-            == uint64_t(subset_size) : true)
+        && (op_code == asapo::kOpcodeTransferDatasetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[1]
+            == uint64_t(dataset_id) : true)
+        && (op_code == asapo::kOpcodeTransferDatasetData ? ((asapo::GenericRequestHeader) (arg->header)).custom_data[2]
+            == uint64_t(dataset_size) : true)
         && ((asapo::GenericRequestHeader) (arg->header)).custom_data[asapo::kPosIngestMode] == uint64_t(ingest_mode)
         && strcmp(((asapo::GenericRequestHeader) (arg->header)).message, message) == 0
         && strcmp(((asapo::GenericRequestHeader) (arg->header)).stream, stream) == 0;
@@ -66,8 +66,8 @@ class ProducerImplTests : public testing::Test {
   asapo::ProducerImpl producer{"", 1, 3600000, asapo::RequestHandlerType::kTcp};
   uint64_t expected_size = 100;
   uint64_t expected_id = 10;
-  uint64_t expected_subset_id = 100;
-  uint64_t expected_subset_size = 4;
+  uint64_t expected_dataset_id = 100;
+  uint64_t expected_dataset_size = 4;
   uint64_t expected_ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly;
 
   char expected_name[asapo::kMaxMessageSize] = "test_name";
@@ -118,8 +118,8 @@ TEST_F(ProducerImplTests, ErrorIfFileEmpty) {
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
 }
 
-TEST_F(ProducerImplTests, ErrorIfSubsetSizeNotDefined) {
-    EXPECT_CALL(mock_logger, Error(testing::HasSubstr("subset dimensions")));
+TEST_F(ProducerImplTests, ErrorIfDatasetSizeNotDefined) {
+    EXPECT_CALL(mock_logger, Error(testing::HasSubstr("dataset dimensions")));
     asapo::MessageHeader message_header{1, 1000, "test", "", 1};
     auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr);
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
@@ -275,9 +275,9 @@ TEST_F(ProducerImplTests, OKSendingStreamFinishWithNoNextStream) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
-TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
+TEST_F(ProducerImplTests, OKSendingSendDatasetDataRequest) {
     producer.SetCredentials(expected_credentials);
-    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferSubsetData,
+    EXPECT_CALL(mock_pull, AddRequest_t(M_CheckSendRequest(asapo::kOpcodeTransferDatasetData,
                                                                expected_credentials_str,
                                                                expected_metadata,
                                                                expected_id,
@@ -285,13 +285,13 @@ TEST_F(ProducerImplTests, OKSendingSendSubsetDataRequest) {
                                                                expected_name,
                                                                asapo::kDefaultStream.c_str(),
                                                                expected_ingest_mode,
-                                                               expected_subset_id,
-                                                               expected_subset_size), false)).WillOnce(
+                                                               expected_dataset_id,
+                                                               expected_dataset_size), false)).WillOnce(
         Return(
             nullptr));
 
     asapo::MessageHeader message_header
-        {expected_id, expected_size, expected_name, expected_metadata, expected_subset_id, expected_subset_size};
+        {expected_id, expected_size, expected_name, expected_metadata, expected_dataset_id, expected_dataset_size};
     auto err = producer.Send(message_header, nullptr, expected_ingest_mode, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
@@ -327,7 +327,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyFileName) {
     EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0);
 
     asapo::MessageHeader message_header{expected_id, 0, expected_name};
-    auto err = producer.SendFromFile(message_header, "", expected_ingest_mode, nullptr);
+    auto err = producer.SendFile(message_header, "", expected_ingest_mode, nullptr);
 
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
 
@@ -339,7 +339,7 @@ TEST_F(ProducerImplTests, ErrorSendingEmptyRelativeFileName) {
     EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0);
 
     asapo::MessageHeader message_header{expected_id, 0, ""};
-    auto err = producer.SendFromFile(message_header, expected_fullpath, expected_ingest_mode, nullptr);
+    auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr);
 
     ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
 
@@ -361,7 +361,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequest) {
         nullptr));
 
     asapo::MessageHeader message_header{expected_id, 0, expected_name};
-    auto err = producer.SendFromFile(message_header, expected_fullpath, expected_ingest_mode, nullptr);
+    auto err = producer.SendFile(message_header, expected_fullpath, expected_ingest_mode, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -383,7 +383,7 @@ TEST_F(ProducerImplTests, OKSendingSendFileRequestWithStream) {
 
     asapo::MessageHeader message_header{expected_id, 0, expected_name};
     auto err =
-        producer.SendFromFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr);
+        producer.SendFile(message_header, expected_stream, expected_fullpath, expected_ingest_mode, nullptr);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -421,7 +421,7 @@ TEST_F(ProducerImplTests, ErrorSendingWrongIngestMode) {
     EXPECT_CALL(mock_pull, AddRequest_t(_, _)).Times(0);
 
     for (auto ingest_mode : ingest_modes) {
-        auto err = producer.SendFromFile(message_header, expected_fullpath, ingest_mode, nullptr);
+        auto err = producer.SendFile(message_header, expected_fullpath, ingest_mode, nullptr);
         ASSERT_THAT(err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
     }
 
diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
index d9f4a155e..25a88248b 100644
--- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
@@ -132,8 +132,8 @@ class RequestHandlerTcpTests : public testing::Test {
   void ExpectOKSendAll(bool only_once);
   void ExpectGetFileSize(bool ok);
   void ExpectOKSend(bool only_once = false);
-  void ExpectOKSendFromFile(bool only_once = false);
-  void ExpectFailSendFromFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false);
+  void ExpectOKSendFile(bool only_once = false);
+  void ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error = false);
   void ExpectOKSendMetaData(bool only_once = false);
   void ExpectFailReceive(bool only_once = false);
   void ExpectOKReceive(bool only_once = true, asapo::NetworkErrorCode code = asapo::kNetErrorNoError,
@@ -302,7 +302,7 @@ void RequestHandlerTcpTests::ExpectFailSendHeader(bool only_once) {
     }
 }
 
-void RequestHandlerTcpTests::ExpectFailSendFromFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) {
+void RequestHandlerTcpTests::ExpectFailSendFile(const asapo::ProducerErrorTemplate &err_template, bool client_error) {
     int i = 0;
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size))
@@ -430,7 +430,7 @@ void RequestHandlerTcpTests::ExpectOKSend(bool only_once) {
     ExpectOKSend(expected_file_size, only_once);
 }
 
-void RequestHandlerTcpTests::ExpectOKSendFromFile(bool only_once) {
+void RequestHandlerTcpTests::ExpectOKSendFile(bool only_once) {
     for (auto expected_sd : expected_sds) {
         EXPECT_CALL(mock_io, SendFile_t(expected_sd, expected_origin_fullpath, (size_t) expected_file_size))
             .Times(1)
@@ -772,13 +772,13 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) {
     ASSERT_THAT(retry, Eq(false));
 }
 
-TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithReadError) {
+TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) {
     ExpectGetFileSize(true);
     ExpectOKConnect(true);
     ExpectOKAuthorize(true);
     ExpectOKSendHeader(true);
     ExpectOKSendMetaData(true);
-    ExpectFailSendFromFile(asapo::ProducerErrorTemplates::kLocalIOError, true);
+    ExpectFailSendFile(asapo::ProducerErrorTemplates::kLocalIOError, true);
 
     request_handler.PrepareProcessingRequestLocked();
     auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
@@ -790,13 +790,13 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithReadError) {
 
 }
 
-TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFromFileWithServerError) {
+TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) {
     ExpectGetFileSize(true);
     ExpectOKConnect();
     ExpectOKAuthorize();
     ExpectOKSendHeader();
     ExpectOKSendMetaData();
-    ExpectFailSendFromFile(asapo::ProducerErrorTemplates::kInternalServerError);
+    ExpectFailSendFile(asapo::ProducerErrorTemplates::kInternalServerError);
 
     request_handler.PrepareProcessingRequestLocked();
     auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
@@ -839,7 +839,7 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) {
     ExpectOKAuthorize(true);
     ExpectOKSendHeader(true);
     ExpectOKSendMetaData(true);
-    ExpectOKSendFromFile(true);
+    ExpectOKSendFile(true);
     ExpectOKReceive(true, asapo::kNetErrorNoError, expected_response);
 
     request_handler.PrepareProcessingRequestLocked();
diff --git a/producer/api/python/asapo_producer.pxd b/producer/api/python/asapo_producer.pxd
index c09a2d7df..78879b01d 100644
--- a/producer/api/python/asapo_producer.pxd
+++ b/producer/api/python/asapo_producer.pxd
@@ -65,8 +65,8 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo":
     uint64_t data_size
     string file_name
     string user_metadata
-    uint64_t id_in_subset
-    uint64_t subset_size
+    uint64_t dataset_substream
+    uint64_t dataset_size
 
 cdef extern from "asapo/asapo_producer.h" namespace "asapo":
   struct  GenericRequestHeader:
@@ -95,7 +95,7 @@ cdef extern from "asapo/asapo_producer.h" namespace "asapo" nogil:
     cppclass Producer:
         @staticmethod
         unique_ptr[Producer] Create(string endpoint,uint8_t nthreads,RequestHandlerType type, SourceCredentials source,uint64_t timeout_ms, Error* error)
-        Error SendFromFile(const MessageHeader& message_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback)
+        Error SendFile(const MessageHeader& message_header, string stream, string full_path, uint64_t ingest_mode,RequestCallback callback)
         Error Send__(const MessageHeader& message_header, string stream, void* data, uint64_t ingest_mode,RequestCallback callback)
         void StopThreads__()
         void SetLogLevel(LogLevel level)
diff --git a/producer/api/python/asapo_producer.pyx.in b/producer/api/python/asapo_producer.pyx.in
index d6322ae30..6a7e9f64e 100644
--- a/producer/api/python/asapo_producer.pyx.in
+++ b/producer/api/python/asapo_producer.pyx.in
@@ -107,8 +107,8 @@ cdef class PyProducer:
             return
          self.c_producer.get().SetLogLevel(log_level)
 
-    def __send_np_array(self, id, exposed_path,data, user_meta=None,subset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode)
+    def __send_np_array(self, id, exposed_path,data, user_meta=None,dataset=None,stream="default",ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
         if data is None:
             message_header.data_size = 0
         else:
@@ -126,21 +126,21 @@ cdef class PyProducer:
         if callback != None:
             Py_XINCREF(<PyObject*>callback)
         return
-    cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,subset,ingest_mode):
+    cdef MessageHeader create_message_header(self,uint64_t id, exposed_path,user_meta,dataset,ingest_mode):
         cdef MessageHeader message_header
         message_header.message_id = id
         message_header.file_name = _bytes(exposed_path)
         message_header.user_metadata = _bytes(user_meta) if user_meta!=None else ""
-        if subset == None:
-            message_header.id_in_subset = 0
-            message_header.subset_size = 0
+        if dataset == None:
+            message_header.dataset_substream = 0
+            message_header.dataset_size = 0
         else:
-            message_header.id_in_subset = subset[0]
-            message_header.subset_size = subset[1]
+            message_header.dataset_substream = dataset[0]
+            message_header.dataset_size = dataset[1]
         return message_header
 
-    def __send_bytes(self, id, exposed_path,data, user_meta=None,subset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode)
+    def __send_bytes(self, id, exposed_path,data, user_meta=None,dataset=None, stream="default", ingest_mode = DEFAULT_INGEST_MODE,callback=None):
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
         message_header.data_size = len(data)
         err = self.c_producer.get().Send__(message_header,_bytes(stream),  data_pointer_bytes(data), ingest_mode,
             unwrap_callback_with_memory(<RequestCallbackCythonMemory>self.c_callback_bytesaddr,
@@ -152,7 +152,7 @@ cdef class PyProducer:
             Py_XINCREF(<PyObject*>callback)
         return
 
-    def send_data(self, uint64_t id, exposed_path, data, user_meta=None, subset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
+    def send(self, uint64_t id, exposed_path, data, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
          :param id: unique data id
          :type id: int
@@ -162,8 +162,8 @@ cdef class PyProducer:
          :type data: contiguous numpy or bytes array, can be None for INGEST_MODE_TRANSFER_METADATA_ONLY ingest mode
          :param user_meta: user metadata, default None
          :type user_meta: JSON string
-         :param subset: a tuple with two int values (id in subset, subset size), default None
-         :type subset: tuple
+         :param dataset: a tuple with two int values (dataset substream id, number of dataset substreams), default None
+         :type dataset: tuple
          :param stream: stream name, default "default"
          :type stream: string
          :param ingest_mode: ingest mode flag
@@ -175,9 +175,9 @@ cdef class PyProducer:
             AsapoProducerError: actually should not happen
         """
         if type(data) == np.ndarray or data == None:
-            self.__send_np_array(id,exposed_path,data,user_meta,subset,stream,ingest_mode,callback)
+            self.__send_np_array(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
         elif type(data) == bytes:
-            self.__send_bytes(id,exposed_path,data,user_meta,subset,stream,ingest_mode,callback)
+            self.__send_bytes(id,exposed_path,data,user_meta,dataset,stream,ingest_mode,callback)
         else:
             raise(AsapoProducerError("wrong data type: " + str(type(data))))
     def send_stream_finished_flag(self, stream, uint64_t last_id, next_stream = None, callback = None):
@@ -234,7 +234,7 @@ cdef class PyProducer:
         if err:
             throw_exception(err)
         return json.loads(_str(info.Json(True)))
-    def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, subset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
+    def send_file(self, uint64_t id, local_path, exposed_path, user_meta=None, dataset=None, stream = "default", ingest_mode = DEFAULT_INGEST_MODE, callback=None):
         """
          :param id: unique data id
          :type id: int
@@ -244,8 +244,8 @@ cdef class PyProducer:
          :type exposed_path: string
          :param user_meta: user metadata, default None
          :type user_meta: JSON string
-         :param subset: a tuple with two int values (subset id, subset size), default None
-         :type subset: tuple
+         :param dataset: a tuple with two int values (dataset substream id, number of dataset substreams), default None
+         :type dataset: tuple
          :param stream: stream name, default "default"
          :type stream: string
          :param ingest_mode: ingest mode flag
@@ -258,9 +258,9 @@ cdef class PyProducer:
             AsapoProducerError: actually should not happen
         """
 
-        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,subset,ingest_mode)
+        cdef MessageHeader message_header = self.create_message_header(id,exposed_path,user_meta,dataset,ingest_mode)
         message_header.data_size = 0
-        err = self.c_producer.get().SendFromFile(message_header, _bytes(stream), _bytes(local_path), ingest_mode,
+        err = self.c_producer.get().SendFile(message_header, _bytes(stream), _bytes(local_path), ingest_mode,
             unwrap_callback(<RequestCallbackCython>self.c_callback, <void*>self,<void*>callback if callback != None else NULL))
         if err:
             throw_exception(err)
diff --git a/producer/event_monitor_producer/src/eventmon_config.cpp b/producer/event_monitor_producer/src/eventmon_config.cpp
index eeed35561..d3d403950 100644
--- a/producer/event_monitor_producer/src/eventmon_config.cpp
+++ b/producer/event_monitor_producer/src/eventmon_config.cpp
@@ -11,30 +11,30 @@ EventMonConfigFactory::EventMonConfigFactory() : io__{GenerateDefaultIO()} {
 
 }
 
-Error SubsetModeToEnum(const std::string& mode_str, SubSetMode* mode) {
+Error DatasetModeToEnum(const std::string& mode_str, DatasetMode* mode) {
     if (mode_str == "batch") {
-        *mode = SubSetMode::kBatch;
+        *mode = DatasetMode::kBatch;
         return nullptr;
     }
 
     if (mode_str == "none") {
-        *mode = SubSetMode::kNone;
+        *mode = DatasetMode::kNone;
         return nullptr;
     }
 
     if (mode_str == "multisource") {
-        *mode = SubSetMode::kMultiSource;
+        *mode = DatasetMode::kMultiSource;
         return nullptr;
     }
 
 
-    return TextError("Wrone subset mode:" + mode_str);
+    return TextError("Wrone dataset mode:" + mode_str);
 }
 
 Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
     JsonFileParser parser(file_name, &io__);
     Error err = nullptr;
-    std::string subset_mode;
+    std::string dataset_mode;
 
     (err = parser.GetString("AsapoEndpoint", &config.asapo_endpoint)) ||
     (err = parser.GetString("Tag", &config.tag)) ||
@@ -48,19 +48,19 @@ Error EventMonConfigFactory::ParseConfigFile(std::string file_name) {
     (err = parser.GetArrayString("MonitoredSubFolders", &config.monitored_subfolders)) ||
     (err = parser.GetArrayString("IgnoreExtensions", &config.ignored_extensions)) ||
     (err = parser.GetArrayString("WhitelistExtensions", &config.whitelisted_extensions)) ||
-    (err = parser.Embedded("Subset").GetString("Mode", &subset_mode)) ||
-    (err = SubsetModeToEnum(subset_mode, &config.subset_mode));
+    (err = parser.Embedded("Dataset").GetString("Mode", &dataset_mode)) ||
+    (err = DatasetModeToEnum(dataset_mode, &config.dataset_mode));
     if (err) {
         return err;
     }
 
-    if (config.subset_mode == SubSetMode::kBatch) {
-        err = parser.Embedded("Subset").GetUInt64("BatchSize", &config.subset_batch_size);
+    if (config.dataset_mode == DatasetMode::kBatch) {
+        err = parser.Embedded("Dataset").GetUInt64("BatchSize", &config.dataset_batch_size);
     }
 
-    if (config.subset_mode == SubSetMode::kMultiSource) {
-        err = parser.Embedded("Subset").GetUInt64("NSources", &config.subset_multisource_nsources);
-        err = parser.Embedded("Subset").GetUInt64("SourceId", &config.subset_multisource_sourceid);
+    if (config.dataset_mode == DatasetMode::kMultiSource) {
+        err = parser.Embedded("Dataset").GetUInt64("NSources", &config.dataset_multisource_nsources);
+        err = parser.Embedded("Dataset").GetUInt64("SourceId", &config.dataset_multisource_sourceid);
     }
 
 
@@ -73,7 +73,7 @@ Error EventMonConfigFactory::CheckConfig() {
     (err = CheckMode()) ||
     (err = CheckLogLevel()) ||
     (err = CheckNThreads()) ||
-    (err = CheckSubsets());
+    (err = CheckDatasets());
 
 //todo: check monitored folders exist?
     return err;
@@ -113,13 +113,13 @@ Error EventMonConfigFactory::CheckNThreads() {
     return nullptr;
 }
 
-Error EventMonConfigFactory::CheckSubsets() {
-    if (config.subset_mode == SubSetMode::kBatch && config.subset_batch_size < 1) {
+Error EventMonConfigFactory::CheckDatasets() {
+    if (config.dataset_mode == DatasetMode::kBatch && config.dataset_batch_size < 1) {
         return  TextError("Batch size should > 0");
     }
 
 
-    if (config.subset_mode == SubSetMode::kMultiSource && config.subset_multisource_nsources < 1) {
+    if (config.dataset_mode == DatasetMode::kMultiSource && config.dataset_multisource_nsources < 1) {
         return  TextError("Number of sources size should be > 0");
     }
 
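With this renaming, event-monitor config files use a "Dataset" section instead of
"Subset". Hedged fragments (values borrowed from the unit tests below; the other
required keys parsed above are omitted):

    "Dataset": { "Mode": "batch", "BatchSize": 9 }

    "Dataset": { "Mode": "multisource", "NSources": 2, "SourceId": 12 }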
diff --git a/producer/event_monitor_producer/src/eventmon_config.h b/producer/event_monitor_producer/src/eventmon_config.h
index 0fe7065e7..3f404ed36 100644
--- a/producer/event_monitor_producer/src/eventmon_config.h
+++ b/producer/event_monitor_producer/src/eventmon_config.h
@@ -9,7 +9,7 @@
 
 namespace asapo {
 
-enum class SubSetMode {
+enum class DatasetMode {
     kNone,
     kBatch,
     kMultiSource
@@ -27,10 +27,10 @@ struct EventMonConfig {
     std::vector<std::string> ignored_extensions;
     std::vector<std::string> whitelisted_extensions;
     bool remove_after_send = false;
-    SubSetMode subset_mode = SubSetMode::kNone;
-    uint64_t subset_batch_size = 1;
-    uint64_t subset_multisource_nsources = 1;
-    uint64_t subset_multisource_sourceid = 1;
+    DatasetMode dataset_mode = DatasetMode::kNone;
+    uint64_t dataset_batch_size = 1;
+    uint64_t dataset_multisource_nsources = 1;
+    uint64_t dataset_multisource_sourceid = 1;
     std::string data_source;
   private:
     std::string log_level_str;
diff --git a/producer/event_monitor_producer/src/eventmon_config_factory.h b/producer/event_monitor_producer/src/eventmon_config_factory.h
index 0c50df196..7697238fb 100644
--- a/producer/event_monitor_producer/src/eventmon_config_factory.h
+++ b/producer/event_monitor_producer/src/eventmon_config_factory.h
@@ -16,7 +16,7 @@ class EventMonConfigFactory {
     Error ParseConfigFile(std::string file_name);
     Error CheckMode();
     Error CheckLogLevel();
-    Error CheckSubsets();
+    Error CheckDatasets();
     Error CheckNThreads();
     Error CheckConfig();
 };
diff --git a/producer/event_monitor_producer/src/main_eventmon.cpp b/producer/event_monitor_producer/src/main_eventmon.cpp
index 091c8b7df..3450ded0b 100644
--- a/producer/event_monitor_producer/src/main_eventmon.cpp
+++ b/producer/event_monitor_producer/src/main_eventmon.cpp
@@ -76,18 +76,18 @@ void SignalHandler(int signal) {
 }
 
 
-void HandleSubsets(asapo::MessageHeader* header) {
-    switch (GetEventMonConfig()->subset_mode) {
-    case asapo::SubSetMode::kNone:
+void HandleDatasets(asapo::MessageHeader* header) {
+    switch (GetEventMonConfig()->dataset_mode) {
+    case asapo::DatasetMode::kNone:
         return;
-    case asapo::SubSetMode::kBatch:
-        header->subset_size = GetEventMonConfig()->subset_batch_size;
-        header->id_in_subset = (header->message_id - 1) % header->subset_size + 1;
-        header->message_id = (header->message_id - 1) / header->subset_size + 1;
+    case asapo::DatasetMode::kBatch:
+        header->dataset_size = GetEventMonConfig()->dataset_batch_size;
+        header->dataset_substream = (header->message_id - 1) % header->dataset_size + 1;
+        header->message_id = (header->message_id - 1) / header->dataset_size + 1;
         break;
-    case asapo::SubSetMode::kMultiSource:
-        header->subset_size = GetEventMonConfig()->subset_multisource_nsources;
-        header->id_in_subset = GetEventMonConfig()->subset_multisource_sourceid;
+    case asapo::DatasetMode::kMultiSource:
+        header->dataset_size = GetEventMonConfig()->dataset_multisource_nsources;
+        header->dataset_substream = GetEventMonConfig()->dataset_multisource_sourceid;
         break;
     }
 }
@@ -136,8 +136,8 @@ int main (int argc, char* argv[]) {
             continue;
         }
         message_header.message_id = ++i;
-        HandleSubsets(&message_header);
-        producer->SendFromFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
+        HandleDatasets(&message_header);
+        producer->SendFile(message_header, GetEventMonConfig()->root_monitored_folder + asapo::kPathSeparator +
             message_header.file_name, asapo::kDefaultIngestMode, ProcessAfterSend);
     }
 
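To make the batch arithmetic in HandleDatasets concrete, a worked example (not part
of the patch) with dataset_batch_size = 3:

    // dataset_size = 3; for an incoming message_id n:
    //   dataset_substream = (n - 1) % 3 + 1,  message_id = (n - 1) / 3 + 1
    // n = 1 -> substream 1 of message 1
    // n = 3 -> substream 3 of message 1
    // n = 4 -> substream 1 of message 2
    // In multisource mode each source instead reports its fixed SourceId as the
    // substream, so NSources producers together fill one dataset per message id.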
diff --git a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
index c41aca5ff..4d0a68286 100644
--- a/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
+++ b/producer/event_monitor_producer/unittests/mock_eventmon_config.cpp
@@ -51,28 +51,28 @@ Error SetFolderMonConfig (const EventMonConfig& config) {
     config_string += "," + std::string("\"RemoveAfterSend\":") + (config.remove_after_send ? "true" : "false");
     config_string += "," + std::string("\"DataSource\":") + "\"" + config.data_source + "\"";
 
-    std::string subset_mode;
-    switch (config.subset_mode) {
-    case SubSetMode::kBatch:
-        subset_mode = "batch";
+    std::string dataset_mode;
+    switch (config.dataset_mode) {
+    case DatasetMode::kBatch:
+        dataset_mode = "batch";
         break;
-    case SubSetMode::kMultiSource:
-        subset_mode = "multisource";
+    case DatasetMode::kMultiSource:
+        dataset_mode = "multisource";
         break;
 
-    case SubSetMode::kNone:
-        subset_mode = "none";
+    case DatasetMode::kNone:
+        dataset_mode = "none";
         break;
 
     }
-    config_string += "," + std::string("\"Subset\":{");
-    config_string += std::string("\"Mode\":") + "\"" + subset_mode + "\"";
-    if (config.subset_mode == SubSetMode::kBatch) {
-        config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.subset_batch_size);
+    config_string += "," + std::string("\"Dataset\":{");
+    config_string += std::string("\"Mode\":") + "\"" + dataset_mode + "\"";
+    if (config.dataset_mode == DatasetMode::kBatch) {
+        config_string += "," + std::string("\"BatchSize\":") + std::to_string(config.dataset_batch_size);
     }
-    if (config.subset_mode == SubSetMode::kMultiSource) {
-        config_string += "," + std::string("\"SourceId\":") + std::to_string(config.subset_multisource_sourceid);
-        config_string += "," + std::string("\"NSources\":") + std::to_string(config.subset_multisource_nsources);
+    if (config.dataset_mode == DatasetMode::kMultiSource) {
+        config_string += "," + std::string("\"SourceId\":") + std::to_string(config.dataset_multisource_sourceid);
+        config_string += "," + std::string("\"NSources\":") + std::to_string(config.dataset_multisource_nsources);
     }
 
     config_string += "}";
diff --git a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
index 0d4e90fea..f9369e4f4 100644
--- a/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
+++ b/producer/event_monitor_producer/unittests/test_eventmon_config.cpp
@@ -31,7 +31,7 @@ using ::asapo::MockIO;
 using ::asapo::EventMonConfigFactory;
 using asapo::EventMonConfig;
 
-using asapo::SubSetMode;
+using asapo::DatasetMode;
 
 namespace {
 
@@ -62,8 +62,8 @@ TEST_F(ConfigTests, ReadSettingsOK) {
     test_config.monitored_subfolders = {"test1", "test2"};
     test_config.ignored_extensions = {"tmp", "test"};
     test_config.remove_after_send = true;
-    test_config.subset_mode = SubSetMode::kBatch;
-    test_config.subset_batch_size = 9;
+    test_config.dataset_mode = DatasetMode::kBatch;
+    test_config.dataset_batch_size = 9;
     test_config.data_source = "source";
     test_config.whitelisted_extensions =  {"bla"};
 
@@ -82,8 +82,8 @@ TEST_F(ConfigTests, ReadSettingsOK) {
     ASSERT_THAT(config->root_monitored_folder, Eq("tmp"));
     ASSERT_THAT(config->ignored_extensions, ElementsAre("tmp", "test"));
     ASSERT_THAT(config->remove_after_send, Eq(true));
-    ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kBatch));
-    ASSERT_THAT(config->subset_batch_size, Eq(9));
+    ASSERT_THAT(config->dataset_mode, Eq(DatasetMode::kBatch));
+    ASSERT_THAT(config->dataset_batch_size, Eq(9));
     ASSERT_THAT(config->data_source, Eq("source"));
 }
 
@@ -103,17 +103,17 @@ TEST_F(ConfigTests, ReadSettingsWhiteListOK) {
 
 TEST_F(ConfigTests, ReadSettingsMultiSourceOK) {
     asapo::EventMonConfig test_config;
-    test_config.subset_mode = SubSetMode::kMultiSource;
-    test_config.subset_multisource_nsources = 2;
-    test_config.subset_multisource_sourceid = 12;
+    test_config.dataset_mode = DatasetMode::kMultiSource;
+    test_config.dataset_multisource_nsources = 2;
+    test_config.dataset_multisource_sourceid = 12;
     auto err = asapo::SetFolderMonConfig(test_config);
 
     auto config = asapo::GetEventMonConfig();
 
     ASSERT_THAT(err, Eq(nullptr));
-    ASSERT_THAT(config->subset_mode, Eq(SubSetMode::kMultiSource));
-    ASSERT_THAT(config->subset_multisource_nsources, Eq(2));
-    ASSERT_THAT(config->subset_multisource_sourceid, Eq(12));
+    ASSERT_THAT(config->dataset_mode, Eq(DatasetMode::kMultiSource));
+    ASSERT_THAT(config->dataset_multisource_nsources, Eq(2));
+    ASSERT_THAT(config->dataset_multisource_sourceid, Eq(12));
 
 }
 
@@ -135,16 +135,16 @@ TEST_F(ConfigTests, ReadSettingsChecksNthreads) {
 
 }
 
-TEST_F(ConfigTests, ReadSettingsChecksSubsets) {
+TEST_F(ConfigTests, ReadSettingsChecksDatasets) {
     asapo::EventMonConfig test_config;
-    test_config.subset_mode = SubSetMode::kBatch;
-    test_config.subset_batch_size = 0;
+    test_config.dataset_mode = DatasetMode::kBatch;
+    test_config.dataset_batch_size = 0;
 
     auto err = asapo::SetFolderMonConfig(test_config);
     ASSERT_THAT(err, Ne(nullptr));
 
-    test_config.subset_mode = SubSetMode::kMultiSource;
-    test_config.subset_multisource_nsources = 0;
+    test_config.dataset_mode = DatasetMode::kMultiSource;
+    test_config.dataset_multisource_nsources = 0;
 
     err = asapo::SetFolderMonConfig(test_config);
     ASSERT_THAT(err, Ne(nullptr));
@@ -152,9 +152,9 @@ TEST_F(ConfigTests, ReadSettingsChecksSubsets) {
 
 }
 
-TEST_F(ConfigTests, ReadSettingsDoesnotChecksSubsetsIfNoSubsets) {
+TEST_F(ConfigTests, ReadSettingsDoesNotCheckDatasetsIfNoDatasets) {
     asapo::EventMonConfig test_config;
-    test_config.subset_batch_size = 0;
+    test_config.dataset_batch_size = 0;
 
     auto err = asapo::SetFolderMonConfig(test_config);
     ASSERT_THAT(err, Eq(nullptr));
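
The checks exercised by these tests amount to simple mode-dependent validation. A rough Python sketch of the rules the tests imply (the real checks live in the C++ config factory, not shown here):

    # Sketch of the validation implied by the tests above (not the actual C++).
    def validate_dataset_config(mode, batch_size=0, nsources=0):
        if mode == "batch" and batch_size < 1:
            return "BatchSize must be at least 1 in batch mode"
        if mode == "multisource" and nsources < 1:
            return "NSources must be at least 1 in multisource mode"
        return None  # mode "none": dataset fields are not checked
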
diff --git a/receiver/src/request_handler/request_factory.cpp b/receiver/src/request_handler/request_factory.cpp
index c3372ca13..fdacdd94a 100644
--- a/receiver/src/request_handler/request_factory.cpp
+++ b/receiver/src/request_handler/request_factory.cpp
@@ -51,7 +51,7 @@ Error RequestFactory::AddHandlersToRequest(std::unique_ptr<Request> &request,
 
     switch (request_header.op_code) {
         case Opcode::kOpcodeTransferData:
-        case Opcode::kOpcodeTransferSubsetData: {
+        case Opcode::kOpcodeTransferDatasetData: {
             request->AddHandler(&request_handler_receive_metadata_);
             auto err = AddReceiveWriteHandlers(request, request_header);
             if (err) {
diff --git a/receiver/src/request_handler/request_handler_db_check_request.cpp b/receiver/src/request_handler/request_handler_db_check_request.cpp
index 60fbe61a1..b43347fee 100644
--- a/receiver/src/request_handler/request_handler_db_check_request.cpp
+++ b/receiver/src/request_handler/request_handler_db_check_request.cpp
@@ -32,7 +32,7 @@ Error RequestHandlerDbCheckRequest::GetRecordFromDb(const Request* request, Mess
         auto id_in_set = request->GetCustomData()[1];
         err = db_client__->GetDataSetById(col_name, id_in_set, id, record);
         if (!err) {
-            log__->Debug(std::string{"get subset record id "} + std::to_string(id) + " from " + col_name + " in " +
+            log__->Debug(std::string{"get dataset record id "} + std::to_string(id) + " from " + col_name + " in " +
                          db_name_ + " at " + GetReceiverConfig()->database_uri);
         }
         return err;
diff --git a/receiver/src/request_handler/request_handler_db_write.cpp b/receiver/src/request_handler/request_handler_db_write.cpp
index 2d61d1122..d0113286a 100644
--- a/receiver/src/request_handler/request_handler_db_write.cpp
+++ b/receiver/src/request_handler/request_handler_db_write.cpp
@@ -65,13 +65,12 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const {
                          " at " + GetReceiverConfig()->database_uri);
         }
     } else {
-        auto subset_id = message_meta.id;
-        message_meta.id = request->GetCustomData()[1];
-        auto subset_size = request->GetCustomData()[2];
-        err =  db_client__->InsertAsSubset(col_name, message_meta, subset_id, subset_size, false);
+        message_meta.dataset_substream = request->GetCustomData()[1];
+        auto dataset_size = request->GetCustomData()[2];
+        err =  db_client__->InsertAsDatasetMessage(col_name, message_meta, dataset_size, false);
         if (!err) {
-            log__->Debug(std::string{"insert record as subset id "} + std::to_string(message_meta.id) + ", id in subset: " +
-                         std::to_string(subset_id) + " to " + col_name + " in " +
+            log__->Debug(std::string{"insert record to substream "} + std::to_string(message_meta.dataset_substream) + ", id: " +
+                         std::to_string(message_meta.id) + " to " + col_name + " in " +
                          db_name_ +
                          " at " + GetReceiverConfig()->database_uri);
         }
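
As the handler above shows, for dataset transfers the request's custom data carries the substream in slot 1 and the expected dataset size in slot 2 (slot 0 is unused by this handler). A small illustrative sketch of that layout, with example values matching the unit tests below:

    # Illustrative layout of the receiver's custom data for dataset requests;
    # indices match the handler above, values are examples only.
    custom_data = [0, 20, 2]            # slot 0 unused by this handler
    dataset_substream = custom_data[1]  # substream of this message within the dataset
    dataset_size = custom_data[2]       # expected number of messages in the dataset
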
diff --git a/receiver/unittests/request_handler/test_request_factory.cpp b/receiver/unittests/request_handler/test_request_factory.cpp
index bb924ba63..5f224d3d0 100644
--- a/receiver/unittests/request_handler/test_request_factory.cpp
+++ b/receiver/unittests/request_handler/test_request_factory.cpp
@@ -80,7 +80,7 @@ TEST_F(FactoryTests, ErrorOnWrongCode) {
 }
 
 TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) {
-    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) {
+    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
         auto request = factory.GenerateRequest(generic_request_header, 1, origin_uri, &err);
 
@@ -96,7 +96,7 @@ TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCode) {
 }
 
 TEST_F(FactoryTests, ReturnsDataRequestOnkNetOpcodeSendCodeLargeFile) {
-    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferSubsetData}) {
+    for (auto code : std::vector<asapo::Opcode> {asapo::Opcode::kOpcodeTransferData, asapo::Opcode::kOpcodeTransferDatasetData}) {
         generic_request_header.op_code = code;
         config.receive_to_disk_threshold_mb = 0;
         SetReceiverConfig(config, "none");
diff --git a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
index 31d93693b..85c2b5da9 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_check_request.cpp
@@ -82,9 +82,9 @@ class DbCheckRequestHandlerTests : public Test {
     std::string expected_metadata = "meta";
     uint64_t expected_file_size = 10;
     uint64_t expected_id = 15;
-    uint64_t expected_subset_id = 16;
-    uint64_t expected_subset_size = 2;
-    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_subset_id, expected_subset_size};
+    uint64_t expected_dataset_id = 16;
+    uint64_t expected_dataset_size = 2;
+    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_dataset_id, expected_dataset_size};
     MessageMeta expected_message_meta;
     MockFunctions mock_functions;
     int n_run = 0;
@@ -179,7 +179,7 @@ void DbCheckRequestHandlerTests::ExpectRequestParams(asapo::Opcode op_code, cons
     .WillOnce(Return(op_code))
     ;
 
-    if (op_code == asapo::Opcode::kOpcodeTransferSubsetData) {
+    if (op_code == asapo::Opcode::kOpcodeTransferDatasetData) {
         EXPECT_CALL(*mock_request, GetCustomData_t())
         .WillOnce(Return(expected_custom_data))
         ;
@@ -207,8 +207,8 @@ void DbCheckRequestHandlerTests::MockGetByID(asapo::ErrorInterface* error, bool
 }
 
 void DbCheckRequestHandlerTests::MockGetSetByID(asapo::ErrorInterface* error, bool expect_compare ) {
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source, expect_compare);
-    EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_subset_id, expected_id, _)).
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferDatasetData, expected_data_source, expect_compare);
+    EXPECT_CALL(mock_db, GetSetById_t(expected_collection_name, expected_dataset_id, expected_id, _)).
     WillOnce(DoAll(
                  SetArgPointee<3>(expected_message_meta),
                  testing::Return(error)
diff --git a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
index b4962bedc..129e704a0 100644
--- a/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
+++ b/receiver/unittests/request_handler/test_request_handler_db_writer.cpp
@@ -83,9 +83,9 @@ class DbWriterHandlerTests : public Test {
     std::string expected_metadata = "meta";
     uint64_t expected_file_size = 10;
     uint64_t expected_id = 15;
-    uint64_t expected_subset_id = 15;
-    uint64_t expected_subset_size = 2;
-    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_subset_id, expected_subset_size};
+    uint64_t expected_substream = 20;
+    uint64_t expected_dataset_size = 2;
+    uint64_t expected_custom_data[asapo::kNCustomParams] {0, expected_substream, expected_dataset_size};
     asapo::MockHandlerDbCheckRequest mock_db_check_handler{asapo::kDBDataCollectionNamePrefix};
 
     void SetUp() override {
@@ -105,7 +105,7 @@ class DbWriterHandlerTests : public Test {
     void ExpectRequestParams(asapo::Opcode op_code, const std::string& data_source);
     void ExpectLogger();
     void ExpectDuplicatedID();
-    MessageMeta PrepareMessageMeta();
+    MessageMeta PrepareMessageMeta(bool substream = false);
     void TearDown() override {
         handler.db_client__.release();
     }
@@ -117,6 +117,7 @@ MATCHER_P(CompareMessageMeta, file, "") {
     if (arg.size != file.size) return false;
     if (arg.source != file.source) return false;
     if (arg.buf_id != file.buf_id) return false;
+    if (arg.dataset_substream != file.dataset_substream) return false;
     if (arg.name != file.name) return false;
     if (arg.id != file.id) return false;
     if (arg.metadata != file.metadata) return false;
@@ -179,7 +180,7 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
     .WillOnce(Return(op_code))
     ;
 
-    if (op_code == asapo::Opcode::kOpcodeTransferSubsetData) {
+    if (op_code == asapo::Opcode::kOpcodeTransferDatasetData) {
         EXPECT_CALL(*mock_request, GetCustomData_t()).Times(2).
         WillRepeatedly(Return(expected_custom_data))
         ;
@@ -189,11 +190,14 @@ void DbWriterHandlerTests::ExpectRequestParams(asapo::Opcode op_code, const std:
 
 
 
-MessageMeta DbWriterHandlerTests::PrepareMessageMeta() {
+MessageMeta DbWriterHandlerTests::PrepareMessageMeta(bool substream) {
     MessageMeta message_meta;
     message_meta.size = expected_file_size;
     message_meta.name = expected_file_name;
     message_meta.id = expected_id;
+    if (substream) {
+        message_meta.dataset_substream = expected_substream;
+    }
     message_meta.buf_id = expected_buf_id;
     message_meta.source = expected_host_ip + ":" + std::to_string(expected_port);
     message_meta.metadata = expected_metadata;
@@ -223,14 +227,14 @@ TEST_F(DbWriterHandlerTests, CallsInsert) {
     handler.ProcessRequest(mock_request.get());
 }
 
-TEST_F(DbWriterHandlerTests, CallsInsertSubset) {
+TEST_F(DbWriterHandlerTests, CallsInsertDataset) {
 
-    ExpectRequestParams(asapo::Opcode::kOpcodeTransferSubsetData, expected_data_source);
-    auto message_meta = PrepareMessageMeta();
+    ExpectRequestParams(asapo::Opcode::kOpcodeTransferDatasetData, expected_data_source);
+    auto message_meta = PrepareMessageMeta(true);
 
 
-    EXPECT_CALL(mock_db, InsertAsSubset_t(expected_collection_name, CompareMessageMeta(message_meta), expected_subset_id,
-                                          expected_subset_size, false)).
+    EXPECT_CALL(mock_db, InsertAsDatasetMessage_t(expected_collection_name, CompareMessageMeta(message_meta),
+                                                  expected_dataset_size, false)).
     WillOnce(testing::Return(   nullptr));
     ExpectLogger();
 
diff --git a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
index f99966537..c0858795a 100644
--- a/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
+++ b/tests/automatic/bug_fixes/error-sending-data-using-callback-method/bugfix_callback.py
@@ -18,7 +18,7 @@ class AsapoSender:
         self.ingest_mode = asapo_producer.DEFAULT_INGEST_MODE
         self._n_queued = 8
     def send(self, data, metadata):
-        self.producer.send_data(
+        self.producer.send(
                 metadata['_id'], metadata['name'], data,
                 ingest_mode=self.ingest_mode,
                 callback=self._callback)
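
The Python producer method is now `send` (previously `send_data`); the argument order is unchanged. A minimal call in the style of the tests in this patch; `producer` is assumed to have been created via asapo_producer.create_producer, which happens outside the shown hunks:

    import asapo_producer

    def callback(payload, err):
        # Called per request; err is None on success.
        if err is not None:
            print("could not send:", err)

    data = b"hello"
    producer.send(1, "processed/source/file1", data,
                  ingest_mode=asapo_producer.DEFAULT_INGEST_MODE,
                  callback=callback)
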
diff --git a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
index 0bb40a0f2..488577ab4 100644
--- a/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
+++ b/tests/automatic/bug_fixes/producer_send_after_restart/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
diff --git a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
index 537e5e0ad..6714f39e6 100644
--- a/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
+++ b/tests/automatic/bug_fixes/receiver_cpu_usage/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 }
diff --git a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
index 3949f00b2..5f48b974a 100644
--- a/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
+++ b/tests/automatic/full_chain/send_recv_streams_python/send_recv_streams.py
@@ -34,7 +34,7 @@ group_id  = consumer.generate_group_id()
 n_send = 10
 
 for i in range(n_send):
-    producer.send_data(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,stream = "stream", callback = callback)
+    producer.send(i+1, "name"+str(i),None,ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY,stream = "stream", callback = callback)
 
 producer.send_stream_finished_flag("stream", 10, next_stream = "next_stream", callback = callback)
 producer.wait_requests_finished(timeout)
diff --git a/tests/automatic/full_chain/simple_chain_filegen/test.json.in b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
index e0979a465..fef76848b 100644
--- a/tests/automatic/full_chain/simple_chain_filegen/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
-  "Subset": {
+  "Dataset": {
    	"Mode":"none"
   }
 
diff --git a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
index eed5e4c01..a1053cd5b 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_batches/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":false,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
  	"Mode":"batch",
   	"BatchSize":3
  }
diff --git a/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
index fae62b12f..5f32f629e 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_multisource/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
  	"Mode":"multisource",
   	"SourceId":@ID@,
   	"NSources":2
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
index 0bb40a0f2..488577ab4 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_cache/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
diff --git a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
index 0bb40a0f2..488577ab4 100644
--- a/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
+++ b/tests/automatic/full_chain/simple_chain_filegen_readdata_file/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
diff --git a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
index b10952d39..a19a182b1 100644
--- a/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
+++ b/tests/automatic/mongo_db/insert_retrieve_dataset/insert_retrieve_dataset_mongodb.cpp
@@ -37,34 +37,32 @@ int main(int argc, char* argv[]) {
     asapo::MessageMeta fi;
     fi.size = 100;
     fi.name = "relpath/1";
-    uint64_t subset_id = args.file_id;
     fi.timestamp = std::chrono::system_clock::now();
     fi.buf_id = 18446744073709551615ull;
     fi.source = "host:1234";
-    fi.id = 10;
+    fi.id = args.file_id;
+    fi.dataset_substream = 10;
 
-    uint64_t subset_size = 2;
+    uint64_t dataset_size = 2;
 
     if (args.keyword != "Notconnected") {
         db.Connect("127.0.0.1", "data");
     }
 
-    auto err =  db.InsertAsSubset("test", fi, subset_id, subset_size, true);
+    auto err =  db.InsertAsDatasetMessage("test", fi, dataset_size, true);
 
 
     if (args.keyword == "DuplicateID") {
         Assert(err, "OK");
-        fi.id = 2;
-        err =  db.InsertAsSubset("test", fi, subset_id, subset_size, true);
-//        Assert(err, "OK");
-        err =  db.InsertAsSubset("test", fi, subset_id, subset_size, false);
+        err =  db.InsertAsDatasetMessage("test", fi, dataset_size, true);
+        err =  db.InsertAsDatasetMessage("test", fi, dataset_size, false);
     }
 
     Assert(err, args.keyword);
 
     if (args.keyword == "OK") { // check retrieve
         asapo::MessageMeta fi_db;
-        err = db.GetDataSetById("test", fi.id,subset_id, &fi_db);
+        err = db.GetDataSetById("test", fi.dataset_substream,fi.id, &fi_db);
         M_AssertTrue(fi_db == fi, "get record from db");
         M_AssertEq(nullptr, err);
         err = db.GetDataSetById("test", 0, 0, &fi_db);
diff --git a/tests/automatic/producer/file_monitor_producer/test.json.in b/tests/automatic/producer/file_monitor_producer/test.json.in
index 88f1e6e55..fa4d102c2 100644
--- a/tests/automatic/producer/file_monitor_producer/test.json.in
+++ b/tests/automatic/producer/file_monitor_producer/test.json.in
@@ -11,7 +11,7 @@
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
diff --git a/tests/automatic/producer/python_api/producer_api.py b/tests/automatic/producer/python_api/producer_api.py
index 3f6e54555..eb14bca76 100644
--- a/tests/automatic/producer/python_api/producer_api.py
+++ b/tests/automatic/producer/python_api/producer_api.py
@@ -49,10 +49,10 @@ producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_sou
 producer.send_file(10, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file10",
                    user_meta='{"test_key":"test_val"}', callback=None)
 
-# send subsets
-producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file2", subset=(1, 2),
+# send datasets
+producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file2", dataset=(1, 2),
                    user_meta='{"test_key":"test_val"}', callback=callback)
-producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file3", subset=(2, 2),
+producer.send_file(2, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file3", dataset=(2, 2),
                    user_meta='{"test_key":"test_val"}', callback=callback)
 
 # send meta only
@@ -62,27 +62,27 @@ producer.send_file(3, local_path="./not_exist", exposed_path="./whatever",
 data = np.arange(10, dtype=np.float64)
 
 # send data from array
-producer.send_data(4, "processed/" + data_source + "/" + "file5", data,
+producer.send(4, "processed/" + data_source + "/" + "file5", data,
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 # send data from string
-producer.send_data(5, "processed/" + data_source + "/" + "file6", b"hello",
+producer.send(5, "processed/" + data_source + "/" + "file6", b"hello",
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 # send metadata only
-producer.send_data(6, "processed/" + data_source + "/" + "file7", None,
+producer.send(6, "processed/" + data_source + "/" + "file7", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send single file/wrong filename
 producer.send_file(1, local_path="./file2", exposed_path="processed/" + data_source + "/" + "file1", callback=callback)
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(8, "processed/" + data_source + "/" + "file8", x,
+producer.send(8, "processed/" + data_source + "/" + "file8", x,
                    ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 
 try:
     x = x.T
-    producer.send_data(8, "processed/" + data_source + "/" + "file8", x,
+    producer.send(8, "processed/" + data_source + "/" + "file8", x,
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
     print(e)
@@ -91,7 +91,7 @@ else:
     sys.exit(1)
 
 try:
-    producer.send_data(0, "processed/" + data_source + "/" + "file6", b"hello",
+    producer.send(0, "processed/" + data_source + "/" + "file6", b"hello",
                        ingest_mode=asapo_producer.DEFAULT_INGEST_MODE, callback=callback)
 except asapo_producer.AsapoWrongInputError as e:
     print(e)
@@ -100,7 +100,7 @@ else:
     sys.exit(1)
 
 # send to another stream
-producer.send_data(1, "processed/" + data_source + "/" + "file9", None,
+producer.send(1, "processed/" + data_source + "/" + "file9", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
 
 # wait normal requests finished before sending duplicates
@@ -111,13 +111,13 @@ producer.wait_requests_finished(50000)
 producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file1",
                    user_meta='{"test_key":"test_val"}', callback=callback)
 # send metadata only once again
-producer.send_data(6, "processed/" + data_source + "/" + "file7", None,
+producer.send(6, "processed/" + data_source + "/" + "file7", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send same id different data
 producer.send_file(1, local_path="./file1", exposed_path="processed/" + data_source + "/" + "file1",
                    user_meta='{"test_key1":"test_val"}', callback=callback)  # send same id different data
-producer.send_data(6, "processed/" + data_source + "/" + "file8", None,
+producer.send(6, "processed/" + data_source + "/" + "file8", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback=callback)
 
 # send same id without writing to database, should succeed
@@ -130,7 +130,7 @@ n = producer.get_requests_queue_size()
 assert_eq(n, 0, "requests in queue")
 
 # send another record to stream "stream"
-producer.send_data(2, "processed/" + data_source + "/" + "file10", None,
+producer.send(2, "processed/" + data_source + "/" + "file10", None,
                    ingest_mode=asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, stream="stream", callback=callback)
 
 producer.wait_requests_finished(50000)
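
On the Python side, datasets are now addressed with a `dataset=(substream, size)` tuple instead of `subset=(...)`, as the pairs (1, 2) and (2, 2) in the test above suggest: two sends with the same id and size 2 make up one complete dataset. A minimal sketch, reusing the test's paths; `producer`, `data_source` and `callback` are assumed to be set up as in the full script:

    # Two halves of one dataset: same id (2), substreams 1 and 2, size 2.
    producer.send_file(2, local_path="./file1",
                       exposed_path="processed/" + data_source + "/file2",
                       dataset=(1, 2), callback=callback)
    producer.send_file(2, local_path="./file1",
                       exposed_path="processed/" + data_source + "/file3",
                       dataset=(2, 2), callback=callback)
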
diff --git a/tests/manual/python_tests/producer/test.py b/tests/manual/python_tests/producer/test.py
index 3e6dc6f95..403dc1d46 100644
--- a/tests/manual/python_tests/producer/test.py
+++ b/tests/manual/python_tests/producer/test.py
@@ -35,9 +35,9 @@ producer.set_log_level("info")
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -46,25 +46,25 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-err = producer.send_data(5, data_source+"/"+"file6",b"hello",
+err = producer.send(5, data_source+"/"+"file6",b"hello",
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                          ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(4, data_source+"/"+"file5",x,
+producer.send(4, data_source+"/"+"file5",x,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 try:
 	x = x.T
-	producer.send_data(4, data_source+"/"+"file5",x,
+	producer.send(4, data_source+"/"+"file5",x,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 except:
 	pass
diff --git a/tests/manual/python_tests/producer_wait_bug_mongo/test.py b/tests/manual/python_tests/producer_wait_bug_mongo/test.py
index 145c27e04..ead637fdf 100644
--- a/tests/manual/python_tests/producer_wait_bug_mongo/test.py
+++ b/tests/manual/python_tests/producer_wait_bug_mongo/test.py
@@ -35,9 +35,9 @@ producer.set_log_level("debug")
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}', callback = callback)
 
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -46,25 +46,25 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-err = producer.send_data(5, data_source+"/"+"file6",b"hello",
+err = producer.send(5, data_source+"/"+"file6",b"hello",
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                          ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 
 x = np.array([[1, 2, 3], [4, 5, 6]], np.float32)
-producer.send_data(4, data_source+"/"+"file5",x,
+producer.send(4, data_source+"/"+"file5",x,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 try:
 	x = x.T
-	producer.send_data(4, data_source+"/"+"file5",x,
+	producer.send(4, data_source+"/"+"file5",x,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 except:
 	pass
diff --git a/tests/manual/python_tests/producer_wait_threads/producer_api.py b/tests/manual/python_tests/producer_wait_threads/producer_api.py
index 0175bb386..1c19ec7f1 100644
--- a/tests/manual/python_tests/producer_wait_threads/producer_api.py
+++ b/tests/manual/python_tests/producer_wait_threads/producer_api.py
@@ -32,9 +32,9 @@ producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"fi
 #send single file without callback
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}',callback=None)
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -43,15 +43,15 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-producer.send_data(5, data_source+"/"+"file6",b"hello",
+producer.send(5, data_source+"/"+"file6",b"hello",
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                          ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 producer.wait_requests_finished(1000)
diff --git a/tests/manual/python_tests/producer_wait_threads/test.py b/tests/manual/python_tests/producer_wait_threads/test.py
index bd6cdcc23..75983e6b3 100644
--- a/tests/manual/python_tests/producer_wait_threads/test.py
+++ b/tests/manual/python_tests/producer_wait_threads/test.py
@@ -33,9 +33,9 @@ producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"fi
 producer.send_file(1, local_path = "./file1", exposed_path = data_source+"/"+"file1", user_meta = '{"test_key":"test_val"}')
 
 
-#send subsets
-producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
-producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",subset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+#send datasets
+producer.send_file(2, local_path = "./file1", exposed_path = data_source+"/"+"file2",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
+producer.send_file(3, local_path = "./file1", exposed_path = data_source+"/"+"file3",dataset=(2,2),user_meta = '{"test_key":"test_val"}', callback = callback)
 
 #send meta only
 producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
@@ -44,15 +44,15 @@ producer.send_file(3, local_path = "./not_exist",exposed_path = "./whatever",
 data = np.arange(10,dtype=np.float64)
 
 #send data from array
-producer.send_data(4, data_source+"/"+"file5",data,
+producer.send(4, data_source+"/"+"file5",data,
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send data from string
-producer.send_data(5, data_source+"/"+"file6",b"hello",
+producer.send(5, data_source+"/"+"file6",b"hello",
                          ingest_mode = asapo_producer.DEFAULT_INGEST_MODE, callback = callback)
 
 #send metadata only
-producer.send_data(6, data_source+"/"+"file7",None,
+producer.send(6, data_source+"/"+"file7",None,
                          ingest_mode = asapo_producer.INGEST_MODE_TRANSFER_METADATA_ONLY, callback = callback)
 
 producer.wait_requests_finished(1)
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
index 91205ea9b..be78fb225 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 }
@@ -103,7 +103,7 @@ job "asapo-filemon" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
index ec8f6d680..07c84daeb 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_batch.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon_batch" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"batch",
   	"BatchSize": {{ keyOrDefault "monitor_batch_size" "3" }}
  }
@@ -104,7 +104,7 @@ job "asapo-filemon_batch" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"batch",
   	"BatchSize": {{ keyOrDefault "monitor_batch_size" "3" }}
  }
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
index 77585d59c..7f7b07825 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_multisource.nomad.in
@@ -42,7 +42,7 @@ job "asapo-filemon_multisource" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"multisource",
   	"SourceId": 1,
   	"NSources":2
@@ -105,7 +105,7 @@ job "asapo-filemon_multisource" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"multisource",
   	"SourceId": 2,
   	"NSources":2
diff --git a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
index 590460341..13c332d53 100644
--- a/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
+++ b/tests/manual/tests_via_nomad/asapo-test_filemon_producer_tolocal.nomad.in
@@ -42,7 +42,7 @@ job "asapo-produceronly" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
-  "Subset": {
+  "Dataset": {
    	"Mode":"none"
   }
 }
@@ -103,7 +103,7 @@ job "asapo-produceronly" {
  "WhitelistExtensions":[],
  "RemoveAfterSend":true,
  "DataSource": "",
- "Subset": {
+ "Dataset": {
   	"Mode":"none"
  }
 
-- 
GitLab