From 3c3f0ef1303b390f7f9f544ea94039c74cfb760b Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 18 Nov 2020 11:01:14 +0100
Subject: [PATCH] incomplete datasets for consumer

---
 CHANGELOG.md                                  |   6 +
 common/cpp/include/common/data_structs.h      |   1 +
 common/cpp/src/data_structs/data_structs.cpp  |   1 +
 .../api/cpp/include/consumer/consumer_error.h |  16 +-
 .../api/cpp/include/consumer/data_broker.h    |  24 +-
 consumer/api/cpp/src/server_data_broker.cpp   | 266 +++---
 consumer/api/cpp/src/server_data_broker.h     |  25 +-
 .../api/cpp/unittests/test_server_broker.cpp  | 897 +++++++++---------
 consumer/api/python/asapo_consumer.pxd        |   9 +-
 consumer/api/python/asapo_consumer.pyx.in     |  37 +-
 10 files changed, 697 insertions(+), 585 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 932a5b354..0a42ce988 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,11 +2,17 @@
 
 FEATURES
 * implemented possibility to send data without writing to database (no need of consecutive indexes, etc. but will not be able to consume such data)
+* allow to return incomplete datasets (wihout error if one sets minimum dataset size, otherwise with "partial data" error)
 
  IMPROVEMENTS
 * Producer API - return original data in callback payload.  
 * Producer API - allow to set queue limits (number of pending requests and/or max memory), reject new requests if reached the limits  
 
+
+BREAKING CHANGES
+* Consumer API - get_next_dataset, get_last_dataset, get_dataset_by_id return dictionary with 'id','expected_size','content' fields, not tuple
+
+
 ## 20.09.1
 
 FEATURES
diff --git a/common/cpp/include/common/data_structs.h b/common/cpp/include/common/data_structs.h
index b44afb983..c17921eb7 100644
--- a/common/cpp/include/common/data_structs.h
+++ b/common/cpp/include/common/data_structs.h
@@ -64,6 +64,7 @@ using IdList = std::vector<uint64_t>;
 
 struct DataSet {
     uint64_t id;
+    uint64_t expected_size;
     FileInfos content;
     bool SetFromJson(const std::string& json_string);
 };
