From 6fc1a59deaa4ce42c543a27b5ed8af01157c29a0 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 29 May 2018 13:43:22 +0200
Subject: [PATCH] parallel connections work

---
 broker/src/asapo_broker/database/database.go  |  1 +
 .../asapo_broker/database/database_test.go    |  1 +
 .../asapo_broker/database/mock_database.go    |  5 ++
 broker/src/asapo_broker/database/mongodb.go   | 24 +++---
 .../src/asapo_broker/database/mongodb_test.go | 30 +++++++-
 broker/src/asapo_broker/server/get_id.go      | 50 +++++++++++++
 broker/src/asapo_broker/server/get_id_test.go | 74 +++++++++++++++++++
 broker/src/asapo_broker/server/get_next.go    |  2 +-
 broker/src/asapo_broker/server/listroutes.go  | 18 +++--
 broker/src/asapo_broker/utils/status_codes.go |  2 +-
 common/cpp/include/common/error.h             |  3 +-
 common/cpp/include/preprocessor/definitions.h | 11 +++
 common/cpp/src/http_client/CMakeLists.txt     |  2 +-
 .../dummy_data_producer.cpp                   | 12 ++-
 .../worker/getnext_broker/getnext_broker.cpp  |  3 +-
 .../api/src/receiver_discovery_service.cpp    |  5 +-
 producer/api/src/receiver_discovery_service.h |  5 +-
 producer/api/src/request_handler_factory.h    |  6 +-
 producer/api/src/request_handler_tcp.cpp      |  5 +-
 producer/api/src/request_pool.h               |  5 +-
 .../unittests/test_request_handler_tcp.cpp    |  6 ++
 receiver/src/connection.cpp                   |  7 +-
 receiver/src/connection.h                     |  2 +-
 receiver/src/receiver.cpp                     |  2 +-
 receiver/src/receiver_config.cpp              |  4 +-
 receiver/src/statistics.cpp                   |  2 +-
 receiver/src/statistics.h                     |  9 +--
 receiver/unittests/test_connection.cpp        |  4 +-
 receiver/unittests/test_statistics.cpp        |  6 +-
 .../automatic/broker/get_next/check_linux.sh  |  2 +-
 .../broker/get_next/check_windows.bat         |  2 +-
 .../full_chain/simple_chain/check_linux.sh    | 13 ++--
 .../transfer_single_file/check_linux.sh       |  4 +-
 .../performance_full_chain_simple/broker.json |  2 +-
 .../discovery.json                            |  7 ++
 .../receiver.json                             |  5 +-
 .../performance_full_chain_simple/test.sh     | 42 +++++++++--
 .../discovery.json                            |  7 ++
 .../receiver.json                             |  3 +-
 .../settings_tmp.json                         |  9 ---
 .../performance_producer_receiver/test.sh     | 33 +++++++--
 worker/api/cpp/src/server_data_broker.cpp     | 38 ++++++----
 .../api/cpp/unittests/test_server_broker.cpp  | 38 +++++-----
 43 files changed, 384 insertions(+), 127 deletions(-)
 create mode 100644 broker/src/asapo_broker/server/get_id.go
 create mode 100644 broker/src/asapo_broker/server/get_id_test.go
 create mode 100644 common/cpp/include/preprocessor/definitions.h
 create mode 100644 tests/manual/performance_full_chain_simple/discovery.json
 create mode 100644 tests/manual/performance_producer_receiver/discovery.json
 delete mode 100644 tests/manual/performance_producer_receiver/settings_tmp.json

diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go
index e530bf03e..3a0683bf3 100644
--- a/broker/src/asapo_broker/database/database.go
+++ b/broker/src/asapo_broker/database/database.go
@@ -2,6 +2,7 @@ package database
 
 type Agent interface {
 	GetNextRecord(db_name string) ([]byte, error)
+	GetRecordByID(dbname string, id int) ([]byte, error)
 	Connect(string) error
 	Close()
 	Copy() Agent
diff --git a/broker/src/asapo_broker/database/database_test.go b/broker/src/asapo_broker/database/database_test.go
index 3073ed61b..68b1eadb4 100644
--- a/broker/src/asapo_broker/database/database_test.go
+++ b/broker/src/asapo_broker/database/database_test.go
@@ -12,6 +12,7 @@ func TestMockDataBase(t *testing.T) {
 	db.On("Close").Return()
 	db.On("Copy").Return(nil)
 	db.On("GetNextRecord", "").Return([]byte(""), nil)
+	db.On("GetRecordByID", "", 0).Return([]byte(""), nil)
 	db.Connect("")
 	db.GetNextRecord("")
 	db.Close()
diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go
index cf25a1eea..7ac5c1318 100644
--- a/broker/src/asapo_broker/database/mock_database.go
+++ b/broker/src/asapo_broker/database/mock_database.go
@@ -28,3 +28,8 @@ func (db *MockedDatabase) GetNextRecord(db_name string) (answer []byte, err erro
 	args := db.Called(db_name)
 	return args.Get(0).([]byte), args.Error(1)
 }
+
+func (db *MockedDatabase) GetRecordByID(db_name string, id int) (answer []byte, err error) {
+	args := db.Called(db_name, id)
+	return args.Get(0).([]byte), args.Error(1)
+}
diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go
index 43213baf0..55d9b8371 100644
--- a/broker/src/asapo_broker/database/mongodb.go
+++ b/broker/src/asapo_broker/database/mongodb.go
@@ -3,10 +3,11 @@
 package database
 
 import (
+	"asapo_broker/utils"
+	"encoding/json"
 	"errors"
 	"gopkg.in/mgo.v2"
 	"gopkg.in/mgo.v2/bson"
-	"asapo_broker/utils"
 	"sync"
 	"time"
 )
@@ -150,15 +151,23 @@ func (db *Mongodb) incrementField(dbname string, max_ind int, res interface{}) (
 	return err
 }
 
-func (db *Mongodb) getRecordByID(dbname string, id int) (interface{}, error) {
+func (db *Mongodb) GetRecordByID(dbname string, id int) ([]byte, error) {
 	var res map[string]interface{}
 	q := bson.M{"_id": id}
 	c := db.session.DB(dbname).C(data_collection_name)
 	err := c.Find(q).One(&res)
 	if err == mgo.ErrNotFound {
-		return nil, &DBError{utils.StatusNoData, err.Error()}
+		var r = struct {
+			Id int `json:"id"`
+		}{id}
+		res, _ := json.Marshal(&r)
+		return nil, &DBError{utils.StatusNoData, string(res)}
+	}
+	if err != nil {
+		return nil, err
 	}
-	return &res, err
+
+	return utils.MapToJson(&res)
 }
 
 func (db *Mongodb) needCreateLocationPointersInDb(db_name string) bool {
@@ -230,11 +239,6 @@ func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
+	return db.GetRecordByID(db_name, curPointer.Value)
 
-	res, err := db.getRecordByID(db_name, curPointer.Value)
-	if err != nil {
-		return nil, err
-	}
-
-	return utils.MapToJson(&res)
 }
diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go
index a8b788ed2..cf17a3812 100644
--- a/broker/src/asapo_broker/database/mongodb_test.go
+++ b/broker/src/asapo_broker/database/mongodb_test.go
@@ -3,9 +3,9 @@
 package database
 
 import (
+	"asapo_broker/utils"
 	"encoding/json"
 	"github.com/stretchr/testify/assert"
-	"asapo_broker/utils"
 	"sync"
 	"testing"
 )
@@ -66,6 +66,16 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) {
 	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
 }
 
+func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) {
+	db.Connect(dbaddress)
+	db.databases = append(db.databases, dbname)
+	defer cleanup()
+	db.InsertRecord(dbname, &rec2)
+	_, err := db.GetNextRecord(dbname)
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"id\":1}", err.Error())
+}
+
 func TestMongoDBGetNextOK(t *testing.T) {
 	db.Connect(dbaddress)
 	defer cleanup()
@@ -143,3 +153,21 @@ func TestMongoDBGetNextInParallel(t *testing.T) {
 
 	assert.Equal(t, n, getNOnes(results))
 }
+
+func TestMongoDBGetRecordByID(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.InsertRecord(dbname, &rec1)
+	res, err := db.GetRecordByID(dbname, 1)
+	assert.Nil(t, err)
+	assert.Equal(t, string(rec1_expect), string(res))
+}
+
+func TestMongoDBGetRecordByIDFails(t *testing.T) {
+	db.Connect(dbaddress)
+	defer cleanup()
+	db.InsertRecord(dbname, &rec1)
+	_, err := db.GetRecordByID(dbname, 2)
+	assert.Equal(t, utils.StatusNoData, err.(*DBError).Code)
+	assert.Equal(t, "{\"id\":2}", err.Error())
+}
diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go
new file mode 100644
index 000000000..50624ba0f
--- /dev/null
+++ b/broker/src/asapo_broker/server/get_id.go
@@ -0,0 +1,50 @@
+package server
+
+import (
+	"asapo_broker/logger"
+	"asapo_broker/utils"
+	"github.com/gorilla/mux"
+	"net/http"
+	"strconv"
+)
+
+func extractRequestParametersID(r *http.Request) (int, bool) {
+	vars := mux.Vars(r)
+	id_str, ok := vars["id"]
+	if !ok {
+		return 0, ok
+	}
+	id, err := strconv.Atoi(id_str)
+	return id, err == nil
+}
+
+func routeGetByID(w http.ResponseWriter, r *http.Request) {
+	r.Header.Set("Content-type", "application/json")
+	db_name, ok := extractRequestParameters(r)
+	if !ok {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	id, ok := extractRequestParametersID(r)
+	if !ok {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	answer, code := getRecordByID(db_name, id)
+	w.WriteHeader(code)
+	w.Write(answer)
+}
+
+func getRecordByID(db_name string, id int) (answer []byte, code int) {
+	db_new := db.Copy()
+	defer db_new.Close()
+	statistics.IncreaseCounter()
+	answer, err := db_new.GetRecordByID(db_name, id)
+	log_str := "processing get id request in " + db_name + " at " + settings.BrokerDbAddress
+	if err != nil {
+		return returnError(err, log_str)
+	}
+	logger.Debug(log_str)
+	return answer, utils.StatusOK
+}
diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go
new file mode 100644
index 000000000..b85aa8989
--- /dev/null
+++ b/broker/src/asapo_broker/server/get_id_test.go
@@ -0,0 +1,74 @@
+package server
+
+import (
+	"asapo_broker/database"
+	"asapo_broker/logger"
+	"asapo_broker/utils"
+	"errors"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	"net/http"
+	"testing"
+)
+
+func TestGetIdWithoutDatabaseName(t *testing.T) {
+	w := doRequest("/database/123")
+	assert.Equal(t, http.StatusNotFound, w.Code, "no database name")
+}
+
+func ExpectCopyCloseOnID(mock_db *database.MockedDatabase) {
+	mock_db.On("Copy").Return(mock_db)
+	mock_db.On("Close").Return()
+}
+
+type GetIDTestSuite struct {
+	suite.Suite
+	mock_db *database.MockedDatabase
+}
+
+func (suite *GetIDTestSuite) SetupTest() {
+	statistics.Reset()
+	suite.mock_db = new(database.MockedDatabase)
+	db = suite.mock_db
+	logger.SetMockLog()
+	ExpectCopyCloseOnID(suite.mock_db)
+}
+
+func (suite *GetIDTestSuite) TearDownTest() {
+	assertExpectations(suite.T(), suite.mock_db)
+	logger.UnsetMockLog()
+	db = nil
+}
+
+func TestGetIDTestSuite(t *testing.T) {
+	suite.Run(t, new(GetIDTestSuite))
+}
+
+func (suite *GetIDTestSuite) TestGetIDWithWrongDatabaseName() {
+	suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""),
+		&database.DBError{utils.StatusWrongInput, ""})
+
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo")))
+
+	w := doRequest("/database/foo/1")
+
+	suite.Equal(http.StatusBadRequest, w.Code, "wrong database name")
+}
+
+func (suite *GetIDTestSuite) TestGetIDWithInternalDBError() {
+	suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""), errors.New(""))
+	logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo")))
+
+	w := doRequest("/database/foo/1")
+	suite.Equal(http.StatusInternalServerError, w.Code, "internal error")
+}
+
+func (suite *GetIDTestSuite) TestGetIDOK() {
+	suite.mock_db.On("GetRecordByID", "dbname", 1).Return([]byte("Hello"), nil)
+	logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get id request in dbname")))
+
+	w := doRequest("/database/dbname/1")
+	suite.Equal(http.StatusOK, w.Code, "GetID OK")
+	suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data")
+}
diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go
index 8e4eede20..3cc2a826e 100644
--- a/broker/src/asapo_broker/server/get_next.go
+++ b/broker/src/asapo_broker/server/get_next.go
@@ -1,10 +1,10 @@
 package server
 
 import (
-	"github.com/gorilla/mux"
 	"asapo_broker/database"
 	"asapo_broker/logger"
 	"asapo_broker/utils"
+	"github.com/gorilla/mux"
 	"net/http"
 )
 
diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go
index 0b827d13d..2a6f70ecc 100644
--- a/broker/src/asapo_broker/server/listroutes.go
+++ b/broker/src/asapo_broker/server/listroutes.go
@@ -11,11 +11,17 @@ var listRoutes = utils.Routes{
 		"/database/{dbname}/next",
 		routeGetNext,
 	},
-		utils.Route{
-    		"Health",
-    		"Get",
-    		"/health",
-    		routeGetHealth,
-    	},
+	utils.Route{
+		"GetID",
+		"Get",
+		"/database/{dbname}/{id}",
+		routeGetByID,
+	},
 
+	utils.Route{
+		"Health",
+		"Get",
+		"/health",
+		routeGetHealth,
+	},
 }
diff --git a/broker/src/asapo_broker/utils/status_codes.go b/broker/src/asapo_broker/utils/status_codes.go
index 9549e555e..70a219300 100644
--- a/broker/src/asapo_broker/utils/status_codes.go
+++ b/broker/src/asapo_broker/utils/status_codes.go
@@ -10,5 +10,5 @@ const (
 	//error codes
 	StatusError      = http.StatusInternalServerError
 	StatusWrongInput = http.StatusBadRequest
-	StatusNoData     = http.StatusNoContent
+	StatusNoData     = http.StatusNotFound
 )
diff --git a/common/cpp/include/common/error.h b/common/cpp/include/common/error.h
index efef43d16..ee0736056 100644
--- a/common/cpp/include/common/error.h
+++ b/common/cpp/include/common/error.h
@@ -17,7 +17,8 @@ enum class ErrorType {
     kProducerError,
 
     kMemoryAllocationError,
-    kEndOfFile
+    kEndOfFile,
+    kTimeOut
 };
 
 class ErrorInterface;
diff --git a/common/cpp/include/preprocessor/definitions.h b/common/cpp/include/preprocessor/definitions.h
new file mode 100644
index 000000000..f0f9768e9
--- /dev/null
+++ b/common/cpp/include/preprocessor/definitions.h
@@ -0,0 +1,11 @@
+#ifndef ASAPO_DEFINITIONS_H
+#define ASAPO_DEFINITIONS_H
+
+#ifdef UNIT_TESTS
+#define VIRTUAL virtual
+#else
+#define VIRTUAL
+#endif
+
+
+#endif //ASAPO_DEFINITIONS_H
diff --git a/common/cpp/src/http_client/CMakeLists.txt b/common/cpp/src/http_client/CMakeLists.txt
index bae54a5d7..58f8813c1 100644
--- a/common/cpp/src/http_client/CMakeLists.txt
+++ b/common/cpp/src/http_client/CMakeLists.txt
@@ -1,7 +1,7 @@
 set(TARGET_NAME curl_http_client)
 set(SOURCE_FILES
         curl_http_client.cpp
-        http_client_factory.cpp)
+        http_client_factory.cpp ../../include/preprocessor/definitions.h)
 
 
 ################################
diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
index f41aa7ef5..bb0f49bc1 100644
--- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp
+++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp
@@ -43,12 +43,11 @@ void work(asapo::GenericRequestHeader header, asapo::Error err) {
     mutex.unlock();
 }
 
-bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations) {
-    auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]);
+bool SendDummyData(asapo::Producer* producer, uint8_t* data, size_t number_of_byte, uint64_t iterations) {
 
     for(uint64_t i = 0; i < iterations; i++) {
 //        std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl;
-        auto err = producer->Send(i + 1, buffer.get(), number_of_byte, std::to_string(i), &work);
+        auto err = producer->Send(i + 1, data, number_of_byte, std::to_string(i), &work);
         if (err) {
             std::cerr << "Cannot send file: " << err << std::endl;
             return false;
@@ -87,13 +86,18 @@ int main (int argc, char* argv[]) {
     producer->SetLogLevel(asapo::LogLevel::Debug);
     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 
+
+    size_t number_of_byte = number_of_kbytes * 1024;
+    auto buffer = std::unique_ptr<uint8_t[]>(new uint8_t[number_of_byte]);
+
+
     if(err) {
         std::cerr << "Cannot start producer. ProducerError: " << err << std::endl;
         return EXIT_FAILURE;
     }
 
     high_resolution_clock::time_point t1 = high_resolution_clock::now();
-    if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) {
+    if(!SendDummyData(producer.get(), buffer.get(), number_of_byte, iterations)) {
         return EXIT_FAILURE;
     }
 
diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp
index 2422d1067..d3628406e 100644
--- a/examples/worker/getnext_broker/getnext_broker.cpp
+++ b/examples/worker/getnext_broker/getnext_broker.cpp
@@ -20,10 +20,11 @@ void WaitThreads(std::vector<std::thread>* threads) {
 
 int ProcessError(const Error& err) {
     if (err == nullptr) return 0;
-    if (err->GetErrorType() != asapo::ErrorType::kEndOfFile) {
+    if (err->GetErrorType() != asapo::ErrorType::kTimeOut) {
         std::cout << err->Explain() << std::endl;
         return 1;
     }
+    std::cout << err->Explain() << std::endl;
     return 0;
 }
 
diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp
index 62e6a7a35..64e2f4227 100644
--- a/producer/api/src/receiver_discovery_service.cpp
+++ b/producer/api/src/receiver_discovery_service.cpp
@@ -5,6 +5,7 @@
 
 #include <iostream>
 #include <algorithm>
+#include <numeric>
 
 namespace  asapo {
 
@@ -58,7 +59,9 @@ void ReceiverDiscoveryService::ThreadHandler() {
             lock.lock();
             continue;
         }
-        log__->Debug("got receivers from " + endpoint_ );
+        std::string s;
+        s = std::accumulate(std::begin(uris), std::end(uris), s);
+        log__->Debug("got receivers from " + endpoint_ + ":" + s);
         lock.lock();
         max_connections_ = max_connections;
         uri_list_ = uris;
diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h
index 255088bb3..54b0ac09c 100644
--- a/producer/api/src/receiver_discovery_service.h
+++ b/producer/api/src/receiver_discovery_service.h
@@ -10,10 +10,7 @@
 
 #include "http_client/http_client.h"
 #include "logger/logger.h"
-
-#ifdef UNIT_TESTS
-#define VIRTUAL virtual
-#endif
+#include "preprocessor/definitions.h"
 
 namespace  asapo {
 
diff --git a/producer/api/src/request_handler_factory.h b/producer/api/src/request_handler_factory.h
index 199d0aa93..066f0d181 100644
--- a/producer/api/src/request_handler_factory.h
+++ b/producer/api/src/request_handler_factory.h
@@ -4,14 +4,10 @@
 #include "request_handler.h"
 #include "receiver_discovery_service.h"
 
+#include "preprocessor/definitions.h"
 
 namespace  asapo {
 
-#ifdef UNIT_TESTS
-#define VIRTUAL virtual
-#endif
-
-
 class RequestHandlerFactory {
   public:
     RequestHandlerFactory(ReceiverDiscoveryService* discovery_service);
diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp
index afbd2fd75..4708c5acb 100644
--- a/producer/api/src/request_handler_tcp.cpp
+++ b/producer/api/src/request_handler_tcp.cpp
@@ -18,7 +18,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address)
     Error err;
     sd_ = io__->CreateAndConnectIPTCPSocket(receiver_address, &err);
     if(err != nullptr) {
-        //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain());
+        log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain());
         return err;
     }
     log__->Info("connected to receiver at " + receiver_address);
@@ -68,7 +68,8 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st
         return err;
     }
 
-    log__->Debug("successfully sent data to " + receiver_address);
+    log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " +
+                 receiver_address);
     return nullptr;
 }
 
diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h
index bfe0a9018..4fac37bbc 100644
--- a/producer/api/src/request_pool.h
+++ b/producer/api/src/request_pool.h
@@ -13,10 +13,7 @@
 #include "request_handler_tcp.h"
 #include "request_handler_factory.h"
 
-#ifdef UNIT_TESTS
-#define VIRTUAL virtual
-#endif
-
+#include "preprocessor/definitions.h"
 
 
 namespace asapo {
diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp
index 440d41f59..7e8b3acb6 100644
--- a/producer/api/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/unittests/test_request_handler_tcp.cpp
@@ -121,6 +121,12 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) {
                 testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()),
                 Return(asapo::kDisconnectedSocketDescriptor)
             ));
+        EXPECT_CALL(mock_logger, Debug(AllOf(
+                                           HasSubstr("cannot connect"),
+                                           HasSubstr(expected_address)
+                                       )
+                                      ));
+
         if (only_once) break;
     }
 
diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp
index 678bef893..4c40b1742 100644
--- a/receiver/src/connection.cpp
+++ b/receiver/src/connection.cpp
@@ -11,13 +11,14 @@ namespace asapo {
 size_t Connection::kRequestHandlerMaxBufferSize;
 std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0);
 
-Connection::Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag): request_factory__{new RequestFactory},
+Connection::Connection(SocketDescriptor socket_fd, const std::string& address,
+                       std::string receiver_tag): request_factory__{new RequestFactory},
 io__{GenerateDefaultIO()}, statistics__{new Statistics}, log__{GetDefaultReceiverLogger()} {
     socket_fd_ = socket_fd;
     connection_id_ = kNetworkProducerPeerImplGlobalCounter++;
     address_ = address;
-    statistics__->AddTag("connection_from",address);
-    statistics__->AddTag("receiver_tag",std::move(receiver_tag));
+    statistics__->AddTag("connection_from", address);
+    statistics__->AddTag("receiver_tag", std::move(receiver_tag));
 
 }
 
diff --git a/receiver/src/connection.h b/receiver/src/connection.h
index bde002ae6..3cb73a789 100644
--- a/receiver/src/connection.h
+++ b/receiver/src/connection.h
@@ -29,7 +29,7 @@ class Connection {
     static size_t kRequestHandlerMaxBufferSize;
     static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter;
 
-    Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag);
+    Connection(SocketDescriptor socket_fd, const std::string& address, std::string receiver_tag);
     ~Connection() = default;
 
     void Listen() const noexcept;
diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp
index eba2b9ab3..9a94afe45 100644
--- a/receiver/src/receiver.cpp
+++ b/receiver/src/receiver.cpp
@@ -56,7 +56,7 @@ void Receiver::ProcessConnections(Error* err) {
 void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address)  {
     log__->Info("new connection from " + address);
     auto thread = io__->NewThread([connection_socket_fd, address] {
-        auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address,GetReceiverConfig()->tag));
+        auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, GetReceiverConfig()->tag));
         connection->Listen();
     });
 
diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp
index e058b92d7..b487c02cd 100644
--- a/receiver/src/receiver_config.cpp
+++ b/receiver/src/receiver_config.cpp
@@ -21,9 +21,9 @@ Error ReceiverConfigFactory::SetConfigFromFile(std::string file_name) {
     (err = parser.GetBool("WriteToDb", &config.write_to_db)) ||
     (err = parser.GetString("BrokerDbAddress", &config.broker_db_uri)) ||
     (err = parser.GetString("BrokerDbName", &config.broker_db_name)) ||
-        (err = parser.GetString("Tag", &config.tag)) ||
+    (err = parser.GetString("Tag", &config.tag)) ||
 
-        (err = parser.GetString("MonitorDbName", &config.monitor_db_name));
+    (err = parser.GetString("MonitorDbName", &config.monitor_db_name));
     (err = parser.GetString("LogLevel", &log_level));
     if (err) {
         return err;
diff --git a/receiver/src/statistics.cpp b/receiver/src/statistics.cpp
index 4cb2a4c1b..f2c431e81 100644
--- a/receiver/src/statistics.cpp
+++ b/receiver/src/statistics.cpp
@@ -77,7 +77,7 @@ void Statistics::StopTimer() noexcept {
     time_counters_[current_statistic_entity_] += elapsed;
 }
 
-void Statistics::AddTag(const std::string &name, const std::string &value) noexcept {
+void Statistics::AddTag(const std::string& name, const std::string& value) noexcept {
     if (!tag_.empty()) {
         tag_ += ",";
     }
diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h
index c52324c23..bbe409dff 100644
--- a/receiver/src/statistics.h
+++ b/receiver/src/statistics.h
@@ -7,6 +7,7 @@
 
 
 #include "statistics_sender.h"
+#include "preprocessor/definitions.h"
 
 namespace asapo {
 
@@ -25,10 +26,6 @@ struct StatisticsToSend {
     std::string tags;
 };
 
-#ifdef UNIT_TESTS
-#define VIRTUAL virtual
-#endif
-
 class Statistics {
   public:
     VIRTUAL void SendIfNeeded() noexcept;
@@ -38,10 +35,10 @@ class Statistics {
     VIRTUAL void StartTimer(const StatisticEntity& entity) noexcept;
     VIRTUAL void IncreaseRequestDataVolume(uint64_t transferred_data_volume) noexcept;
     VIRTUAL void StopTimer() noexcept;
-    VIRTUAL void AddTag(const std::string& name,const std::string& value) noexcept;
+    VIRTUAL void AddTag(const std::string& name, const std::string& value) noexcept;
 
 
-  void SetWriteInterval(uint64_t interval_ms);
+    void SetWriteInterval(uint64_t interval_ms);
     std::unique_ptr<StatisticsSender> statistics_sender__;
   private:
     uint64_t GetElapsedMs(StatisticEntity entity) const noexcept;
diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp
index 66dfdf13e..af7d83535 100644
--- a/receiver/unittests/test_connection.cpp
+++ b/receiver/unittests/test_connection.cpp
@@ -46,7 +46,7 @@ using asapo::MockStatistics;
 namespace {
 
 TEST(Connection, Constructor) {
-    Connection connection{0, "some_address","some_tag"};
+    Connection connection{0, "some_address", "some_tag"};
     ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr));
 
     ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr));
@@ -85,7 +85,7 @@ class MockRequestFactory: public asapo::RequestFactory {
 class ConnectionTests : public Test {
   public:
     std::string connected_uri{"some_address"};
-    Connection connection{0, connected_uri,"some_tag"};
+    Connection connection{0, connected_uri, "some_tag"};
     MockIO mock_io;
     MockRequestFactory mock_factory;
     NiceMock<MockStatistics> mock_statictics;
diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp
index 9a011cc68..59cace0fe 100644
--- a/receiver/unittests/test_statistics.cpp
+++ b/receiver/unittests/test_statistics.cpp
@@ -94,7 +94,7 @@ TEST_F(StatisticTests, IncreaseRequestCounter) {
 }
 
 TEST_F(StatisticTests, AddTag) {
-    statistics.AddTag("name","value");
+    statistics.AddTag("name", "value");
 
     auto stat = ExtractStat();
 
@@ -102,8 +102,8 @@ TEST_F(StatisticTests, AddTag) {
 }
 
 TEST_F(StatisticTests, AddTagTwice) {
-    statistics.AddTag("name1","value1");
-    statistics.AddTag("name2","value2");
+    statistics.AddTag("name1", "value1");
+    statistics.AddTag("name2", "value2");
 
     auto stat = ExtractStat();
 
diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh
index e23ec0fbe..8c2da0557 100644
--- a/tests/automatic/broker/get_next/check_linux.sh
+++ b/tests/automatic/broker/get_next/check_linux.sh
@@ -23,4 +23,4 @@ brokerid=`echo $!`
 curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1'
 curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2'
 
-curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep "No Content"
+curl -v  --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found"
diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat
index 9ba1a0810..026563bea 100644
--- a/tests/automatic/broker/get_next/check_windows.bat
+++ b/tests/automatic/broker/get_next/check_windows.bat
@@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul
 
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":1  || goto :error
 C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":2  || goto :error
-C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr  "No Content"  || goto :error
+C:\Curl\curl.exe -v  --silent 127.0.0.1:5005/database/data/next --stderr - | findstr  "Not Found"  || goto :error
 
 goto :clean
 
diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh
index 8640f1fa5..b3ab88bb1 100644
--- a/tests/automatic/full_chain/simple_chain/check_linux.sh
+++ b/tests/automatic/full_chain/simple_chain/check_linux.sh
@@ -9,16 +9,18 @@ monitor_database_name=db_test
 broker_address=127.0.0.1:5005
 
 Cleanup() {
-	echo cleanup
-	rm -rf files
+    echo cleanup
+    rm -rf files
     nomad stop receiver
     nomad stop discovery
     nomad stop broker
+#    kill $producerid
     echo "db.dropDatabase()" | mongo ${broker_database_name}
     influx -execute "drop database ${monitor_database_name}"
 }
 
 influx -execute "create database ${monitor_database_name}"
+echo "db.${broker_database_name}.insert({dummy:1})" | mongo ${broker_database_name}
 
 nomad run receiver.nmd
 nomad run discovery.nmd
@@ -29,9 +31,8 @@ sleep 1
 
 #producer
 mkdir files
-$1 localhost:5006 100 100 4 0 &
+$1 localhost:5006 100 1000 4 0 &
+#producerid=`echo $!`
 
-#producerrid=`echo $!`
-sleep 0.1
 
-$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 100 file(s)"
+$2 ${broker_address} ${broker_database_name} 2 #| grep "Processed 1000 file(s)"
diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh
index 07334882f..9bc7f7b15 100644
--- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh
+++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh
@@ -14,9 +14,11 @@ Cleanup() {
     nomad stop discovery
     echo "db.dropDatabase()" | mongo ${mongo_database_name}
     influx -execute "drop database ${database_name}"
-
 }
 
+influx -execute "create database ${database_name}"
+echo "db.${mongo_database_name}.insert({dummy:1})" | mongo ${mongo_database_name}
+
 nomad run receiver.nmd
 nomad run discovery.nmd
 
diff --git a/tests/manual/performance_full_chain_simple/broker.json b/tests/manual/performance_full_chain_simple/broker.json
index 31aef140d..a2c1a4a5a 100644
--- a/tests/manual/performance_full_chain_simple/broker.json
+++ b/tests/manual/performance_full_chain_simple/broker.json
@@ -3,5 +3,5 @@
   "MonitorDbAddress": "localhost:8086",
   "MonitorDbName": "db_test",
   "port":5005,
-  "LogLevel":"debug"
+  "LogLevel":"info"
 }
\ No newline at end of file
diff --git a/tests/manual/performance_full_chain_simple/discovery.json b/tests/manual/performance_full_chain_simple/discovery.json
new file mode 100644
index 000000000..476f732bb
--- /dev/null
+++ b/tests/manual/performance_full_chain_simple/discovery.json
@@ -0,0 +1,7 @@
+{
+  "MaxConnections": 32,
+  "Mode": "static",
+  "Endpoints":["localhost:4200"],
+  "Port":5006,
+  "LogLevel":"info"
+}
\ No newline at end of file
diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json
index 5330d75a7..7cf0d85c1 100644
--- a/tests/manual/performance_full_chain_simple/receiver.json
+++ b/tests/manual/performance_full_chain_simple/receiver.json
@@ -6,5 +6,6 @@
   "ListenPort":4200,
   "WriteToDisk":true,
   "WriteToDb":true,
-  "LogLevel":"debug"
-}
\ No newline at end of file
+  "LogLevel":"info",
+  "Tag": "test_receiver"
+}
diff --git a/tests/manual/performance_full_chain_simple/test.sh b/tests/manual/performance_full_chain_simple/test.sh
index 4cece2b86..08039120d 100755
--- a/tests/manual/performance_full_chain_simple/test.sh
+++ b/tests/manual/performance_full_chain_simple/test.sh
@@ -9,6 +9,7 @@ Cleanup() {
 set +e
 ssh ${receiver_node} rm -f ${receiver_dir}/files/*
 ssh ${receiver_node} killall receiver
+ssh ${receiver_node} killall asapo-discovery
 ssh ${broker_node} killall asapo-broker
 ssh ${broker_node} docker rm -f -v mongo
 }
@@ -24,16 +25,14 @@ log_dir=~/fullchain_tests/logs
 # starts receiver on $receiver_node
 # runs producer with various file sizes from $producer_node and measures performance
 
-file_size=1000
+file_size=100
 file_num=$((10000000 / $file_size))
 echo filesize: ${file_size}K, filenum: $file_num
 
 # receiver_setup
 receiver_node=max-wgs
-receiver_ip=`resolveip -s ${receiver_node}`
 receiver_port=4201
 receiver_dir=/gpfs/petra3/scratch/yakubov/receiver_tests
-ssh ${receiver_node} mkdir -p ${receiver_dir}/logs
 ssh ${receiver_node} mkdir -p ${receiver_dir}/files
 scp ../../../cmake-build-release/receiver/receiver ${receiver_node}:${receiver_dir}
 cat receiver.json |
@@ -45,14 +44,36 @@ cat receiver.json |
           else .
           end
          ) |
-      from_entries" > settings_tmp.json
-scp settings_tmp.json ${receiver_node}:${receiver_dir}/settings.json
+      from_entries" > receiver_tmp.json
+
+scp receiver_tmp.json ${receiver_node}:${receiver_dir}/receiver.json
+rm receiver_tmp.json
+
+
+# discovery_setup
+discovery_port=5006
+cat discovery.json |
+  jq "to_entries |
+       map(if .key == \"Port\"
+          then . + {value:${discovery_port}}
+          elif .key == \"Endpoints\"
+          then . + {value:[\"${receiver_node}:${receiver_port}\"]}
+          else .
+          end
+         ) |
+      from_entries" > discovery_tmp.json
+scp ../../../cmake-build-release/discovery/asapo-discovery ${receiver_node}:${receiver_dir}
+scp discovery_tmp.json ${receiver_node}:${receiver_dir}/discovery.json
+discovery_ip=`resolveip -s ${receiver_node}`
+rm discovery_tmp.json
 
 #producer_setup
 producer_node=max-display001
 #producer_node=max-wgs
 producer_dir=~/fullchain_tests
 scp ../../../cmake-build-release/examples/producer/dummy-data-producer/dummy-data-producer ${producer_node}:${producer_dir}
+producer_nthreads=8
+
 
 #broker_setup
 broker_node=max-wgs
@@ -84,8 +105,12 @@ ssh ${monitor_node} influx -execute \"create database db_test\"
 #mongo_start
 ssh ${broker_node} docker run -d -p 27017:27017 --name mongo mongo
 
+#discovery_start
+ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./asapo-discovery -config discovery.json &> ${log_dir}/discovery.log &'"
+sleep 0.3
+
 #receiver_start
-ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./receiver settings.json &> ${log_dir}/log.receiver &'"
+ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./receiver receiver.json &> ${log_dir}/log.receiver &'"
 sleep 0.3
 
 #broker_start
@@ -93,8 +118,9 @@ ssh ${broker_node} "bash -c 'cd ${broker_dir}; nohup ./asapo-broker -config brok
 sleep 0.3
 
 #producer_start
-ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${receiver_ip}:${receiver_port} ${file_size} ${file_num} &> ${producer_dir}/producer.log &'"
-sleep 0.3
+ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${discovery_ip}:${discovery_port} ${file_size} ${file_num} ${producer_nthreads} 0 &> ${log_dir}/producer.log &'"
+
+sleep 1
 
 #worker_start
 ssh ${worker_node} ${worker_dir}/getnext_broker ${broker_node}:5005 test_run ${nthreads}
diff --git a/tests/manual/performance_producer_receiver/discovery.json b/tests/manual/performance_producer_receiver/discovery.json
new file mode 100644
index 000000000..476f732bb
--- /dev/null
+++ b/tests/manual/performance_producer_receiver/discovery.json
@@ -0,0 +1,7 @@
+{
+  "MaxConnections": 32,
+  "Mode": "static",
+  "Endpoints":["localhost:4200"],
+  "Port":5006,
+  "LogLevel":"info"
+}
\ No newline at end of file
diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json
index dcb387955..7cf0d85c1 100644
--- a/tests/manual/performance_producer_receiver/receiver.json
+++ b/tests/manual/performance_producer_receiver/receiver.json
@@ -6,5 +6,6 @@
   "ListenPort":4200,
   "WriteToDisk":true,
   "WriteToDb":true,
-  "LogLevel":"info"
+  "LogLevel":"info",
+  "Tag": "test_receiver"
 }
diff --git a/tests/manual/performance_producer_receiver/settings_tmp.json b/tests/manual/performance_producer_receiver/settings_tmp.json
deleted file mode 100644
index 7b5460157..000000000
--- a/tests/manual/performance_producer_receiver/settings_tmp.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
-  "MonitorDbAddress": "zitpcx27016:8086",
-  "MonitorDbName": "db_test",
-  "BrokerDbAddress": "localhost:27017",
-  "BrokerDbName": "test_run",
-  "ListenPort": 4201,
-  "WriteToDisk": true,
-  "WriteToDb": true
-}
diff --git a/tests/manual/performance_producer_receiver/test.sh b/tests/manual/performance_producer_receiver/test.sh
index 862ae2ec8..83a52730d 100755
--- a/tests/manual/performance_producer_receiver/test.sh
+++ b/tests/manual/performance_producer_receiver/test.sh
@@ -2,13 +2,17 @@
 
 set -e
 
+trap Cleanup EXIT
+
+
 # starts receiver on $service_node
 # runs producer with various file sizes from $worker_node and measures performance
 
 # a working directory
 service_node=max-wgs
 service_ip=`resolveip -s ${service_node}`
-service_port=4201
+discovery_port=5006
+receiver_port=4201
 
 monitor_node=zitpcx27016
 monitor_port=8086
@@ -28,6 +32,7 @@ ssh ${service_node} mkdir -p ${service_dir}/files
 ssh ${worker_node} mkdir -p ${worker_dir}
 
 scp ../../../cmake-build-release/receiver/receiver ${service_node}:${service_dir}
+scp ../../../cmake-build-release/discovery/asapo-discovery ${service_node}:${service_dir}
 scp ../../../cmake-build-release/examples/producer/dummy-data-producer/dummy-data-producer ${worker_node}:${worker_dir}
 
 function do_work {
@@ -36,25 +41,41 @@ cat receiver.json |
        map(if .key == \"MonitorDbAddress\"
           then . + {value:\"${monitor_node}:${monitor_port}\"}
           elif .key == \"ListenPort\"
-          then . + {value:${service_port}}
+          then . + {value:${receiver_port}}
           elif .key == \"WriteToDisk\"
           then . + {value:$1}
           else .
           end
          ) |
-      from_entries" > settings_tmp.json
-scp settings_tmp.json ${service_node}:${service_dir}/settings.json
-ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./receiver settings.json &> ${service_dir}/receiver.log &'"
+      from_entries" > receiver_tmp.json
+cat discovery.json |
+  jq "to_entries |
+       map(if .key == \"Port\"
+          then . + {value:${discovery_port}}
+          elif .key == \"Endpoints\"
+          then . + {value:[\"${service_node}:${receiver_port}\"]}
+          else .
+          end
+         ) |
+      from_entries" > discovery_tmp.json
+
+scp discovery_tmp.json ${service_node}:${service_dir}/discovery.json
+scp receiver_tmp.json ${service_node}:${service_dir}/receiver.json
+rm discovery_tmp.json receiver_tmp.json
+ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./receiver receiver.json &> ${service_dir}/receiver.log &'"
+ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-discovery -config discovery.json &> ${service_dir}/discovery.log &'"
+
 sleep 0.3
 for size  in 100 1000 10000
 do
 ssh ${service_node} docker run -d -p 27017:27017 --name mongo mongo
 echo ===================================================================
-ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:${service_port} ${size} 1000
+ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:${discovery_port} ${size} 1000 8 0
 ssh ${service_node} rm -f ${service_dir}/files/*
 ssh ${service_node} docker rm -f -v mongo
 done
 ssh ${service_node} killall receiver
+ssh ${service_node} killall asapo-discovery
 }
 
 echo
diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp
index e00a2daa8..3db20c666 100644
--- a/worker/api/cpp/src/server_data_broker.cpp
+++ b/worker/api/cpp/src/server_data_broker.cpp
@@ -2,13 +2,14 @@
 
 #include <chrono>
 
+#include <json_parser/json_parser.h>
+
 #include "io/io_factory.h"
 
 #include "http_client/http_error.h"
 
 using std::chrono::high_resolution_clock;
 
-
 namespace asapo {
 
 Error HttpCodeToWorkerError(const HttpCode& code) {
@@ -22,12 +23,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
     case HttpCode::InternalServerError:
         message = WorkerErrorMessage::kErrorReadingSource;
         break;
-    case HttpCode::NoContent:
+    case HttpCode::NotFound:
         message = WorkerErrorMessage::kNoData;
         return TextErrorWithType(message, ErrorType::kEndOfFile);
-    case HttpCode::NotFound:
-        message = WorkerErrorMessage::kSourceNotFound;
-        break;
     default:
         message = WorkerErrorMessage::kErrorReadingSource;
         break;
@@ -35,10 +33,8 @@ Error HttpCodeToWorkerError(const HttpCode& code) {
     return Error{new HttpError(message, code)};
 }
 
-
-
 ServerDataBroker::ServerDataBroker(const std::string& server_uri,
-                                   const std::string& source_name):
+                                   const std::string& source_name) :
     io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()},
     server_uri_{server_uri}, source_name_{source_name} {
 }
@@ -51,11 +47,19 @@ void ServerDataBroker::SetTimeout(uint64_t timeout_ms) {
     timeout_ms_ = timeout_ms;
 }
 
+std::string GetIDFromJson(const std::string& json_string, Error* err) {
+    JsonStringParser parser(json_string);
+    uint64_t id;
+    if ((*err = parser.GetUInt64("id", &id)) != nullptr) {
+        return "";
+    }
+    return std::to_string(id);
+}
+
 Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) {
-    std::string full_uri = server_uri_ + "/database/" + source_name_ + "/" + operation;
     Error err;
     HttpCode code;
-
+    std::string full_uri = server_uri_ + "/database/" + source_name_ + "/" + operation;
     std::string response;
     uint64_t elapsed_ms = 0;
     while (true) {
@@ -68,11 +72,19 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string&
         if (err == nullptr) break;
         if (err->GetErrorType() != asapo::ErrorType::kEndOfFile) {
             err->Append(response);
-//            return err;
+            return err;
+        } else {
+            if (response.find("id") != std::string::npos) {
+                auto id = GetIDFromJson(response, &err);
+                if (err) {
+                    return err;
+                }
+                full_uri = server_uri_ + "/database/" + source_name_ + "/" + id;
+            }
         }
 
         if (elapsed_ms >= timeout_ms_) {
-            err->Append("exit on timeout");
+            err = TextErrorWithType("no more data found, exit on timeout", asapo::ErrorType::kTimeOut);
             return err;
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -90,7 +102,7 @@ Error ServerDataBroker::GetNext(FileInfo* info, FileData* data) {
         return TextError(WorkerErrorMessage::kWrongInput);
     }
 
-    auto  err = GetFileInfoFromServer(info, "next");
+    auto err = GetFileInfoFromServer(info, "next");
     if (err != nullptr) {
         return err;
     }
diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp
index 2424eb9cd..28e2caef4 100644
--- a/worker/api/cpp/unittests/test_server_broker.cpp
+++ b/worker/api/cpp/unittests/test_server_broker.cpp
@@ -92,42 +92,46 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) {
     data_broker->GetNext(&info, nullptr);
 }
 
-TEST_F(ServerDataBrokerTests, GetNextReturnsErrorFromHttpClient) {
-    EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll(
+
+TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) {
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                 SetArgPointee<1>(HttpCode::NotFound),
                 SetArgPointee<2>(nullptr),
-                Return("")));
+                Return("{\"id\":1}")));
 
     auto err = data_broker->GetNext(&info, nullptr);
 
-    ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kSourceNotFound));
-    ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kHttpError);
-    ASSERT_THAT(dynamic_cast<HttpError*>(err.get())->GetCode(), Eq(HttpCode::NotFound));
+    ASSERT_THAT(err->Explain(), HasSubstr("timeout"));
 }
 
-TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) {
-    EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll(
-                SetArgPointee<1>(HttpCode::NoContent),
+TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) {
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
+                SetArgPointee<1>(HttpCode::NotFound),
                 SetArgPointee<2>(nullptr),
-                Return("")));
+                Return("id")));
 
     auto err = data_broker->GetNext(&info, nullptr);
 
-    ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kNoData));
-    ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kEndOfFile);
+    ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse"));
 }
 
+
 TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) {
-    EXPECT_CALL(mock_http_client, Get_t(_, _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
-                SetArgPointee<1>(HttpCode::NoContent),
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
+                SetArgPointee<1>(HttpCode::NotFound),
                 SetArgPointee<2>(nullptr),
-                Return("")));
+                Return("{\"id\":1}")));
+
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("1"), _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
+                SetArgPointee<1>(HttpCode::NotFound),
+                SetArgPointee<2>(nullptr),
+                Return("{\"id\":1}")));
+
 
     data_broker->SetTimeout(100);
     auto err = data_broker->GetNext(&info, nullptr);
 
-    ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kNoData));
-    ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kEndOfFile);
+    ASSERT_THAT(err->Explain(), HasSubstr("timeout"));
 }
 
 
-- 
GitLab