diff --git a/common/cpp/src/data_structs/data_structs.cpp b/common/cpp/src/data_structs/data_structs.cpp
index 72cb5fddf..65c2c4ceb 100644
--- a/common/cpp/src/data_structs/data_structs.cpp
+++ b/common/cpp/src/data_structs/data_structs.cpp
@@ -92,6 +92,7 @@ bool DataSet::SetFromJson(const std::string &json_string) {
     std::vector<std::string> vec_fi_endcoded;
     Error parse_err;
     (parse_err = parser.GetArrayRawStrings("images", &vec_fi_endcoded)) ||
+        (parse_err = parser.GetUInt64("size", &expected_size)) ||
         (parse_err = parser.GetUInt64("_id", &id));
     if (parse_err) {
         *this = old;
diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h
index 2cb3929c5..25531f196 100644
--- a/consumer/api/cpp/include/consumer/consumer_error.h
+++ b/consumer/api/cpp/include/consumer/consumer_error.h
@@ -13,12 +13,19 @@ enum class ConsumerErrorType {
     kUnavailableService,
     kInterruptedTransaction,
     kLocalIOError,
-    kWrongInput
+    kWrongInput,
+    kPartialData
 };
 
 using ConsumerErrorTemplate = ServiceErrorTemplate<ConsumerErrorType, ErrorType::kConsumerError>;
 
 
+class PartialErrorData : public CustomErrorData {
+ public:
+  uint64_t id;
+  uint64_t expected_size;
+};
+
 class ConsumerErrorData : public CustomErrorData {
   public:
     uint64_t id;
@@ -29,6 +36,13 @@ class ConsumerErrorData : public CustomErrorData {
 
 namespace ConsumerErrorTemplates {
 
+
+auto const kPartialData = ConsumerErrorTemplate {
+    "partial data", ConsumerErrorType::kPartialData
+};
+
+
+
 auto const kLocalIOError = ConsumerErrorTemplate {
     "local i/o error", ConsumerErrorType::kLocalIOError
 };
diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h
index e76881e85..51c624c23 100644
--- a/consumer/api/cpp/include/consumer/data_broker.h
+++ b/consumer/api/cpp/include/consumer/data_broker.h
@@ -122,29 +122,35 @@ class DataBroker {
     /*!
       \param err -  will be set to error data cannot be read, nullptr otherwise.
       \param group_id - group id to use.
+      \param substream - substream to use ("" for default).
+      \param min_size - wait until dataset has min_size data tuples (0 for maximum size)
       \return DataSet - information about the dataset
-    */
-    virtual DataSet GetNextDataset(std::string group_id, Error* err) = 0;
-    virtual DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) = 0;
 
-    //! Receive last available completed dataset.
+    */
+    virtual DataSet GetNextDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) = 0;
+    virtual DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) = 0;
+    //! Receive last available dataset which has min_size data tuples.
     /*!
       \param err -  will be set to error data cannot be read, nullptr otherwise.
       \param group_id - group id to use.
+      \param substream - substream to use ("" for default).
+      \param min_size - amount of data tuples in dataset (0 for maximum size)
       \return DataSet - information about the dataset
     */
-    virtual DataSet GetLastDataset(std::string group_id, Error* err) = 0;
-    virtual DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) = 0;
+    virtual DataSet GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) = 0;
+    virtual DataSet GetLastDataset(std::string group_id, uint64_t min_size, Error* err) = 0;
 
     //! Receive dataset by id.
     /*!
       \param id - dataset id
-      \param err -  will be set to error data cannot be read or dataset is incomplete, nullptr otherwise.
+      \param err -  will be set to error data cannot be read or dataset size less than min_size, nullptr otherwise.
       \param group_id - group id to use.
+      \param substream - substream to use ("" for default).
+      \param min_size - wait until dataset has min_size data tuples (0 for maximum size)
       \return DataSet - information about the dataset
     */
-    virtual DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) = 0;
-    virtual DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) = 0;
+    virtual DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) = 0;
+    virtual DataSet GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) = 0;
 
     //! Receive single image by id.
     /*!
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index 017f89e26..3a64894ce 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -19,17 +19,42 @@ namespace asapo {
 const std::string ServerDataBroker::kBrokerServiceName = "asapo-broker";
 const std::string ServerDataBroker::kFileTransferServiceName = "asapo-file-transfer";
 
-Error GetNoDataResponseFromJson(const std::string& json_string, ConsumerErrorData* data) {
+Error GetNoDataResponseFromJson(const std::string &json_string, ConsumerErrorData* data) {
     JsonStringParser parser(json_string);
     Error err;
     if ((err = parser.GetUInt64("id", &data->id)) || (err = parser.GetUInt64("id_max", &data->id_max))
-            || (err = parser.GetString("next_substream", &data->next_substream))) {
+        || (err = parser.GetString("next_substream", &data->next_substream))) {
         return err;
     }
     return nullptr;
 }
 
-Error ConsumerErrorFromNoDataResponse(const std::string& response) {
+Error GetPartialDataResponseFromJson(const std::string &json_string, PartialErrorData* data) {
+    Error err;
+    auto parser = JsonStringParser(json_string);
+    uint64_t  id,size;
+    if ((err = parser.GetUInt64("size", &size)) ||
+        (err = parser.GetUInt64("_id", &id))) {
+        return err;
+    }
+    data->id = id;
+    data->expected_size = size;
+    return nullptr;
+}
+
+Error ConsumerErrorFromPartialDataResponse(const std::string &response) {
+    PartialErrorData data;
+    auto parse_error = GetPartialDataResponseFromJson(response, &data);
+    if (parse_error) {
+        return ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
+    }
+    auto err = ConsumerErrorTemplates::kPartialData.Generate();
+    PartialErrorData* error_data = new PartialErrorData{data};
+    err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data});
+    return err;
+}
+
+Error ConsumerErrorFromNoDataResponse(const std::string &response) {
     if (response.find("get_record_by_id") != std::string::npos) {
         ConsumerErrorData data;
         auto parse_error = GetNoDataResponseFromJson(response, &data);
@@ -44,41 +69,35 @@ Error ConsumerErrorFromNoDataResponse(const std::string& response) {
             err = ConsumerErrorTemplates::kNoData.Generate();
         }
         ConsumerErrorData* error_data = new ConsumerErrorData{data};
-        err->SetCustomData(std::unique_ptr<CustomErrorData> {error_data});
+        err->SetCustomData(std::unique_ptr<CustomErrorData>{error_data});
         return err;
     }
     return ConsumerErrorTemplates::kNoData.Generate();
 }
 
-Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& code) {
+Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode &code) {
     switch (code) {
-    case HttpCode::OK:
-        return nullptr;
-    case HttpCode::BadRequest:
-        return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
-    case HttpCode::Unauthorized:
-        return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
-    case HttpCode::InternalServerError:
-        return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
-    case HttpCode::NotFound:
-        return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string());
-    case HttpCode::Conflict:
-        return ConsumerErrorFromNoDataResponse(response->to_string());
-    default:
-        return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
-    }
-}
-Error ConsumerErrorFromServerError(const Error& server_err) {
+        case HttpCode::OK:return nullptr;
+        case HttpCode::PartialContent:return ConsumerErrorFromPartialDataResponse(response->to_string());
+        case HttpCode::BadRequest:return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
+        case HttpCode::Unauthorized:return ConsumerErrorTemplates::kWrongInput.Generate(response->to_string());
+        case HttpCode::InternalServerError:return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
+        case HttpCode::NotFound:return ConsumerErrorTemplates::kUnavailableService.Generate(response->to_string());
+        case HttpCode::Conflict:return ConsumerErrorFromNoDataResponse(response->to_string());
+        default:return ConsumerErrorTemplates::kInterruptedTransaction.Generate(response->to_string());
+    }
+}
+Error ConsumerErrorFromServerError(const Error &server_err) {
     if (server_err == HttpErrorTemplates::kTransferError) {
         return ConsumerErrorTemplates::kInterruptedTransaction.Generate(
-                   "error processing request: " + server_err->Explain());
+            "error processing request: " + server_err->Explain());
     } else {
         return ConsumerErrorTemplates::kUnavailableService.Generate(
-                   "error processing request: " + server_err->Explain());
+            "error processing request: " + server_err->Explain());
     }
 }
 
-Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code) {
+Error ProcessRequestResponce(const Error &server_err, const RequestOutput* response, const HttpCode &code) {
     if (server_err != nullptr) {
         return ConsumerErrorFromServerError(server_err);
     }
@@ -113,41 +132,39 @@ NetworkConnectionType ServerDataBroker::CurrentConnectionType() const {
     return current_connection_type_;
 }
 
-
 std::string ServerDataBroker::RequestWithToken(std::string uri) {
     return std::move(uri) + "?token=" + source_credentials_.user_token;
 }
 
-Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) {
+Error ServerDataBroker::ProcessPostRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code) {
     Error err;
     switch (request.output_mode) {
-    case OutputDataMode::string:
-        response->string_output =
-            httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params,
-                               request.cookie,
-                               request.body,
-                               code,
-                               &err);
-        break;
-    case OutputDataMode::array:
-        err =
-            httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie,
-                               request.body, &response->data_output, response->data_output_size, code);
-        break;
-    default:
-        break;
+        case OutputDataMode::string:
+            response->string_output =
+                httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params,
+                                   request.cookie,
+                                   request.body,
+                                   code,
+                                   &err);
+            break;
+        case OutputDataMode::array:
+            err =
+                httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie,
+                                   request.body, &response->data_output, response->data_output_size, code);
+            break;
+        default:break;
     }
     return err;
 }
 
-Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) {
+Error ServerDataBroker::ProcessGetRequest(const RequestInfo &request, RequestOutput* response, HttpCode* code) {
     Error err;
     response->string_output =
         httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err);
     return err;
 }
 
-Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri) {
+Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInfo &request, std::string* service_uri) {
     Error err;
     HttpCode code;
     if (request.post) {
@@ -161,7 +178,7 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf
     return ProcessRequestResponce(err, response, code);
 }
 
-Error ServerDataBroker::DiscoverService(const std::string& service_name, std::string* uri_to_set) {
+Error ServerDataBroker::DiscoverService(const std::string &service_name, std::string* uri_to_set) {
     if (!uri_to_set->empty()) {
         return nullptr;
     }
@@ -175,13 +192,28 @@ Error ServerDataBroker::DiscoverService(const std::string& service_name, std::st
     if (err != nullptr || uri_to_set->empty()) {
         uri_to_set->clear();
         return ConsumerErrorTemplates::kUnavailableService.Generate(" on " + endpoint_
-                + (err != nullptr ? ": " + err->Explain()
-                   : ""));
+                                                                        + (err != nullptr ? ": " + err->Explain()
+                                                                                          : ""));
     }
     return nullptr;
 }
 
-bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri) {
+bool ServerDataBroker::SwitchToGetByIdIfPartialData(Error* err,
+                                                    const std::string &response,
+                                                    std::string* redirect_uri) {
+    if (*err == ConsumerErrorTemplates::kPartialData) {
+        auto error_data = static_cast<const PartialErrorData*>((*err)->GetCustomData());
+        if (error_data == nullptr) {
+            *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response - " + response);
+            return false;
+        }
+        *redirect_uri = std::to_string(error_data->id);
+        return true;
+    }
+    return false;
+}
+
+bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string &response, std::string* redirect_uri) {
     if (*err == ConsumerErrorTemplates::kNoData) {
         auto error_data = static_cast<const ConsumerErrorData*>((*err)->GetCustomData());
         if (error_data == nullptr) {
@@ -194,33 +226,34 @@ bool ServerDataBroker::SwitchToGetByIdIfNoData(Error* err, const std::string& re
     return false;
 }
 
-RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool dataset) {
+RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool dataset, uint64_t min_size) {
     RequestInfo ri;
     ri.host = current_broker_uri_;
     ri.api = std::move(api_url);
     if (dataset) {
         ri.extra_params = "&dataset=true";
+        ri.extra_params += "&minsize="+std::to_string(min_size);
     }
     return ri;
 }
 
 Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, std::string substream,
                                             GetImageServerOperation op,
-                                            bool dataset) {
+                                            bool dataset, uint64_t min_size) {
     std::string request_suffix = OpToUriCmd(op);
     std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream
-                              + "/" + std::move(substream) +
-                              +"/" + std::move(group_id) + "/";
+        + "/" + std::move(substream) +
+        +"/" + std::move(group_id) + "/";
     uint64_t elapsed_ms = 0;
     Error no_data_error;
     while (true) {
         auto start = system_clock::now();
         auto err = DiscoverService(kBrokerServiceName, &current_broker_uri_);
         if (err == nullptr) {
-            auto ri = PrepareRequestInfo(request_api + request_suffix, dataset);
+            auto ri = PrepareRequestInfo(request_api + request_suffix, dataset, min_size);
             if (request_suffix == "next" && resend_) {
                 ri.extra_params = ri.extra_params + "&resend_nacks=true" + "&delay_sec=" +
-                                  std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_);
+                    std::to_string(delay_sec_) + "&resend_attempts=" + std::to_string(resend_attempts_);
             }
             RequestOutput output;
             err = ProcessRequest(&output, ri, &current_broker_uri_);
@@ -234,8 +267,13 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
             return err;
         }
 
+        if (err == ConsumerErrorTemplates::kPartialData) {
+            return err;
+        }
+
         if (request_suffix == "next") {
-            auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix);
+            auto save_error = SwitchToGetByIdIfNoData(&err, *response, &request_suffix)
+                || SwitchToGetByIdIfPartialData(&err, *response, &request_suffix);
             if (err == ConsumerErrorTemplates::kInterruptedTransaction) {
                 return err;
             }
@@ -280,12 +318,9 @@ Error ServerDataBroker::GetLast(FileInfo* info, std::string group_id, std::strin
 
 std::string ServerDataBroker::OpToUriCmd(GetImageServerOperation op) {
     switch (op) {
-    case GetImageServerOperation::GetNext:
-        return "next";
-    case GetImageServerOperation::GetLast:
-        return "last";
-    default:
-        return "last";
+        case GetImageServerOperation::GetNext:return "next";
+        case GetImageServerOperation::GetLast:return "last";
+        default:return "last";
     }
 }
 
@@ -406,7 +441,7 @@ std::string ServerDataBroker::GenerateNewGroupId(Error* err) {
     return BrokerRequestWithTimeout(ri, err);
 }
 
-Error ServerDataBroker::ServiceRequestWithTimeout(const std::string& service_name,
+Error ServerDataBroker::ServiceRequestWithTimeout(const std::string &service_name,
                                                   std::string* service_uri,
                                                   RequestInfo request,
                                                   RequestOutput* response) {
@@ -489,7 +524,7 @@ Error ServerDataBroker::ResetLastReadMarker(std::string group_id, std::string su
 Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id, std::string substream) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/"
-             + std::move(substream) + "/" + std::move(group_id) + "/resetcounter";
+        + std::move(substream) + "/" + std::move(group_id) + "/resetcounter";
     ri.extra_params = "&value=" + std::to_string(value);
     ri.post = true;
 
@@ -501,7 +536,7 @@ Error ServerDataBroker::SetLastReadMarker(uint64_t value, std::string group_id,
 uint64_t ServerDataBroker::GetCurrentSize(std::string substream, Error* err) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) + "/size";
+        +"/" + std::move(substream) + "/size";
     auto responce = BrokerRequestWithTimeout(ri, err);
     if (*err) {
         return 0;
@@ -536,14 +571,15 @@ Error ServerDataBroker::GetById(uint64_t id,
 
 Error ServerDataBroker::GetRecordFromServerById(uint64_t id, std::string* response, std::string group_id,
                                                 std::string substream,
-                                                bool dataset) {
+                                                bool dataset, uint64_t min_size) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) +
-             "/" + std::move(
-                 group_id) + "/" + std::to_string(id);
+        +"/" + std::move(substream) +
+        "/" + std::move(
+        group_id) + "/" + std::to_string(id);
     if (dataset) {
         ri.extra_params += "&dataset=true";
+        ri.extra_params += "&minsize="+std::to_string(min_size);
     }
 
     Error err;
@@ -558,13 +594,12 @@ std::string ServerDataBroker::GetBeamtimeMeta(Error* err) {
     return BrokerRequestWithTimeout(ri, err);
 }
 
-DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error* err) {
+DataSet DecodeDatasetFromResponse(std::string response, Error* err) {
     DataSet res;
     if (!res.SetFromJson(std::move(response))) {
         *err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("malformed response:" + response);
-        return {0, FileInfos{}};
+        return {0,0,FileInfos{}};
     } else {
-        *err = nullptr;
         return res;
     }
 }
@@ -572,7 +607,7 @@ DataSet ServerDataBroker::DecodeDatasetFromResponse(std::string response, Error*
 FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream, Error* err) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             "/" + std::move(substream) + "/0/queryimages";
+        "/" + std::move(substream) + "/0/queryimages";
     ri.post = true;
     ri.body = std::move(query);
 
@@ -581,7 +616,7 @@ FileInfos ServerDataBroker::QueryImages(std::string query, std::string substream
         return FileInfos{};
     }
 
-    auto dataset = DecodeDatasetFromResponse("{\"_id\":0, \"images\":" + response + "}", err);
+    auto dataset = DecodeDatasetFromResponse("{\"_id\":0,\"size\":0, \"images\":" + response + "}", err);
     return dataset.content;
 }
 
@@ -589,45 +624,46 @@ FileInfos ServerDataBroker::QueryImages(std::string query, Error* err) {
     return QueryImages(std::move(query), kDefaultSubstream, err);
 }
 
-DataSet ServerDataBroker::GetNextDataset(std::string group_id, Error* err) {
-    return GetNextDataset(std::move(group_id), kDefaultSubstream, err);
+DataSet ServerDataBroker::GetNextDataset(std::string group_id, uint64_t min_size, Error* err) {
+    return GetNextDataset(std::move(group_id), kDefaultSubstream, min_size, err);
 }
 
-DataSet ServerDataBroker::GetNextDataset(std::string group_id, std::string substream, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream), err);
+DataSet ServerDataBroker::GetNextDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetNext, 0, std::move(group_id), std::move(substream),min_size, err);
 }
 
-DataSet ServerDataBroker::GetLastDataset(std::string group_id, std::string substream, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream), err);
+DataSet ServerDataBroker::GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetLast, 0, std::move(group_id), std::move(substream),min_size, err);
 }
 
-DataSet ServerDataBroker::GetLastDataset(std::string group_id, Error* err) {
-    return GetLastDataset(std::move(group_id), kDefaultSubstream, err);
+DataSet ServerDataBroker::GetLastDataset(std::string group_id, uint64_t min_size, Error* err) {
+    return GetLastDataset(std::move(group_id), kDefaultSubstream, min_size, err);
 }
 
 DataSet ServerDataBroker::GetDatasetFromServer(GetImageServerOperation op,
                                                uint64_t id,
                                                std::string group_id, std::string substream,
+                                               uint64_t min_size,
                                                Error* err) {
     FileInfos infos;
     std::string response;
     if (op == GetImageServerOperation::GetID) {
-        *err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(substream), true);
+        *err = GetRecordFromServerById(id, &response, std::move(group_id), std::move(substream), true, min_size);
     } else {
-        *err = GetRecordFromServer(&response, std::move(group_id), std::move(substream), op, true);
+        *err = GetRecordFromServer(&response, std::move(group_id), std::move(substream), op, true, min_size);
     }
-    if (*err != nullptr) {
-        return {0, FileInfos{}};
+    if (*err != nullptr && *err!=ConsumerErrorTemplates::kPartialData) {
+        return {0, 0,FileInfos{}};
     }
     return DecodeDatasetFromResponse(response, err);
 }
 
-DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, Error* err) {
-    return GetDatasetById(id, std::move(group_id), kDefaultSubstream, err);
+DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) {
+    return GetDatasetById(id, std::move(group_id), kDefaultSubstream, min_size, err);
 }
 
-DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) {
-    return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), std::move(substream), err);
+DataSet ServerDataBroker::GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) {
+    return GetDatasetFromServer(GetImageServerOperation::GetID, id, std::move(group_id), std::move(substream), min_size, err);
 }
 
 StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) {
@@ -637,14 +673,14 @@ StreamInfos ParseSubstreamsFromResponse(std::string response, Error* err) {
     Error parse_err;
     *err = parser.GetArrayRawStrings("substreams", &substreams_endcoded);
     if (*err) {
-        return StreamInfos {};
+        return StreamInfos{};
     }
     for (auto substream_encoded : substreams_endcoded) {
         StreamInfo si;
-        auto ok = si.SetFromJson(substream_encoded,false);
+        auto ok = si.SetFromJson(substream_encoded, false);
         if (!ok) {
-            *err = TextError("cannot parse "+substream_encoded);
-            return StreamInfos {};
+            *err = TextError("cannot parse " + substream_encoded);
+            return StreamInfos{};
         }
         substreams.emplace_back(si);
     }
@@ -657,12 +693,12 @@ StreamInfos ServerDataBroker::GetSubstreamList(std::string from, Error* err) {
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream + "/0/substreams";
     ri.post = false;
     if (!from.empty()) {
-        ri.extra_params="&from=" + from;
+        ri.extra_params = "&from=" + from;
     }
 
     auto response = BrokerRequestWithTimeout(ri, err);
     if (*err) {
-        return StreamInfos {};
+        return StreamInfos{};
     }
 
     return ParseSubstreamsFromResponse(std::move(response), err);
@@ -691,13 +727,13 @@ RequestInfo ServerDataBroker::CreateFolderTokenRequest() const {
     ri.post = true;
     ri.body =
         "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\""
-        +
-        source_credentials_.user_token + "\"}";
+            +
+                source_credentials_.user_token + "\"}";
     return ri;
 }
 
 Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData* data,
-        bool retry_with_new_token) {
+                                                       bool retry_with_new_token) {
     auto err = UpdateFolderTokenIfNeeded(retry_with_new_token);
     if (err) {
         return err;
@@ -706,7 +742,7 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData*
     if (info->size == 0) {
         err = FtsSizeRequestWithTimeout(info);
         if (err == ConsumerErrorTemplates::kWrongInput
-                && !retry_with_new_token) { // token expired? Refresh token and try again.
+            && !retry_with_new_token) { // token expired? Refresh token and try again.
             return GetDataFromFileTransferService(info, data, true);
         }
         if (err) {
@@ -716,7 +752,7 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData*
 
     err = FtsRequestWithTimeout(info, data);
     if (err == ConsumerErrorTemplates::kWrongInput
-            && !retry_with_new_token) { // token expired? Refresh token and try again.
+        && !retry_with_new_token) { // token expired? Refresh token and try again.
         return GetDataFromFileTransferService(info, data, true);
     }
     return err;
@@ -725,8 +761,8 @@ Error ServerDataBroker::GetDataFromFileTransferService(FileInfo* info, FileData*
 Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::string substream) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) +
-             "/" + std::move(group_id) + "/" + std::to_string(id);
+        +"/" + std::move(substream) +
+        "/" + std::move(group_id) + "/" + std::to_string(id);
     ri.post = true;
     ri.body = "{\"Op\":\"ackimage\"}";
 
@@ -736,14 +772,14 @@ Error ServerDataBroker::Acknowledge(std::string group_id, uint64_t id, std::stri
 }
 
 IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id,
-        std::string substream,
-        uint64_t from_id,
-        uint64_t to_id,
-        Error* error) {
+                                                   std::string substream,
+                                                   uint64_t from_id,
+                                                   uint64_t to_id,
+                                                   Error* error) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) +
-             "/" + std::move(group_id) + "/nacks";
+        +"/" + std::move(substream) +
+        "/" + std::move(group_id) + "/nacks";
     ri.extra_params = "&from=" + std::to_string(from_id) + "&to=" + std::to_string(to_id);
 
     auto json_string = BrokerRequestWithTimeout(ri, error);
@@ -761,17 +797,17 @@ IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id,
 }
 
 IdList ServerDataBroker::GetUnacknowledgedTupleIds(std::string group_id,
-        uint64_t from_id,
-        uint64_t to_id,
-        Error* error) {
+                                                   uint64_t from_id,
+                                                   uint64_t to_id,
+                                                   Error* error) {
     return GetUnacknowledgedTupleIds(std::move(group_id), kDefaultSubstream, from_id, to_id, error);
 }
 
 uint64_t ServerDataBroker::GetLastAcknowledgedTulpeId(std::string group_id, std::string substream, Error* error) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) +
-             "/" + std::move(group_id) + "/lastack";
+        +"/" + std::move(substream) +
+        "/" + std::move(group_id) + "/lastack";
 
     auto json_string = BrokerRequestWithTimeout(ri, error);
     if (*error) {
@@ -806,8 +842,8 @@ Error ServerDataBroker::NegativeAcknowledge(std::string group_id,
                                             std::string substream) {
     RequestInfo ri;
     ri.api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream +
-             +"/" + std::move(substream) +
-             "/" + std::move(group_id) + "/" + std::to_string(id);
+        +"/" + std::move(substream) +
+        "/" + std::move(group_id) + "/" + std::to_string(id);
     ri.post = true;
     ri.body = R"({"Op":"negackimage","Params":{"DelaySec":)" + std::to_string(delay_sec) + "}}";
 
diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h
index d58c69df7..298201591 100644
--- a/consumer/api/cpp/src/server_data_broker.h
+++ b/consumer/api/cpp/src/server_data_broker.h
@@ -47,7 +47,8 @@ struct RequestOutput {
 
 Error ProcessRequestResponce(const Error& server_err, const RequestOutput* response, const HttpCode& code);
 Error ConsumerErrorFromNoDataResponse(const std::string& response);
-
+Error ConsumerErrorFromPartialDataResponse(const std::string& response);
+DataSet DecodeDatasetFromResponse(std::string response, Error* err);
 
 class ServerDataBroker final : public asapo::DataBroker {
   public:
@@ -98,14 +99,14 @@ class ServerDataBroker final : public asapo::DataBroker {
     FileInfos QueryImages(std::string query, Error* err) override;
     FileInfos QueryImages(std::string query, std::string substream, Error* err) override;
 
-    DataSet GetNextDataset(std::string group_id, Error* err) override;
-    DataSet GetNextDataset(std::string group_id, std::string substream, Error* err) override;
+    DataSet GetNextDataset(std::string group_id, uint64_t min_size, Error* err) override;
+    DataSet GetNextDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) override;
 
-    DataSet GetLastDataset(std::string group_id, Error* err) override;
-    DataSet GetLastDataset(std::string group_id, std::string substream, Error* err) override;
+    DataSet GetLastDataset(std::string group_id, uint64_t min_size, Error* err) override;
+    DataSet GetLastDataset(std::string group_id, std::string substream, uint64_t min_size, Error* err) override;
 
-    DataSet GetDatasetById(uint64_t id, std::string group_id, Error* err) override;
-    DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, Error* err) override;
+    DataSet GetDatasetById(uint64_t id, std::string group_id, uint64_t min_size, Error* err) override;
+    DataSet GetDatasetById(uint64_t id, std::string group_id, std::string substream, uint64_t min_size, Error* err) override;
 
     Error RetrieveData(FileInfo* info, FileData* data) override;
 
@@ -124,17 +125,18 @@ class ServerDataBroker final : public asapo::DataBroker {
     static const std::string kFileTransferServiceName;
     std::string RequestWithToken(std::string uri);
     Error GetRecordFromServer(std::string* info, std::string group_id, std::string substream, GetImageServerOperation op,
-                              bool dataset = false);
+                              bool dataset = false, uint64_t min_size = 0);
     Error GetRecordFromServerById(uint64_t id, std::string* info, std::string group_id, std::string substream,
-                                  bool dataset = false);
+                                  bool dataset = false, uint64_t min_size = 0);
     Error GetDataIfNeeded(FileInfo* info, FileData* data);
     Error DiscoverService(const std::string& service_name, std::string* uri_to_set);
     bool SwitchToGetByIdIfNoData(Error* err, const std::string& response, std::string* redirect_uri);
+    bool SwitchToGetByIdIfPartialData(Error* err, const std::string& response, std::string* redirect_uri);
     Error ProcessRequest(RequestOutput* response, const RequestInfo& request, std::string* service_uri);
     Error GetImageFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream,
                              FileInfo* info, FileData* data);
     DataSet GetDatasetFromServer(GetImageServerOperation op, uint64_t id, std::string group_id, std::string substream,
-                                 Error* err);
+                                 uint64_t min_size, Error* err);
     bool DataCanBeInBuffer(const FileInfo* info);
     Error TryGetDataFromBuffer(const FileInfo* info, FileData* data);
     Error CreateNetClientAndTryToGetFile(const FileInfo* info, FileData* data);
@@ -146,8 +148,7 @@ class ServerDataBroker final : public asapo::DataBroker {
     Error ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code);
     Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code);
 
-    DataSet DecodeDatasetFromResponse(std::string response, Error* err);
-    RequestInfo PrepareRequestInfo(std::string api_url, bool dataset);
+    RequestInfo PrepareRequestInfo(std::string api_url, bool dataset, uint64_t min_size);
     std::string OpToUriCmd(GetImageServerOperation op);
     Error UpdateFolderTokenIfNeeded(bool ignore_existing);
     std::string endpoint_;
diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp
index e1ddc85aa..147af20da 100644
--- a/consumer/api/cpp/unittests/test_server_broker.cpp
+++ b/consumer/api/cpp/unittests/test_server_broker.cpp
@@ -45,9 +45,10 @@ namespace {
 
 TEST(FolderDataBroker, Constructor) {
     auto data_broker =
-    std::unique_ptr<ServerDataBroker> {new ServerDataBroker("test", "path", false,
-                asapo::SourceCredentials{asapo::SourceType::kProcessed,"beamtime_id", "", "", "token"})
-    };
+        std::unique_ptr<ServerDataBroker>{new ServerDataBroker("test", "path", false,
+                                                               asapo::SourceCredentials{asapo::SourceType::kProcessed,
+                                                                                        "beamtime_id", "", "", "token"})
+        };
     ASSERT_THAT(dynamic_cast<asapo::SystemIO*>(data_broker->io__.get()), Ne(nullptr));
     ASSERT_THAT(dynamic_cast<asapo::CurlHttpClient*>(data_broker->httpclient__.get()), Ne(nullptr));
     ASSERT_THAT(data_broker->net_client__.get(), Eq(nullptr));
@@ -56,118 +57,125 @@ TEST(FolderDataBroker, Constructor) {
 const uint8_t expected_value = 1;
 
 class ServerDataBrokerTests : public Test {
-  public:
-    std::unique_ptr<ServerDataBroker> data_broker, fts_data_broker;
-    NiceMock<MockIO> mock_io;
-    NiceMock<MockHttpClient> mock_http_client;
-    NiceMock<MockNetClient> mock_netclient;
-    FileInfo info;
-    std::string expected_server_uri = "test:8400";
-    std::string expected_broker_uri = "asapo-broker:5005";
-    std::string expected_fts_uri = "asapo-file-transfer:5008";
-    std::string expected_token = "token";
-    std::string expected_path = "/tmp/beamline/beamtime";
-    std::string expected_filename = "filename";
-    std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
-    std::string expected_group_id = "groupid";
-    std::string expected_stream = "stream";
-    std::string expected_substream = "substream";
-    std::string expected_metadata = "{\"meta\":1}";
-    std::string expected_query_string = "bla";
-    std::string expected_folder_token = "folder_token";
-    std::string expected_beamtime_id = "beamtime_id";
-    uint64_t expected_image_size = 100;
-    uint64_t expected_dataset_id = 1;
-    static const uint64_t expected_buf_id = 123;
-    std::string expected_next_substream = "nextsubstream";
-    std::string expected_fts_query_string = "{\"Folder\":\"" + expected_path + "\",\"FileName\":\"" + expected_filename +
-                                            "\"}";
-    std::string expected_cookie = "Authorization=Bearer " + expected_folder_token;
-
-    void AssertSingleFileTransfer();
-    void SetUp() override {
-        data_broker = std::unique_ptr<ServerDataBroker> {
-            new ServerDataBroker(expected_server_uri, expected_path, true, asapo::SourceCredentials{asapo::SourceType::kProcessed,expected_beamtime_id, "", expected_stream, expected_token})
-        };
-        fts_data_broker = std::unique_ptr<ServerDataBroker> {
-            new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{asapo::SourceType::kProcessed,expected_beamtime_id, "", expected_stream, expected_token})
-        };
-        data_broker->io__ = std::unique_ptr<IO> {&mock_io};
-        data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
-        data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
-        fts_data_broker->io__ = std::unique_ptr<IO> {&mock_io};
-        fts_data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
-        fts_data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
-
-    }
-    void TearDown() override {
-        data_broker->io__.release();
-        data_broker->httpclient__.release();
-        data_broker->net_client__.release();
-        fts_data_broker->io__.release();
-        fts_data_broker->httpclient__.release();
-        fts_data_broker->net_client__.release();
-
-    }
-    void MockGet(const std::string& response) {
-        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll(
-                    SetArgPointee<1>(HttpCode::OK),
-                    SetArgPointee<2>(nullptr),
-                    Return(response)
-                ));
-    }
-
-    void MockGetError() {
-        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll(
-                    SetArgPointee<1>(HttpCode::NotFound),
-                    SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
-                    Return("")
-                ));
-    }
-    void MockGetServiceUri(std::string service, std::string result) {
-        EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/" + service), _,
-                                            _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(result)));
-    }
-
-    void MockBeforeFTS(FileData* data);
-
-    void MockGetFTSUri() {
-        MockGetServiceUri("asapo-file-transfer", expected_fts_uri);
-    }
-
-    void ExpectFolderToken();
-    void ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template);
-    void ExpectRepeatedFileTransfer();
-    void ExpectIdList(bool error);
-    void ExpectLastAckId(bool empty_response);
-
-    void MockGetBrokerUri() {
-        MockGetServiceUri("asapo-broker", expected_broker_uri);
-    }
-    void MockReadDataFromFile(int times = 1) {
-        if (times == 0) {
-            EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0);
-            return;
-        }
-
-        EXPECT_CALL(mock_io, GetDataFromFile_t(expected_full_path, testing::Pointee(100), _)).Times(times).
-        WillRepeatedly(DoAll(SetArgPointee<2>(new asapo::SimpleError{"s"}), testing::Return(nullptr)));
-    }
-    FileInfo CreateFI(uint64_t buf_id = expected_buf_id) {
-        FileInfo fi;
-        fi.size = expected_image_size;
-        fi.id = 1;
-        fi.buf_id = buf_id;
-        fi.name = expected_filename;
-        fi.timestamp = std::chrono::system_clock::now();
-        return fi;
-    }
+ public:
+  std::unique_ptr<ServerDataBroker> data_broker, fts_data_broker;
+  NiceMock<MockIO> mock_io;
+  NiceMock<MockHttpClient> mock_http_client;
+  NiceMock<MockNetClient> mock_netclient;
+  FileInfo info;
+  std::string expected_server_uri = "test:8400";
+  std::string expected_broker_uri = "asapo-broker:5005";
+  std::string expected_fts_uri = "asapo-file-transfer:5008";
+  std::string expected_token = "token";
+  std::string expected_path = "/tmp/beamline/beamtime";
+  std::string expected_filename = "filename";
+  std::string expected_full_path = std::string("/tmp/beamline/beamtime") + asapo::kPathSeparator + expected_filename;
+  std::string expected_group_id = "groupid";
+  std::string expected_stream = "stream";
+  std::string expected_substream = "substream";
+  std::string expected_metadata = "{\"meta\":1}";
+  std::string expected_query_string = "bla";
+  std::string expected_folder_token = "folder_token";
+  std::string expected_beamtime_id = "beamtime_id";
+  uint64_t expected_image_size = 100;
+  uint64_t expected_dataset_id = 1;
+  static const uint64_t expected_buf_id = 123;
+  std::string expected_next_substream = "nextsubstream";
+  std::string expected_fts_query_string = "{\"Folder\":\"" + expected_path + "\",\"FileName\":\"" + expected_filename +
+      "\"}";
+  std::string expected_cookie = "Authorization=Bearer " + expected_folder_token;
+
+  void AssertSingleFileTransfer();
+  void SetUp() override {
+      data_broker = std::unique_ptr<ServerDataBroker>{
+          new ServerDataBroker(expected_server_uri,
+                               expected_path,
+                               true,
+                               asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "",
+                                                        expected_stream, expected_token})
+      };
+      fts_data_broker = std::unique_ptr<ServerDataBroker>{
+          new ServerDataBroker(expected_server_uri,
+                               expected_path,
+                               false,
+                               asapo::SourceCredentials{asapo::SourceType::kProcessed, expected_beamtime_id, "",
+                                                        expected_stream, expected_token})
+      };
+      data_broker->io__ = std::unique_ptr<IO>{&mock_io};
+      data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient>{&mock_http_client};
+      data_broker->net_client__ = std::unique_ptr<asapo::NetClient>{&mock_netclient};
+      fts_data_broker->io__ = std::unique_ptr<IO>{&mock_io};
+      fts_data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient>{&mock_http_client};
+      fts_data_broker->net_client__ = std::unique_ptr<asapo::NetClient>{&mock_netclient};
+
+  }
+  void TearDown() override {
+      data_broker->io__.release();
+      data_broker->httpclient__.release();
+      data_broker->net_client__.release();
+      fts_data_broker->io__.release();
+      fts_data_broker->httpclient__.release();
+      fts_data_broker->net_client__.release();
+
+  }
+  void MockGet(const std::string &response, asapo::HttpCode return_code = HttpCode::OK) {
+      EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll(
+          SetArgPointee<1>(return_code),
+          SetArgPointee<2>(nullptr),
+          Return(response)
+      ));
+  }
+
+  void MockGetError() {
+      EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_broker_uri), _, _)).WillOnce(DoAll(
+          SetArgPointee<1>(HttpCode::NotFound),
+          SetArgPointee<2>(asapo::IOErrorTemplates::kUnknownIOError.Generate().release()),
+          Return("")
+      ));
+  }
+  void MockGetServiceUri(std::string service, std::string result) {
+      EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/" + service), _,
+                                          _)).WillOnce(DoAll(
+          SetArgPointee<1>(HttpCode::OK),
+          SetArgPointee<2>(nullptr),
+          Return(result)));
+  }
+
+  void MockBeforeFTS(FileData* data);
+
+  void MockGetFTSUri() {
+      MockGetServiceUri("asapo-file-transfer", expected_fts_uri);
+  }
+
+  void ExpectFolderToken();
+  void ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template);
+  void ExpectRepeatedFileTransfer();
+  void ExpectIdList(bool error);
+  void ExpectLastAckId(bool empty_response);
+
+  void MockGetBrokerUri() {
+      MockGetServiceUri("asapo-broker", expected_broker_uri);
+  }
+  void MockReadDataFromFile(int times = 1) {
+      if (times == 0) {
+          EXPECT_CALL(mock_io, GetDataFromFile_t(_, _, _)).Times(0);
+          return;
+      }
+
+      EXPECT_CALL(mock_io, GetDataFromFile_t(expected_full_path, testing::Pointee(100), _)).Times(times).
+          WillRepeatedly(DoAll(SetArgPointee<2>(new asapo::SimpleError{"s"}), testing::Return(nullptr)));
+  }
+  FileInfo CreateFI(uint64_t buf_id = expected_buf_id) {
+      FileInfo fi;
+      fi.size = expected_image_size;
+      fi.id = 1;
+      fi.buf_id = buf_id;
+      fi.name = expected_filename;
+      fi.timestamp = std::chrono::system_clock::now();
+      return fi;
+  }
 };
 
-
 TEST_F(ServerDataBrokerTests, GetImageReturnsErrorOnWrongInput) {
     auto err = data_broker->GetNext(nullptr, "", nullptr);
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kWrongInput));
@@ -177,39 +185,42 @@ TEST_F(ServerDataBrokerTests, DefaultStreamIsDetector) {
     data_broker->io__.release();
     data_broker->httpclient__.release();
     data_broker->net_client__.release();
-    data_broker = std::unique_ptr<ServerDataBroker> {
-        new ServerDataBroker(expected_server_uri, expected_path, false, asapo::SourceCredentials{asapo::SourceType::kProcessed,"beamtime_id", "", "", expected_token})
+    data_broker = std::unique_ptr<ServerDataBroker>{
+        new ServerDataBroker(expected_server_uri,
+                             expected_path,
+                             false,
+                             asapo::SourceCredentials{asapo::SourceType::kProcessed, "beamtime_id", "", "",
+                                                      expected_token})
     };
-    data_broker->io__ = std::unique_ptr<IO> {&mock_io};
-    data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient> {&mock_http_client};
-    data_broker->net_client__ = std::unique_ptr<asapo::NetClient> {&mock_netclient};
+    data_broker->io__ = std::unique_ptr<IO>{&mock_io};
+    data_broker->httpclient__ = std::unique_ptr<asapo::HttpClient>{&mock_http_client};
+    data_broker->net_client__ = std::unique_ptr<asapo::NetClient>{&mock_netclient};
 
     MockGetBrokerUri();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/detector/default/" + expected_group_id
-                                        +
-                                        "/next?token="
-                                        + expected_token, _,
-                                        _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_uri + "/database/beamtime_id/detector/default/" + expected_group_id
+                          +
+                              "/next?token="
+                          + expected_token, _,
+                      _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     data_broker->GetNext(&info, expected_group_id, nullptr);
 }
 
-
-
 TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"
-                                        + expected_group_id + "/next?token="
-                                        + expected_token, _,
+                                            + expected_group_id + "/next?token="
+                                            + expected_token, _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     data_broker->GetNext(&info, expected_group_id, nullptr);
 }
 
@@ -217,12 +228,12 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_substream + "/" + expected_group_id + "/next?token="
-                                        + expected_token, _,
+                                            expected_substream + "/" + expected_group_id + "/next?token="
+                                            + expected_token, _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     data_broker->GetNext(&info, expected_group_id, expected_substream, nullptr);
 }
 
@@ -230,12 +241,12 @@ TEST_F(ServerDataBrokerTests, GetLastUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                        expected_group_id + "/last?token="
-                                        + expected_token, _,
+                                            expected_group_id + "/last?token="
+                                            + expected_token, _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     data_broker->GetLast(&info, expected_group_id, nullptr);
 }
 
@@ -243,9 +254,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEndOfStreamFromHttpClient) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"\"}")));
 
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
 
@@ -261,9 +272,10 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsStreamFinishedFromHttpClient) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"" + expected_next_substream + "\"}")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"" + expected_next_substream
+                   + "\"}")));
 
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
 
@@ -279,10 +291,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataFromHttpClient) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"""\"}")));
-
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":2,\"next_substream\":\"""\"}")));
 
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
     auto err_data = static_cast<const asapo::ConsumerErrorData*>(err->GetCustomData());
@@ -298,9 +309,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNotAuthorized) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Unauthorized),
-                SetArgPointee<2>(nullptr),
-                Return("")));
+        SetArgPointee<1>(HttpCode::Unauthorized),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
 
@@ -312,9 +323,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsWrongResponseFromHttpClient) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("id")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("id")));
 
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
 
@@ -325,9 +336,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsWrongResponseFromHttpClient) {
 TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerAddressNotFound) {
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asapo-broker"), _,
                                         _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                                                    SetArgPointee<1>(HttpCode::NotFound),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("")));
+        SetArgPointee<1>(HttpCode::NotFound),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     data_broker->SetTimeout(100);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -338,9 +349,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerAddressNotFound) {
 TEST_F(ServerDataBrokerTests, GetImageReturnsIfBrokerUriEmpty) {
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asapo-broker"), _,
                                         _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     data_broker->SetTimeout(100);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -356,7 +367,8 @@ TEST_F(ServerDataBrokerTests, GetDoNotCallBrokerUriIfAlreadyFound) {
     data_broker->GetNext(&info, expected_group_id, nullptr);
     Mock::VerifyAndClearExpectations(&mock_http_client);
 
-    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asap-broker"), _, _)).Times(0);
+    EXPECT_CALL(mock_http_client,
+                Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asap-broker"), _, _)).Times(0);
     MockGet("error_response");
     data_broker->GetNext(&info, expected_group_id, nullptr);
 }
@@ -378,9 +390,9 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsEofStreamFromHttpClientUntilTimeout
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}")));
 
     data_broker->SetTimeout(300);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -393,18 +405,17 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
     data_broker->SetTimeout(300);
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) +
-                       ",\"id_max\":2,\"next_substream\":\"""\"}")));
-
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
-                                                    SetArgPointee<1>(HttpCode::NotFound),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":" + std::to_string(expected_dataset_id) +
+            ",\"id_max\":2,\"next_substream\":\"""\"}")));
 
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+                                            + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
+        SetArgPointee<1>(HttpCode::NotFound),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     data_broker->SetTimeout(300);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -412,14 +423,13 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kNoData));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnTransferError) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::InternalServerError),
-                SetArgPointee<2>(asapo::HttpErrorTemplates::kTransferError.Generate("sss").release()),
-                Return("")));
+        SetArgPointee<1>(HttpCode::InternalServerError),
+        SetArgPointee<2>(asapo::HttpErrorTemplates::kTransferError.Generate("sss").release()),
+        Return("")));
 
     data_broker->SetTimeout(300);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -428,23 +438,21 @@ TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnTransferError) {
     ASSERT_THAT(err->Explain(), HasSubstr("sss"));
 }
 
-
 ACTION(AssignArg2) {
     *arg2 = asapo::HttpErrorTemplates::kConnectionError.Generate().release();
 }
 
-
 TEST_F(ServerDataBrokerTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) {
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/asapo-discovery/asapo-broker"), _,
                                         _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(expected_broker_uri)));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(expected_broker_uri)));
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                AssignArg2(),
-                Return("")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        AssignArg2(),
+        Return("")));
 
     data_broker->SetTimeout(300);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -456,9 +464,9 @@ TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnFinshedSubstream)
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::Conflict),
-                SetArgPointee<2>(nullptr),
-                Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next\"}")));
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":2,\"id_max\":2,\"next_substream\":\"next\"}")));
 
     data_broker->SetTimeout(300);
     auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
@@ -543,7 +551,6 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfZeroBufId) {
 
     FileData data;
 
-
     EXPECT_CALL(mock_netclient, GetData_t(_, _)).Times(0);
 
     MockReadDataFromFile();
@@ -551,14 +558,13 @@ TEST_F(ServerDataBrokerTests, GetImageCallsReadFromFileIfZeroBufId) {
     data_broker->GetNext(&info, expected_group_id, &data);
 }
 
-
 TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("creategroup"), _, "", _, _)).WillOnce(DoAll(
-                SetArgPointee<3>(HttpCode::BadRequest),
-                SetArgPointee<4>(nullptr),
-                Return("")));
+        SetArgPointee<3>(HttpCode::BadRequest),
+        SetArgPointee<4>(nullptr),
+        Return("")));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -567,15 +573,14 @@ TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsErrorCreateGroup) {
     ASSERT_THAT(groupid, Eq(""));
 }
 
-
 TEST_F(ServerDataBrokerTests, GenerateNewGroupIdReturnsGroupID) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/creategroup?token=" + expected_token, _, "", _,
                                          _)).WillOnce(DoAll(
-                                                 SetArgPointee<3>(HttpCode::OK),
-                                                 SetArgPointee<4>(nullptr),
-                                                 Return(expected_group_id)));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return(expected_group_id)));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -588,12 +593,13 @@ TEST_F(ServerDataBrokerTests, ResetCounterByDefaultUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                         expected_group_id +
-                                         "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                    expected_group_id +
+                    "/resetcounter?token=" + expected_token + "&value=0", _, _, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("")));
     auto err = data_broker->ResetLastReadMarker(expected_group_id);
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -602,28 +608,28 @@ TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUri) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                         expected_group_id +
-                                         "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                    expected_group_id +
+                    "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("")));
     auto err = data_broker->SetLastReadMarker(10, expected_group_id);
     ASSERT_THAT(err, Eq(nullptr));
 }
 
-
 TEST_F(ServerDataBrokerTests, ResetCounterUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                         expected_substream + "/" +
-                                         expected_group_id +
-                                         "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+        expected_substream + "/" +
+        expected_group_id +
+        "/resetcounter?token=" + expected_token + "&value=10", _, _, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("")));
     auto err = data_broker->SetLastReadMarker(10, expected_group_id, expected_substream);
     ASSERT_THAT(err, Eq(nullptr));
 }
@@ -633,11 +639,11 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUri) {
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
-                                        "/default/size?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"size\":10}")));
+        "/default/size?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("{\"size\":10}")));
     asapo::Error err;
     auto size = data_broker->GetCurrentSize(&err);
     ASSERT_THAT(err, Eq(nullptr));
@@ -649,45 +655,43 @@ TEST_F(ServerDataBrokerTests, GetCurrentSizeUsesCorrectUriWithSubstream) {
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_substream + "/size?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"size\":10}")));
+        expected_substream + "/size?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("{\"size\":10}")));
     asapo::Error err;
     auto size = data_broker->GetCurrentSize(expected_substream, &err);
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(size, Eq(10));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetCurrentSizeErrorOnWrongResponce) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
-                                        "/default/size?token="
-                                        + expected_token, _, _)).WillRepeatedly(DoAll(
-                                                    SetArgPointee<1>(HttpCode::Unauthorized),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("")));
+        "/default/size?token="
+                                            + expected_token, _, _)).WillRepeatedly(DoAll(
+        SetArgPointee<1>(HttpCode::Unauthorized),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     asapo::Error err;
     auto size = data_broker->GetCurrentSize(&err);
     ASSERT_THAT(err, Ne(nullptr));
     ASSERT_THAT(size, Eq(0));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetNDataErrorOnWrongParse) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
-                                        "/default/size?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"siz\":10}")));
+        "/default/size?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("{\"siz\":10}")));
     asapo::Error err;
     auto size = data_broker->GetCurrentSize(&err);
     ASSERT_THAT(err, Ne(nullptr));
@@ -700,15 +704,15 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                        expected_group_id
-                                        + "/" + std::to_string(
-                                            expected_dataset_id) + "?token="
-                                        + expected_token, _,
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                                            expected_group_id
+                                            + "/" + std::to_string(
+        expected_dataset_id) + "?token="
+                                            + expected_token, _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return(json)));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(json)));
 
     auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
@@ -716,17 +720,16 @@ TEST_F(ServerDataBrokerTests, GetByIdUsesCorrectUri) {
     ASSERT_THAT(info.name, Eq(to_send.name));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetByIdTimeouts) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::Conflict),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("")));
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
@@ -737,13 +740,12 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::Conflict),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}")));
-
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":1,\"id_max\":1,\"next_substream\":\"""\"}")));
 
     auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
@@ -754,32 +756,29 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
     MockGetBrokerUri();
     data_broker->SetTimeout(10);
 
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::Conflict),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_substream\":\"""\"}")));
-
+    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+        expected_group_id + "/" + std::to_string(expected_dataset_id) + "?token="
+                                            + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::Conflict),
+        SetArgPointee<2>(nullptr),
+        Return("{\"op\":\"get_record_by_id\",\"id\":100,\"id_max\":1,\"next_substream\":\"""\"}")));
 
     auto err = data_broker->GetById(expected_dataset_id, &info, expected_group_id, nullptr);
 
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kEndOfStream));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
     MockGetBrokerUri();
     data_broker->SetTimeout(100);
 
-
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream +
-                                        "/default/0/meta/0?token="
-                                        + expected_token, _,
+                                            "/default/0/meta/0?token="
+                                            + expected_token, _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return(expected_metadata)));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(expected_metadata)));
 
     asapo::Error err;
     auto res = data_broker->GetBeamtimeMeta(&err);
@@ -789,14 +788,13 @@ TEST_F(ServerDataBrokerTests, GetMetaDataOK) {
 
 }
 
-
 TEST_F(ServerDataBrokerTests, QueryImagesReturnError) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll(
-                SetArgPointee<3>(HttpCode::BadRequest),
-                SetArgPointee<4>(nullptr),
-                Return("error in query")));
+        SetArgPointee<3>(HttpCode::BadRequest),
+        SetArgPointee<4>(nullptr),
+        Return("error in query")));
 
     data_broker->SetTimeout(1000);
     asapo::Error err;
@@ -807,14 +805,13 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnError) {
     ASSERT_THAT(images.size(), Eq(0));
 }
 
-
 TEST_F(ServerDataBrokerTests, QueryImagesReturnEmptyResults) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll(
-                SetArgPointee<3>(HttpCode::OK),
-                SetArgPointee<4>(nullptr),
-                Return("[]")));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("[]")));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -836,9 +833,9 @@ TEST_F(ServerDataBrokerTests, QueryImagesWrongResponseArray) {
 
 
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll(
-                SetArgPointee<3>(HttpCode::OK),
-                SetArgPointee<4>(nullptr),
-                Return(responce_string)));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return(responce_string)));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -855,11 +852,10 @@ TEST_F(ServerDataBrokerTests, QueryImagesWrongResponseRecorsd) {
 
     auto responce_string = R"([{"bla":1},{"err":}])";
 
-
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("queryimages"), _, expected_query_string, _, _)).WillOnce(DoAll(
-                SetArgPointee<3>(HttpCode::OK),
-                SetArgPointee<4>(nullptr),
-                Return(responce_string)));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return(responce_string)));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -870,8 +866,6 @@ TEST_F(ServerDataBrokerTests, QueryImagesWrongResponseRecorsd) {
     ASSERT_THAT(err->Explain(), HasSubstr("response"));
 }
 
-
-
 TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
 
     MockGetBrokerUri();
@@ -883,12 +877,12 @@ TEST_F(ServerDataBrokerTests, QueryImagesReturnRecords) {
     auto json2 = rec2.Json();
     auto responce_string = "[" + json1 + "," + json2 + "]";
 
-
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0" +
-                                         "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return(responce_string)));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0" +
+                    "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return(responce_string)));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -906,11 +900,11 @@ TEST_F(ServerDataBrokerTests, QueryImagesUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                         expected_substream + "/0" +
-                                         "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("[]")));
+        expected_substream + "/0" +
+        "/queryimages?token=" + expected_token, _, expected_query_string, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("[]")));
 
     data_broker->SetTimeout(100);
     asapo::Error err;
@@ -924,17 +918,16 @@ TEST_F(ServerDataBrokerTests, GetNextDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                        expected_group_id + "/next?token="
-                                        + expected_token + "&dataset=true", _,
+                                            expected_group_id + "/next?token="
+                                            + expected_token + "&dataset=true&minsize=0", _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     asapo::Error err;
-    data_broker->GetNextDataset(expected_group_id, &err);
+    data_broker->GetNextDataset(expected_group_id, 0, &err);
 }
 
-
 TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) {
     asapo::Error err;
     MockGetBrokerUri();
@@ -949,14 +942,14 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) {
     auto json3 = to_send3.Json();
 
     auto json = std::string("{") +
-                "\"_id\":1," +
-                "\"size\":3," +
-                "\"images\":[" + json1 + "," + json2 + "," + json3 + "]" +
-                "}";
+        "\"_id\":1," +
+        "\"size\":3," +
+        "\"images\":[" + json1 + "," + json2 + "," + json3 + "]" +
+        "}";
 
     MockGet(json);
 
-    auto dataset = data_broker->GetNextDataset(expected_group_id, &err);
+    auto dataset = data_broker->GetNextDataset(expected_group_id, 0, &err);
 
     ASSERT_THAT(err, Eq(nullptr));
 
@@ -967,12 +960,47 @@ TEST_F(ServerDataBrokerTests, GetDataSetReturnsFileInfos) {
     ASSERT_THAT(dataset.content[2].id, Eq(to_send3.id));
 }
 
+TEST_F(ServerDataBrokerTests, GetDataSetReturnsPartialFileInfos) {
+    asapo::Error err;
+    MockGetBrokerUri();
+
+    auto to_send1 = CreateFI();
+    auto json1 = to_send1.Json();
+    auto to_send2 = CreateFI();
+    to_send2.id = 2;
+    auto json2 = to_send2.Json();
+    auto to_send3 = CreateFI();
+    to_send3.id = 3;
+    auto json3 = to_send3.Json();
+
+    auto json = std::string("{") +
+        "\"_id\":1," +
+        "\"size\":3," +
+        "\"images\":[" + json1 + "," + json2 + "]" +
+        "}";
+
+    MockGet(json, asapo::HttpCode::PartialContent);
+
+    auto dataset = data_broker->GetNextDataset(expected_group_id, 0, &err);
+
+    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kPartialData));
+
+    auto err_data = static_cast<const asapo::PartialErrorData*>(err->GetCustomData());
+    ASSERT_THAT(err_data->expected_size, Eq(3));
+    ASSERT_THAT(err_data->id, Eq(1));
+
+    ASSERT_THAT(dataset.id, Eq(1));
+    ASSERT_THAT(dataset.content.size(), Eq(2));
+    ASSERT_THAT(dataset.content[0].id, Eq(to_send1.id));
+    ASSERT_THAT(dataset.content[1].id, Eq(to_send2.id));
+}
+
 TEST_F(ServerDataBrokerTests, GetDataSetReturnsParseError) {
     MockGetBrokerUri();
     MockGet("error_response");
 
     asapo::Error err;
-    auto dataset = data_broker->GetNextDataset(expected_group_id, &err);
+    auto dataset = data_broker->GetNextDataset(expected_group_id, 0, &err);
 
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
     ASSERT_THAT(dataset.content.size(), Eq(0));
@@ -984,59 +1012,60 @@ TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                        expected_group_id + "/last?token="
-                                        + expected_token + "&dataset=true", _,
+                                            expected_group_id + "/last?token="
+                                            + expected_token + "&dataset=true&minsize=2", _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     asapo::Error err;
-    data_broker->GetLastDataset(expected_group_id, &err);
+    data_broker->GetLastDataset(expected_group_id, 2, &err);
 }
 
 TEST_F(ServerDataBrokerTests, GetLastDatasetUsesCorrectUriWithSubstream) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_substream + "/" +
-                                        expected_group_id + "/last?token="
-                                        + expected_token + "&dataset=true", _,
+                                            expected_substream + "/" +
+                                            expected_group_id + "/last?token="
+                                            + expected_token + "&dataset=true&minsize=1", _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     asapo::Error err;
-    data_broker->GetLastDataset(expected_group_id, expected_substream, &err);
+    data_broker->GetLastDataset(expected_group_id, expected_substream, 1, &err);
 }
 
-
 TEST_F(ServerDataBrokerTests, GetDatasetByIdUsesCorrectUri) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
-                                        expected_group_id +
-                                        "/" + std::to_string(expected_dataset_id) + "?token="
-                                        + expected_token + "&dataset=true", _,
+                                            expected_group_id +
+                                            "/" + std::to_string(expected_dataset_id) + "?token="
+                                            + expected_token + "&dataset=true" + "&minsize=0", _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
     asapo::Error err;
-    data_broker->GetDatasetById(expected_dataset_id, expected_group_id, &err);
+    data_broker->GetDatasetById(expected_dataset_id, expected_group_id, 0, &err);
 }
 
 TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) {
     MockGetBrokerUri();
-    std::string return_substreams = R"({"substreams":[{"lastId":123,"name":"test","timestampCreated":1000000},{"name":"test1","timestampCreated":2000000}]})";
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
-                                        + "?token=" + expected_token+"&from=stream_from", _,
-                                        _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return(return_substreams)));
+    std::string return_substreams =
+        R"({"substreams":[{"lastId":123,"name":"test","timestampCreated":1000000},{"name":"test1","timestampCreated":2000000}]})";
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
+                          + "?token=" + expected_token + "&from=stream_from", _,
+                      _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(return_substreams)));
 
     asapo::Error err;
-    auto substreams = data_broker->GetSubstreamList("stream_from",&err);
+    auto substreams = data_broker->GetSubstreamList("stream_from", &err);
     ASSERT_THAT(err, Eq(nullptr));
     ASSERT_THAT(substreams.size(), Eq(2));
     ASSERT_THAT(substreams.size(), 2);
@@ -1044,21 +1073,20 @@ TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUri) {
     ASSERT_THAT(substreams[1].Json(false), R"({"name":"test1","timestampCreated":2000000})");
 }
 
-
 TEST_F(ServerDataBrokerTests, GetSubstreamListUsesCorrectUriWithoutFrom) {
     MockGetBrokerUri();
-    EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
-                                            + "?token=" + expected_token, _,
-                                        _)).WillOnce(DoAll(
+    EXPECT_CALL(mock_http_client,
+                Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/0/substreams"
+                          + "?token=" + expected_token, _,
+                      _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::OK),
         SetArgPointee<2>(nullptr),
         Return("")));;
 
     asapo::Error err;
-    auto substreams = data_broker->GetSubstreamList("",&err);
+    auto substreams = data_broker->GetSubstreamList("", &err);
 }
 
-
 void ServerDataBrokerTests::MockBeforeFTS(FileData* data) {
     auto to_send = CreateFI();
     auto json = to_send.Json();
@@ -1070,21 +1098,21 @@ void ServerDataBrokerTests::MockBeforeFTS(FileData* data) {
 
 void ServerDataBrokerTests::ExpectFolderToken() {
     std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" +
-                                               expected_beamtime_id
-                                               + "\",\"Token\":\"" + expected_token + "\"}";
+        expected_beamtime_id
+        + "\",\"Token\":\"" + expected_token + "\"}";
 
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/asapo-authorizer/folder"), _,
                                          expected_folder_query_string, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return(expected_folder_token)
-                                                 ));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return(expected_folder_token)
+    ));
 
 }
 
 ACTION_P(AssignArg3, assign) {
     if (assign) {
-        asapo::FileData data = asapo::FileData{new uint8_t[1] };
+        asapo::FileData data = asapo::FileData{new uint8_t[1]};
         data[0] = expected_value;
         *arg3 = std::move(data);
     }
@@ -1092,27 +1120,35 @@ ACTION_P(AssignArg3, assign) {
 
 void ServerDataBrokerTests::ExpectFileTransfer(const asapo::ConsumerErrorTemplate* p_err_template) {
     EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/transfer"),
-                                                    expected_cookie, expected_fts_query_string, _, expected_image_size, _)).WillOnce(DoAll(
-                                                            SetArgPointee<5>(HttpCode::OK),
-                                                            AssignArg3(p_err_template == nullptr),
-                                                            Return(p_err_template == nullptr ? nullptr : p_err_template->Generate().release())
-                                                            ));
+                                                    expected_cookie,
+                                                    expected_fts_query_string,
+                                                    _,
+                                                    expected_image_size,
+                                                    _)).WillOnce(DoAll(
+        SetArgPointee<5>(HttpCode::OK),
+        AssignArg3(p_err_template == nullptr),
+        Return(p_err_template == nullptr ? nullptr : p_err_template->Generate().release())
+    ));
 }
 
 void ServerDataBrokerTests::ExpectRepeatedFileTransfer() {
     EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/transfer"),
-                                                    expected_cookie, expected_fts_query_string, _, expected_image_size, _)).
-    WillOnce(DoAll(
-                 SetArgPointee<5>(HttpCode::Unauthorized),
-                 Return(nullptr))).
-    WillOnce(DoAll(
-                 SetArgPointee<5>(HttpCode::OK),
-                 Return(nullptr)
-             ));
+                                                    expected_cookie,
+                                                    expected_fts_query_string,
+                                                    _,
+                                                    expected_image_size,
+                                                    _)).
+        WillOnce(DoAll(
+        SetArgPointee<5>(HttpCode::Unauthorized),
+        Return(nullptr))).
+        WillOnce(DoAll(
+        SetArgPointee<5>(HttpCode::OK),
+        Return(nullptr)
+    ));
 }
 
 void ServerDataBrokerTests::AssertSingleFileTransfer() {
-    asapo::FileData data = asapo::FileData{new uint8_t[1] };
+    asapo::FileData data = asapo::FileData{new uint8_t[1]};
     MockGetBrokerUri();
     MockBeforeFTS(&data);
     ExpectFolderToken();
@@ -1127,7 +1163,6 @@ void ServerDataBrokerTests::AssertSingleFileTransfer() {
     Mock::VerifyAndClearExpectations(&mock_io);
 }
 
-
 TEST_F(ServerDataBrokerTests, GetImageUsesFileTransferServiceIfCannotReadFromCache) {
     AssertSingleFileTransfer();
 }
@@ -1137,28 +1172,32 @@ TEST_F(ServerDataBrokerTests, FileTransferReadsFileSize) {
     EXPECT_CALL(mock_http_client, Post_t(HasSubstr("sizeonly=true"),
                                          expected_cookie, expected_fts_query_string, _, _)).WillOnce(DoAll(
 
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("{\"file_size\":5}")
-                                                 ));
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("{\"file_size\":5}")
+    ));
 
     EXPECT_CALL(mock_http_client, PostReturnArray_t(HasSubstr(expected_fts_uri + "/transfer"),
-                                                    expected_cookie, expected_fts_query_string, _, 5, _)).WillOnce(DoAll(
-                                                            SetArgPointee<5>(HttpCode::OK),
-                                                            AssignArg3(nullptr),
-                                                            Return(nullptr)
-                                                            ));
+                                                    expected_cookie,
+                                                    expected_fts_query_string,
+                                                    _,
+                                                    5,
+                                                    _)).WillOnce(DoAll(
+        SetArgPointee<5>(HttpCode::OK),
+        AssignArg3(nullptr),
+        Return(nullptr)
+    ));
 
     FileData data;
     info.size = 0;
     info.buf_id = 0;
-    auto err = fts_data_broker->RetrieveData(&info,  &data);
+    auto err = fts_data_broker->RetrieveData(&info, &data);
 }
 
 TEST_F(ServerDataBrokerTests, GetImageReusesTokenAndUri) {
     AssertSingleFileTransfer();
 
-    asapo::FileData data = asapo::FileData{new uint8_t[1] };
+    asapo::FileData data = asapo::FileData{new uint8_t[1]};
     MockBeforeFTS(&data);
     ExpectFileTransfer(nullptr);
 
@@ -1180,30 +1219,30 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}";
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                         expected_substream + "/"  +
-                                         expected_group_id
-                                         + "/" + std::to_string(expected_dataset_id) + "?token="
-                                         + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+        expected_substream + "/" +
+        expected_group_id
+                                             + "/" + std::to_string(expected_dataset_id) + "?token="
+                                             + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("")));
 
     auto err = data_broker->Acknowledge(expected_group_id, expected_dataset_id, expected_substream);
 
     ASSERT_THAT(err, Eq(nullptr));
 }
 
-
 TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) {
     MockGetBrokerUri();
     auto expected_acknowledge_command = "{\"Op\":\"ackimage\"}";
-    EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"  +
-                                         expected_group_id
-                                         + "/" + std::to_string(expected_dataset_id) + "?token="
-                                         + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+    EXPECT_CALL(mock_http_client,
+                Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/" +
+                    expected_group_id
+                           + "/" + std::to_string(expected_dataset_id) + "?token="
+                           + expected_token, _, expected_acknowledge_command, _, _)).WillOnce(DoAll(
+        SetArgPointee<3>(HttpCode::OK),
+        SetArgPointee<4>(nullptr),
+        Return("")));
 
     auto err = data_broker->Acknowledge(expected_group_id, expected_dataset_id);
 
@@ -1213,11 +1252,11 @@ TEST_F(ServerDataBrokerTests, AcknowledgeUsesCorrectUriWithDefaultSubStream) {
 void ServerDataBrokerTests::ExpectIdList(bool error) {
     MockGetBrokerUri();
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_substream + "/"  +
-                                        expected_group_id + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(error ? "" : "{\"unacknowledged\":[1,2,3]}")));
+        expected_substream + "/" +
+        expected_group_id + "/nacks?token=" + expected_token + "&from=1&to=0", _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(error ? "" : "{\"unacknowledged\":[1,2,3]}")));
 }
 
 TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) {
@@ -1229,17 +1268,15 @@ TEST_F(ServerDataBrokerTests, GetUnAcknowledgedListReturnsIds) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
-
 void ServerDataBrokerTests::ExpectLastAckId(bool empty_response) {
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                        expected_substream + "/"  +
-                                        expected_group_id + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll(
-                                                    SetArgPointee<1>(HttpCode::OK),
-                                                    SetArgPointee<2>(nullptr),
-                                                    Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}")));
+        expected_substream + "/" +
+        expected_group_id + "/lastack?token=" + expected_token, _, _)).WillOnce(DoAll(
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return(empty_response ? "{\"lastAckId\":0}" : "{\"lastAckId\":1}")));
 }
 
-
 TEST_F(ServerDataBrokerTests, GetLastAcknowledgeUsesOk) {
     MockGetBrokerUri();
     ExpectLastAckId(false);
@@ -1271,29 +1308,29 @@ TEST_F(ServerDataBrokerTests, ResendNacks) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/"
-                                        + expected_group_id + "/next?token="
-                                        + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _,
+                                            + expected_group_id + "/next?token="
+                                            + expected_token + "&resend_nacks=true&delay_sec=10&resend_attempts=3", _,
                                         _)).WillOnce(DoAll(
-                                                SetArgPointee<1>(HttpCode::OK),
-                                                SetArgPointee<2>(nullptr),
-                                                Return("")));
+        SetArgPointee<1>(HttpCode::OK),
+        SetArgPointee<2>(nullptr),
+        Return("")));
 
     data_broker->SetResendNacs(true, 10, 3);
     data_broker->GetNext(&info, expected_group_id, nullptr);
 }
 
-
 TEST_F(ServerDataBrokerTests, NegativeAcknowledgeUsesCorrectUri) {
     MockGetBrokerUri();
     auto expected_neg_acknowledge_command = R"({"Op":"negackimage","Params":{"DelaySec":10}})";
     EXPECT_CALL(mock_http_client, Post_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/" +
-                                         expected_substream + "/"  +
-                                         expected_group_id
-                                         + "/" + std::to_string(expected_dataset_id) + "?token="
-                                         + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce(DoAll(
-                                                     SetArgPointee<3>(HttpCode::OK),
-                                                     SetArgPointee<4>(nullptr),
-                                                     Return("")));
+        expected_substream + "/" +
+        expected_group_id
+                                             + "/" + std::to_string(expected_dataset_id) + "?token="
+                                             + expected_token, _, expected_neg_acknowledge_command, _, _)).WillOnce(
+        DoAll(
+            SetArgPointee<3>(HttpCode::OK),
+            SetArgPointee<4>(nullptr),
+            Return("")));
 
     auto err = data_broker->NegativeAcknowledge(expected_group_id, expected_dataset_id, 10, expected_substream);
 
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 867139750..78a6ccaaa 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -39,6 +39,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo":
     vector[FileInfo].iterator end()
   struct DataSet:
     uint64_t id
+    uint64_t expected_size
     FileInfos content
   struct  SourceCredentials:
     string beamtime_id
@@ -74,9 +75,9 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
         string GenerateNewGroupId(Error* err)
         string GetBeamtimeMeta(Error* err)
         FileInfos QueryImages(string query, string substream, Error* err)
-        DataSet GetNextDataset(string group_id, string substream, Error* err)
-        DataSet GetLastDataset(string group_id, string substream, Error* err)
-        DataSet GetDatasetById(uint64_t id, string group_id, string substream, Error* err)
+        DataSet GetNextDataset(string group_id, string substream, uint64_t min_size, Error* err)
+        DataSet GetLastDataset(string group_id, string substream, uint64_t min_size, Error* err)
+        DataSet GetDatasetById(uint64_t id, string group_id, string substream, uint64_t min_size, Error* err)
         Error RetrieveData(FileInfo* info, FileData* data)
         vector[StreamInfo] GetSubstreamList(string from_substream, Error* err)
         void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts)
@@ -96,6 +97,8 @@ cdef extern from "asapo_consumer.h" namespace "asapo":
   ErrorTemplateInterface kInterruptedTransaction "asapo::ConsumerErrorTemplates::kInterruptedTransaction"
   ErrorTemplateInterface kLocalIOError "asapo::ConsumerErrorTemplates::kLocalIOError"
   ErrorTemplateInterface kWrongInput "asapo::ConsumerErrorTemplates::kWrongInput"
+  ErrorTemplateInterface kPartialData "asapo::ConsumerErrorTemplates::kPartialData"
+
   cdef cppclass ConsumerErrorData:
     uint64_t id
     uint64_t id_max
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index 0022eef0a..6958b6436 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -52,19 +52,23 @@ class AsapoStreamFinishedError(AsapoConsumerError):
     self.id_max = id_max
     self.next_substream = _str(next_substream)
 
-
 class AsapoEndOfStreamError(AsapoConsumerError):
   def __init__(self,message,id_max=None):
     AsapoConsumerError.__init__(self,message)
     self.id_max = id_max
 
+class AsapoPartialDataError(AsapoConsumerError):
+  def __init__(self,message,partial_data):
+    AsapoConsumerError.__init__(self,message)
+    self.partial_data = partial_data
+
 class AsapoNoDataError(AsapoConsumerError):
   def __init__(self,message,id=None,id_max=None):
     AsapoConsumerError.__init__(self,message)
     self.id_max = id_max
     self.id = id
 
-cdef throw_exception(Error& err):
+cdef throw_exception(Error& err, res = None):
     cdef ConsumerErrorData* data
     error_string =  _str(err.get().Explain())
     if err == kEndOfStream:
@@ -85,6 +89,8 @@ cdef throw_exception(Error& err):
                 raise AsapoNoDataError(error_string,data.id,data.id_max)
             else:
                 raise AsapoNoDataError(error_string)
+    elif err == kPartialData:
+            raise AsapoPartialDataError(error_string, res)
     elif err == kWrongInput:
             raise AsapoWrongInputError(error_string)
     elif err == kLocalIOError:
@@ -273,7 +279,7 @@ cdef class PyDataBroker:
         for fi in file_infos:
             json_list.append(json.loads(_str(fi.Json())))
         return json_list
-    def _op_dataset(self, op, group_id, substream, uint64_t id):
+    def _op_dataset(self, op, group_id, substream, uint64_t min_size, uint64_t id):
         cdef string b_group_id = _bytes(group_id)
         cdef string b_substream = _bytes(substream)
         cdef FileInfos file_infos
@@ -281,25 +287,26 @@ cdef class PyDataBroker:
         cdef Error err
         if op == "next":
             with nogil:
-                dataset = self.c_broker.GetNextDataset(b_group_id,b_substream, &err)
+                dataset = self.c_broker.GetNextDataset(b_group_id,b_substream, min_size, &err)
         elif op == "last":
             with nogil:
-                dataset = self.c_broker.GetLastDataset(b_group_id,b_substream, &err)
+                dataset = self.c_broker.GetLastDataset(b_group_id,b_substream, min_size, &err)
         elif op == "id":
             with nogil:
-                dataset = self.c_broker.GetDatasetById(id, b_group_id,b_substream, &err)
-        if err:
-            throw_exception(err)
+                dataset = self.c_broker.GetDatasetById(id, b_group_id,b_substream, min_size, &err)
         json_list = []
         for fi in dataset.content:
             json_list.append(json.loads(_str(fi.Json())))
-        return dataset.id, json_list
-    def get_next_dataset(self, group_id, substream = "default"):
-        return self._op_dataset("next",group_id,substream,0)
-    def get_last_dataset(self, group_id, substream = "default"):
-        return self._op_dataset("last",group_id,substream,0)
-    def get_dataset_by_id(self, uint64_t id, group_id, substream = "default"):
-        return self._op_dataset("id",group_id,substream,id)
+        res={'id':dataset.id,'expected_size':dataset.expected_size,'content':json_list}
+        if err:
+            throw_exception(err,res)
+        return res
+    def get_next_dataset(self, group_id, substream = "default", min_size = 0):
+        return self._op_dataset("next",group_id,substream,min_size,0)
+    def get_last_dataset(self, group_id, substream = "default", min_size = 0):
+        return self._op_dataset("last",group_id,substream,min_size,0)
+    def get_dataset_by_id(self, uint64_t id, group_id, substream = "default", min_size = 0):
+        return self._op_dataset("id",group_id,substream,min_size,id)
     def get_beamtime_meta(self):
         cdef Error err
         cdef string meta_str
-- 
GitLab