diff --git a/broker/src/asapo_broker/database/database.go b/broker/src/asapo_broker/database/database.go index e530bf03e1587c224a9e13dc92437fa45c656d2a..3a0683bf301fd3bf4b41c4d54544a1a970c3b158 100644 --- a/broker/src/asapo_broker/database/database.go +++ b/broker/src/asapo_broker/database/database.go @@ -2,6 +2,7 @@ package database type Agent interface { GetNextRecord(db_name string) ([]byte, error) + GetRecordByID(dbname string, id int) ([]byte, error) Connect(string) error Close() Copy() Agent diff --git a/broker/src/asapo_broker/database/database_test.go b/broker/src/asapo_broker/database/database_test.go index 3073ed61b1a621ae47e3a41903e75f2a85836255..68b1eadb44e0e5a02a19c66994a0c4cfe9540fbb 100644 --- a/broker/src/asapo_broker/database/database_test.go +++ b/broker/src/asapo_broker/database/database_test.go @@ -12,6 +12,7 @@ func TestMockDataBase(t *testing.T) { db.On("Close").Return() db.On("Copy").Return(nil) db.On("GetNextRecord", "").Return([]byte(""), nil) + db.On("GetRecordByID", "").Return([]byte(""), nil) db.Connect("") db.GetNextRecord("") db.Close() diff --git a/broker/src/asapo_broker/database/mock_database.go b/broker/src/asapo_broker/database/mock_database.go index cf25a1eea71a73fa8973027b8a70cfc574643fda..7ac5c13188e4c498566665b57b4448963d3fbe05 100644 --- a/broker/src/asapo_broker/database/mock_database.go +++ b/broker/src/asapo_broker/database/mock_database.go @@ -28,3 +28,8 @@ func (db *MockedDatabase) GetNextRecord(db_name string) (answer []byte, err erro args := db.Called(db_name) return args.Get(0).([]byte), args.Error(1) } + +func (db *MockedDatabase) GetRecordByID(db_name string, id int) (answer []byte, err error) { + args := db.Called(db_name, id) + return args.Get(0).([]byte), args.Error(1) +} diff --git a/broker/src/asapo_broker/database/mongodb.go b/broker/src/asapo_broker/database/mongodb.go index 43213baf03226baa05d3b933175c1de7139b7051..55d9b8371a85320c6754aad974f6af1d3b45346b 100644 --- a/broker/src/asapo_broker/database/mongodb.go +++ b/broker/src/asapo_broker/database/mongodb.go @@ -3,10 +3,11 @@ package database import ( + "asapo_broker/utils" + "encoding/json" "errors" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" - "asapo_broker/utils" "sync" "time" ) @@ -150,15 +151,23 @@ func (db *Mongodb) incrementField(dbname string, max_ind int, res interface{}) ( return err } -func (db *Mongodb) getRecordByID(dbname string, id int) (interface{}, error) { +func (db *Mongodb) GetRecordByID(dbname string, id int) ([]byte, error) { var res map[string]interface{} q := bson.M{"_id": id} c := db.session.DB(dbname).C(data_collection_name) err := c.Find(q).One(&res) if err == mgo.ErrNotFound { - return nil, &DBError{utils.StatusNoData, err.Error()} + var r = struct { + Id int `json:"id""` + }{id} + res, _ := json.Marshal(&r) + return nil, &DBError{utils.StatusNoData, string(res)} + } + if err != nil { + return nil, err } - return &res, err + + return utils.MapToJson(&res) } func (db *Mongodb) needCreateLocationPointersInDb(db_name string) bool { @@ -230,11 +239,6 @@ func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) { if err != nil { return nil, err } + return db.GetRecordByID(db_name, curPointer.Value) - res, err := db.getRecordByID(db_name, curPointer.Value) - if err != nil { - return nil, err - } - - return utils.MapToJson(&res) } diff --git a/broker/src/asapo_broker/database/mongodb_test.go b/broker/src/asapo_broker/database/mongodb_test.go index a8b788ed2bbfa8c7921e7382e65fecd74a44121f..cf17a38123464f59ff19dfc63dfcc1dafaa73e45 100644 --- a/broker/src/asapo_broker/database/mongodb_test.go +++ b/broker/src/asapo_broker/database/mongodb_test.go @@ -3,9 +3,9 @@ package database import ( + "asapo_broker/utils" "encoding/json" "github.com/stretchr/testify/assert" - "asapo_broker/utils" "sync" "testing" ) @@ -66,6 +66,16 @@ func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) { assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } +func TestMongoDBGetNextErrorWhenRecordNotThereYet(t *testing.T) { + db.Connect(dbaddress) + db.databases = append(db.databases, dbname) + defer cleanup() + db.InsertRecord(dbname, &rec2) + _, err := db.GetNextRecord(dbname) + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"id\":1}", err.Error()) +} + func TestMongoDBGetNextOK(t *testing.T) { db.Connect(dbaddress) defer cleanup() @@ -143,3 +153,21 @@ func TestMongoDBGetNextInParallel(t *testing.T) { assert.Equal(t, n, getNOnes(results)) } + +func TestMongoDBGetRecordByID(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec1) + res, err := db.GetRecordByID(dbname, 1) + assert.Nil(t, err) + assert.Equal(t, string(rec1_expect), string(res)) +} + +func TestMongoDBGetRecordByIDFails(t *testing.T) { + db.Connect(dbaddress) + defer cleanup() + db.InsertRecord(dbname, &rec1) + _, err := db.GetRecordByID(dbname, 2) + assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) + assert.Equal(t, "{\"id\":2}", err.Error()) +} diff --git a/broker/src/asapo_broker/server/get_id.go b/broker/src/asapo_broker/server/get_id.go new file mode 100644 index 0000000000000000000000000000000000000000..50624ba0f442c00e0ea2d2937ee3aaf8234eb90c --- /dev/null +++ b/broker/src/asapo_broker/server/get_id.go @@ -0,0 +1,50 @@ +package server + +import ( + "asapo_broker/logger" + "asapo_broker/utils" + "github.com/gorilla/mux" + "net/http" + "strconv" +) + +func extractRequestParametersID(r *http.Request) (int, bool) { + vars := mux.Vars(r) + id_str, ok := vars["id"] + if !ok { + return 0, ok + } + id, err := strconv.Atoi(id_str) + return id, err == nil +} + +func routeGetByID(w http.ResponseWriter, r *http.Request) { + r.Header.Set("Content-type", "application/json") + db_name, ok := extractRequestParameters(r) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + id, ok := extractRequestParametersID(r) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + answer, code := getRecordByID(db_name, id) + w.WriteHeader(code) + w.Write(answer) +} + +func getRecordByID(db_name string, id int) (answer []byte, code int) { + db_new := db.Copy() + defer db_new.Close() + statistics.IncreaseCounter() + answer, err := db_new.GetRecordByID(db_name, id) + log_str := "processing get id request in " + db_name + " at " + settings.BrokerDbAddress + if err != nil { + return returnError(err, log_str) + } + logger.Debug(log_str) + return answer, utils.StatusOK +} diff --git a/broker/src/asapo_broker/server/get_id_test.go b/broker/src/asapo_broker/server/get_id_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b85aa898956ea7e12c5fb45f41248fc1f7be2d85 --- /dev/null +++ b/broker/src/asapo_broker/server/get_id_test.go @@ -0,0 +1,74 @@ +package server + +import ( + "asapo_broker/database" + "asapo_broker/logger" + "asapo_broker/utils" + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "net/http" + "testing" +) + +func TestGetIdWithoutDatabaseName(t *testing.T) { + w := doRequest("/database/123") + assert.Equal(t, http.StatusNotFound, w.Code, "no database name") +} + +func ExpectCopyCloseOnID(mock_db *database.MockedDatabase) { + mock_db.On("Copy").Return(mock_db) + mock_db.On("Close").Return() +} + +type GetIDTestSuite struct { + suite.Suite + mock_db *database.MockedDatabase +} + +func (suite *GetIDTestSuite) SetupTest() { + statistics.Reset() + suite.mock_db = new(database.MockedDatabase) + db = suite.mock_db + logger.SetMockLog() + ExpectCopyCloseOnID(suite.mock_db) +} + +func (suite *GetIDTestSuite) TearDownTest() { + assertExpectations(suite.T(), suite.mock_db) + logger.UnsetMockLog() + db = nil +} + +func TestGetIDTestSuite(t *testing.T) { + suite.Run(t, new(GetIDTestSuite)) +} + +func (suite *GetIDTestSuite) TestGetIDWithWrongDatabaseName() { + suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""), + &database.DBError{utils.StatusWrongInput, ""}) + + logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo"))) + + w := doRequest("/database/foo/1") + + suite.Equal(http.StatusBadRequest, w.Code, "wrong database name") +} + +func (suite *GetIDTestSuite) TestGetIDWithInternalDBError() { + suite.mock_db.On("GetRecordByID", "foo", 1).Return([]byte(""), errors.New("")) + logger.MockLog.On("Error", mock.MatchedBy(containsMatcher("get id request in foo"))) + + w := doRequest("/database/foo/1") + suite.Equal(http.StatusInternalServerError, w.Code, "internal error") +} + +func (suite *GetIDTestSuite) TestGetIDOK() { + suite.mock_db.On("GetRecordByID", "dbname", 1).Return([]byte("Hello"), nil) + logger.MockLog.On("Debug", mock.MatchedBy(containsMatcher("get id request in dbname"))) + + w := doRequest("/database/dbname/1") + suite.Equal(http.StatusOK, w.Code, "GetID OK") + suite.Equal("Hello", string(w.Body.Bytes()), "GetID sends data") +} diff --git a/broker/src/asapo_broker/server/get_next.go b/broker/src/asapo_broker/server/get_next.go index 8e4eede203c96314b59df566eb1d335f4f126873..3cc2a826e4c4e649c5543042e9eba4ee609e9108 100644 --- a/broker/src/asapo_broker/server/get_next.go +++ b/broker/src/asapo_broker/server/get_next.go @@ -1,10 +1,10 @@ package server import ( - "github.com/gorilla/mux" "asapo_broker/database" "asapo_broker/logger" "asapo_broker/utils" + "github.com/gorilla/mux" "net/http" ) diff --git a/broker/src/asapo_broker/server/listroutes.go b/broker/src/asapo_broker/server/listroutes.go index 0b827d13d8faef9b89f6d13a36544f9be53232fc..2a6f70ecc1de6e6dc6517d540d6609ca6e00d7ac 100644 --- a/broker/src/asapo_broker/server/listroutes.go +++ b/broker/src/asapo_broker/server/listroutes.go @@ -11,11 +11,17 @@ var listRoutes = utils.Routes{ "/database/{dbname}/next", routeGetNext, }, - utils.Route{ - "Health", - "Get", - "/health", - routeGetHealth, - }, + utils.Route{ + "GetID", + "Get", + "/database/{dbname}/{id}", + routeGetByID, + }, + utils.Route{ + "Health", + "Get", + "/health", + routeGetHealth, + }, } diff --git a/broker/src/asapo_broker/utils/status_codes.go b/broker/src/asapo_broker/utils/status_codes.go index 9549e555eeb771d129d93c0e6eea3e7cb5124626..70a2193004b0e34bd07f5f0804b50375fd8ba910 100644 --- a/broker/src/asapo_broker/utils/status_codes.go +++ b/broker/src/asapo_broker/utils/status_codes.go @@ -10,5 +10,5 @@ const ( //error codes StatusError = http.StatusInternalServerError StatusWrongInput = http.StatusBadRequest - StatusNoData = http.StatusNoContent + StatusNoData = http.StatusNotFound ) diff --git a/common/cpp/include/common/error.h b/common/cpp/include/common/error.h index efef43d16a6b7887e9347a98db407b5bb17f82d9..ee07360567adef4c64e4747b24e013509796bf59 100644 --- a/common/cpp/include/common/error.h +++ b/common/cpp/include/common/error.h @@ -17,7 +17,8 @@ enum class ErrorType { kProducerError, kMemoryAllocationError, - kEndOfFile + kEndOfFile, + kTimeOut }; class ErrorInterface; diff --git a/common/cpp/include/preprocessor/definitions.h b/common/cpp/include/preprocessor/definitions.h new file mode 100644 index 0000000000000000000000000000000000000000..f0f9768e9f25454503b3681962f6267df2c97541 --- /dev/null +++ b/common/cpp/include/preprocessor/definitions.h @@ -0,0 +1,11 @@ +#ifndef ASAPO_DEFINITIONS_H +#define ASAPO_DEFINITIONS_H + +#ifdef UNIT_TESTS +#define VIRTUAL virtual +#else +#define VIRTUAL +#endif + + +#endif //ASAPO_DEFINITIONS_H diff --git a/common/cpp/src/http_client/CMakeLists.txt b/common/cpp/src/http_client/CMakeLists.txt index bae54a5d735ceef0f4792df590ede2d9e94f2d34..58f8813c18de3f7f4e22a974349cdd6de5afcbf4 100644 --- a/common/cpp/src/http_client/CMakeLists.txt +++ b/common/cpp/src/http_client/CMakeLists.txt @@ -1,7 +1,7 @@ set(TARGET_NAME curl_http_client) set(SOURCE_FILES curl_http_client.cpp - http_client_factory.cpp) + http_client_factory.cpp ../../include/preprocessor/definitions.h) ################################ diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index f41aa7ef53fa9e0b8feb49f4e377b8ddef60c068..bb0f49bc1fba95076c6d3c51f92bc99acf31719b 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -43,12 +43,11 @@ void work(asapo::GenericRequestHeader header, asapo::Error err) { mutex.unlock(); } -bool SendDummyData(asapo::Producer* producer, size_t number_of_byte, uint64_t iterations) { - auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]); +bool SendDummyData(asapo::Producer* producer, uint8_t* data, size_t number_of_byte, uint64_t iterations) { for(uint64_t i = 0; i < iterations; i++) { // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; - auto err = producer->Send(i + 1, buffer.get(), number_of_byte, std::to_string(i), &work); + auto err = producer->Send(i + 1, data, number_of_byte, std::to_string(i), &work); if (err) { std::cerr << "Cannot send file: " << err << std::endl; return false; @@ -87,13 +86,18 @@ int main (int argc, char* argv[]) { producer->SetLogLevel(asapo::LogLevel::Debug); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + size_t number_of_byte = number_of_kbytes * 1024; + auto buffer = std::unique_ptr<uint8_t>(new uint8_t[number_of_byte]); + + if(err) { std::cerr << "Cannot start producer. ProducerError: " << err << std::endl; return EXIT_FAILURE; } high_resolution_clock::time_point t1 = high_resolution_clock::now(); - if(!SendDummyData(producer.get(), number_of_kbytes * 1024, iterations)) { + if(!SendDummyData(producer.get(), buffer.get(), number_of_byte, iterations)) { return EXIT_FAILURE; } diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index 2422d106797d30a31f8b70fc02d30ee37610cd9f..d3628406e59ef237a8056da175b743a16dc39a58 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -20,10 +20,11 @@ void WaitThreads(std::vector<std::thread>* threads) { int ProcessError(const Error& err) { if (err == nullptr) return 0; - if (err->GetErrorType() != asapo::ErrorType::kEndOfFile) { + if (err->GetErrorType() != asapo::ErrorType::kTimeOut) { std::cout << err->Explain() << std::endl; return 1; } + std::cout << err->Explain() << std::endl; return 0; } diff --git a/producer/api/src/receiver_discovery_service.cpp b/producer/api/src/receiver_discovery_service.cpp index 62e6a7a354000acfec9166351f3c728279def75b..64e2f42272d11bcd0290463d36f4ad9aa0d1eaad 100644 --- a/producer/api/src/receiver_discovery_service.cpp +++ b/producer/api/src/receiver_discovery_service.cpp @@ -5,6 +5,7 @@ #include <iostream> #include <algorithm> +#include <numeric> namespace asapo { @@ -58,7 +59,9 @@ void ReceiverDiscoveryService::ThreadHandler() { lock.lock(); continue; } - log__->Debug("got receivers from " + endpoint_ ); + std::string s; + s = std::accumulate(std::begin(uris), std::end(uris), s); + log__->Debug("got receivers from " + endpoint_ + ":" + s); lock.lock(); max_connections_ = max_connections; uri_list_ = uris; diff --git a/producer/api/src/receiver_discovery_service.h b/producer/api/src/receiver_discovery_service.h index 255088bb3f5f6f0bb9d08c3e82f9b402efbf5da9..54b0ac09ca57eb01e399037667625014671861d8 100644 --- a/producer/api/src/receiver_discovery_service.h +++ b/producer/api/src/receiver_discovery_service.h @@ -10,10 +10,7 @@ #include "http_client/http_client.h" #include "logger/logger.h" - -#ifdef UNIT_TESTS -#define VIRTUAL virtual -#endif +#include "preprocessor/definitions.h" namespace asapo { diff --git a/producer/api/src/request_handler_factory.h b/producer/api/src/request_handler_factory.h index 199d0aa93a80fade27afb3a40d48c774460ea50b..066f0d1811d8229466792576c4e63db310ee9cda 100644 --- a/producer/api/src/request_handler_factory.h +++ b/producer/api/src/request_handler_factory.h @@ -4,14 +4,10 @@ #include "request_handler.h" #include "receiver_discovery_service.h" +#include "preprocessor/definitions.h" namespace asapo { -#ifdef UNIT_TESTS -#define VIRTUAL virtual -#endif - - class RequestHandlerFactory { public: RequestHandlerFactory(ReceiverDiscoveryService* discovery_service); diff --git a/producer/api/src/request_handler_tcp.cpp b/producer/api/src/request_handler_tcp.cpp index afbd2fd7593604ec87946a4dc7a1096a802c0938..4708c5acb1c261c9dc7d0eb7af5d591f096eac67 100644 --- a/producer/api/src/request_handler_tcp.cpp +++ b/producer/api/src/request_handler_tcp.cpp @@ -18,7 +18,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& receiver_address) Error err; sd_ = io__->CreateAndConnectIPTCPSocket(receiver_address, &err); if(err != nullptr) { - //log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); + log__->Debug("cannot connect to receiver at " + receiver_address + " - " + err->Explain()); return err; } log__->Info("connected to receiver at " + receiver_address); @@ -68,7 +68,8 @@ Error RequestHandlerTcp::TrySendToReceiver(const Request* request, const std::st return err; } - log__->Debug("successfully sent data to " + receiver_address); + log__->Debug(std::string("successfully sent data ") + " id: " + std::to_string(request->header.data_id) + " to " + + receiver_address); return nullptr; } diff --git a/producer/api/src/request_pool.h b/producer/api/src/request_pool.h index bfe0a9018cf5364bd1a11c2452b8d9b074b075b4..4fac37bbc12f5bc34f7746ff5784ea5a07784fae 100644 --- a/producer/api/src/request_pool.h +++ b/producer/api/src/request_pool.h @@ -13,10 +13,7 @@ #include "request_handler_tcp.h" #include "request_handler_factory.h" -#ifdef UNIT_TESTS -#define VIRTUAL virtual -#endif - +#include "preprocessor/definitions.h" namespace asapo { diff --git a/producer/api/unittests/test_request_handler_tcp.cpp b/producer/api/unittests/test_request_handler_tcp.cpp index 440d41f59adc210ece79625aa3614e8d2d9cc3a2..7e8b3acb693525d21ee6b80724f1606218ec2bca 100644 --- a/producer/api/unittests/test_request_handler_tcp.cpp +++ b/producer/api/unittests/test_request_handler_tcp.cpp @@ -121,6 +121,12 @@ void RequestHandlerTcpTests::ExpectFailConnect(bool only_once) { testing::SetArgPointee<1>(asapo::IOErrorTemplates::kInvalidAddressFormat.Generate().release()), Return(asapo::kDisconnectedSocketDescriptor) )); + EXPECT_CALL(mock_logger, Debug(AllOf( + HasSubstr("cannot connect"), + HasSubstr(expected_address) + ) + )); + if (only_once) break; } diff --git a/receiver/src/connection.cpp b/receiver/src/connection.cpp index 678bef893b5856e1dd6717ad75cf6e2350fc151b..4c40b1742051f7be157c0997845441d8fa4156ea 100644 --- a/receiver/src/connection.cpp +++ b/receiver/src/connection.cpp @@ -11,13 +11,14 @@ namespace asapo { size_t Connection::kRequestHandlerMaxBufferSize; std::atomic<uint32_t> Connection::kNetworkProducerPeerImplGlobalCounter(0); -Connection::Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag): request_factory__{new RequestFactory}, +Connection::Connection(SocketDescriptor socket_fd, const std::string& address, + std::string receiver_tag): request_factory__{new RequestFactory}, io__{GenerateDefaultIO()}, statistics__{new Statistics}, log__{GetDefaultReceiverLogger()} { socket_fd_ = socket_fd; connection_id_ = kNetworkProducerPeerImplGlobalCounter++; address_ = address; - statistics__->AddTag("connection_from",address); - statistics__->AddTag("receiver_tag",std::move(receiver_tag)); + statistics__->AddTag("connection_from", address); + statistics__->AddTag("receiver_tag", std::move(receiver_tag)); } diff --git a/receiver/src/connection.h b/receiver/src/connection.h index bde002ae6eebfaecdff4e36afec09bd6c9b01c3d..3cb73a789bec8055e997028fe37d18e384bd3efe 100644 --- a/receiver/src/connection.h +++ b/receiver/src/connection.h @@ -29,7 +29,7 @@ class Connection { static size_t kRequestHandlerMaxBufferSize; static std::atomic<uint32_t> kNetworkProducerPeerImplGlobalCounter; - Connection(SocketDescriptor socket_fd, const std::string& address,std::string receiver_tag); + Connection(SocketDescriptor socket_fd, const std::string& address, std::string receiver_tag); ~Connection() = default; void Listen() const noexcept; diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index eba2b9ab3a20bb270ddf6345981b3c39730734d2..9a94afe45df43c47522ce68d850883976a6ad4ec 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -56,7 +56,7 @@ void Receiver::ProcessConnections(Error* err) { void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { log__->Info("new connection from " + address); auto thread = io__->NewThread([connection_socket_fd, address] { - auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address,GetReceiverConfig()->tag)); + auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, GetReceiverConfig()->tag)); connection->Listen(); }); diff --git a/receiver/src/receiver_config.cpp b/receiver/src/receiver_config.cpp index e058b92d753ba307451133aa27929152909c9a1e..b487c02cd317477e83b4dfdafd9136423aedaabb 100644 --- a/receiver/src/receiver_config.cpp +++ b/receiver/src/receiver_config.cpp @@ -21,9 +21,9 @@ Error ReceiverConfigFactory::SetConfigFromFile(std::string file_name) { (err = parser.GetBool("WriteToDb", &config.write_to_db)) || (err = parser.GetString("BrokerDbAddress", &config.broker_db_uri)) || (err = parser.GetString("BrokerDbName", &config.broker_db_name)) || - (err = parser.GetString("Tag", &config.tag)) || + (err = parser.GetString("Tag", &config.tag)) || - (err = parser.GetString("MonitorDbName", &config.monitor_db_name)); + (err = parser.GetString("MonitorDbName", &config.monitor_db_name)); (err = parser.GetString("LogLevel", &log_level)); if (err) { return err; diff --git a/receiver/src/statistics.cpp b/receiver/src/statistics.cpp index 4cb2a4c1b84e0c05677b9d7a09e9462d5474066e..f2c431e81a4425c99e54f3d766c1d95a04db9af0 100644 --- a/receiver/src/statistics.cpp +++ b/receiver/src/statistics.cpp @@ -77,7 +77,7 @@ void Statistics::StopTimer() noexcept { time_counters_[current_statistic_entity_] += elapsed; } -void Statistics::AddTag(const std::string &name, const std::string &value) noexcept { +void Statistics::AddTag(const std::string& name, const std::string& value) noexcept { if (!tag_.empty()) { tag_ += ","; } diff --git a/receiver/src/statistics.h b/receiver/src/statistics.h index c52324c230ee79e8b73642ed176dcfe8b0963c05..bbe409dff521dbbc167c5185cb222af03df64ebb 100644 --- a/receiver/src/statistics.h +++ b/receiver/src/statistics.h @@ -7,6 +7,7 @@ #include "statistics_sender.h" +#include "preprocessor/definitions.h" namespace asapo { @@ -25,10 +26,6 @@ struct StatisticsToSend { std::string tags; }; -#ifdef UNIT_TESTS -#define VIRTUAL virtual -#endif - class Statistics { public: VIRTUAL void SendIfNeeded() noexcept; @@ -38,10 +35,10 @@ class Statistics { VIRTUAL void StartTimer(const StatisticEntity& entity) noexcept; VIRTUAL void IncreaseRequestDataVolume(uint64_t transferred_data_volume) noexcept; VIRTUAL void StopTimer() noexcept; - VIRTUAL void AddTag(const std::string& name,const std::string& value) noexcept; + VIRTUAL void AddTag(const std::string& name, const std::string& value) noexcept; - void SetWriteInterval(uint64_t interval_ms); + void SetWriteInterval(uint64_t interval_ms); std::unique_ptr<StatisticsSender> statistics_sender__; private: uint64_t GetElapsedMs(StatisticEntity entity) const noexcept; diff --git a/receiver/unittests/test_connection.cpp b/receiver/unittests/test_connection.cpp index 66dfdf13e75c95ebccdd0d9e981ee6a6c8d5f237..af7d83535f771910921fdb508841a6ff6af86894 100644 --- a/receiver/unittests/test_connection.cpp +++ b/receiver/unittests/test_connection.cpp @@ -46,7 +46,7 @@ using asapo::MockStatistics; namespace { TEST(Connection, Constructor) { - Connection connection{0, "some_address","some_tag"}; + Connection connection{0, "some_address", "some_tag"}; ASSERT_THAT(dynamic_cast<asapo::Statistics*>(connection.statistics__.get()), Ne(nullptr)); ASSERT_THAT(dynamic_cast<asapo::IO*>(connection.io__.get()), Ne(nullptr)); @@ -85,7 +85,7 @@ class MockRequestFactory: public asapo::RequestFactory { class ConnectionTests : public Test { public: std::string connected_uri{"some_address"}; - Connection connection{0, connected_uri,"some_tag"}; + Connection connection{0, connected_uri, "some_tag"}; MockIO mock_io; MockRequestFactory mock_factory; NiceMock<MockStatistics> mock_statictics; diff --git a/receiver/unittests/test_statistics.cpp b/receiver/unittests/test_statistics.cpp index 9a011cc68d79d1076d5baf6c5bfc38593e2f067e..59cace0fe901f15abaf4a45681614aa2e6883ca4 100644 --- a/receiver/unittests/test_statistics.cpp +++ b/receiver/unittests/test_statistics.cpp @@ -94,7 +94,7 @@ TEST_F(StatisticTests, IncreaseRequestCounter) { } TEST_F(StatisticTests, AddTag) { - statistics.AddTag("name","value"); + statistics.AddTag("name", "value"); auto stat = ExtractStat(); @@ -102,8 +102,8 @@ TEST_F(StatisticTests, AddTag) { } TEST_F(StatisticTests, AddTagTwice) { - statistics.AddTag("name1","value1"); - statistics.AddTag("name2","value2"); + statistics.AddTag("name1", "value1"); + statistics.AddTag("name2", "value2"); auto stat = ExtractStat(); diff --git a/tests/automatic/broker/get_next/check_linux.sh b/tests/automatic/broker/get_next/check_linux.sh index e23ec0fbe01dba7db0c7b5362029abcc8bd7fb3c..8c2da0557ee06fc479e37bb9d5c4498a5212640a 100644 --- a/tests/automatic/broker/get_next/check_linux.sh +++ b/tests/automatic/broker/get_next/check_linux.sh @@ -23,4 +23,4 @@ brokerid=`echo $!` curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":1' curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep '"_id":2' -curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "No Content" +curl -v --silent 127.0.0.1:5005/database/data/next --stderr - | grep "Not Found" diff --git a/tests/automatic/broker/get_next/check_windows.bat b/tests/automatic/broker/get_next/check_windows.bat index 9ba1a0810a2682abe9c575313993df5ca794dac3..026563bea3cc4fea8d233d23ca70cf80c7280aca 100644 --- a/tests/automatic/broker/get_next/check_windows.bat +++ b/tests/automatic/broker/get_next/check_windows.bat @@ -13,7 +13,7 @@ ping 1.0.0.0 -n 1 -w 100 > nul C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":1 || goto :error C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr \"_id\":2 || goto :error -C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr "No Content" || goto :error +C:\Curl\curl.exe -v --silent 127.0.0.1:5005/database/data/next --stderr - | findstr "Not Found" || goto :error goto :clean diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh index 8640f1fa5ae1c4792a1f15513024ae58fef2c55e..b3ab88bb1696e64c916dfe0a9fb8aed8bc3d1b4c 100644 --- a/tests/automatic/full_chain/simple_chain/check_linux.sh +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -9,16 +9,18 @@ monitor_database_name=db_test broker_address=127.0.0.1:5005 Cleanup() { - echo cleanup - rm -rf files + echo cleanup + rm -rf files nomad stop receiver nomad stop discovery nomad stop broker +# kill $producerid echo "db.dropDatabase()" | mongo ${broker_database_name} influx -execute "drop database ${monitor_database_name}" } influx -execute "create database ${monitor_database_name}" +echo "db.${broker_database_name}.insert({dummy:1})" | mongo ${broker_database_name} nomad run receiver.nmd nomad run discovery.nmd @@ -29,9 +31,8 @@ sleep 1 #producer mkdir files -$1 localhost:5006 100 100 4 0 & +$1 localhost:5006 100 1000 4 0 & +#producerid=`echo $!` -#producerrid=`echo $!` -sleep 0.1 -$2 ${broker_address} ${broker_database_name} 2 | grep "Processed 100 file(s)" +$2 ${broker_address} ${broker_database_name} 2 #| grep "Processed 1000 file(s)" diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index 07334882f797f3a8e27337fa305973e2469b1c3e..9bc7f7b15a5fb2f30e41fa680514e04c9ab0d6b3 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -14,9 +14,11 @@ Cleanup() { nomad stop discovery echo "db.dropDatabase()" | mongo ${mongo_database_name} influx -execute "drop database ${database_name}" - } +influx -execute "create database ${database_name}" +echo "db.${mongo_database_name}.insert({dummy:1})" | mongo ${mongo_database_name} + nomad run receiver.nmd nomad run discovery.nmd diff --git a/tests/manual/performance_full_chain_simple/broker.json b/tests/manual/performance_full_chain_simple/broker.json index 31aef140d3ed64a122684062b159008e1b52a372..a2c1a4a5ab7238e14c26667e5bfc7335e935d96d 100644 --- a/tests/manual/performance_full_chain_simple/broker.json +++ b/tests/manual/performance_full_chain_simple/broker.json @@ -3,5 +3,5 @@ "MonitorDbAddress": "localhost:8086", "MonitorDbName": "db_test", "port":5005, - "LogLevel":"debug" + "LogLevel":"info" } \ No newline at end of file diff --git a/tests/manual/performance_full_chain_simple/discovery.json b/tests/manual/performance_full_chain_simple/discovery.json new file mode 100644 index 0000000000000000000000000000000000000000..476f732bbedad31adc0e4ce4fbbee1ca081cc025 --- /dev/null +++ b/tests/manual/performance_full_chain_simple/discovery.json @@ -0,0 +1,7 @@ +{ + "MaxConnections": 32, + "Mode": "static", + "Endpoints":["localhost:4200"], + "Port":5006, + "LogLevel":"info" +} \ No newline at end of file diff --git a/tests/manual/performance_full_chain_simple/receiver.json b/tests/manual/performance_full_chain_simple/receiver.json index 5330d75a7380acc0c09b321e87bb6bf1253d962e..7cf0d85c122d91a081050a4c7fe648618e84f841 100644 --- a/tests/manual/performance_full_chain_simple/receiver.json +++ b/tests/manual/performance_full_chain_simple/receiver.json @@ -6,5 +6,6 @@ "ListenPort":4200, "WriteToDisk":true, "WriteToDb":true, - "LogLevel":"debug" -} \ No newline at end of file + "LogLevel":"info", + "Tag": "test_receiver" +} diff --git a/tests/manual/performance_full_chain_simple/test.sh b/tests/manual/performance_full_chain_simple/test.sh index 4cece2b86f0e04b0cd4a0191eb8f5e8f53ccf68f..08039120da48086f3e13dfa12fadc91564ecccb1 100755 --- a/tests/manual/performance_full_chain_simple/test.sh +++ b/tests/manual/performance_full_chain_simple/test.sh @@ -9,6 +9,7 @@ Cleanup() { set +e ssh ${receiver_node} rm -f ${receiver_dir}/files/* ssh ${receiver_node} killall receiver +ssh ${receiver_node} killall asapo-discovery ssh ${broker_node} killall asapo-broker ssh ${broker_node} docker rm -f -v mongo } @@ -24,16 +25,14 @@ log_dir=~/fullchain_tests/logs # starts receiver on $receiver_node # runs producer with various file sizes from $producer_node and measures performance -file_size=1000 +file_size=100 file_num=$((10000000 / $file_size)) echo filesize: ${file_size}K, filenum: $file_num # receiver_setup receiver_node=max-wgs -receiver_ip=`resolveip -s ${receiver_node}` receiver_port=4201 receiver_dir=/gpfs/petra3/scratch/yakubov/receiver_tests -ssh ${receiver_node} mkdir -p ${receiver_dir}/logs ssh ${receiver_node} mkdir -p ${receiver_dir}/files scp ../../../cmake-build-release/receiver/receiver ${receiver_node}:${receiver_dir} cat receiver.json | @@ -45,14 +44,36 @@ cat receiver.json | else . end ) | - from_entries" > settings_tmp.json -scp settings_tmp.json ${receiver_node}:${receiver_dir}/settings.json + from_entries" > receiver_tmp.json + +scp receiver_tmp.json ${receiver_node}:${receiver_dir}/receiver.json +rm receiver_tmp.json + + +# discovery_setup +discovery_port=5006 +cat discovery.json | + jq "to_entries | + map(if .key == \"Port\" + then . + {value:${discovery_port}} + elif .key == \"Endpoints\" + then . + {value:[\"${receiver_node}:${receiver_port}\"]} + else . + end + ) | + from_entries" > discovery_tmp.json +scp ../../../cmake-build-release/discovery/asapo-discovery ${receiver_node}:${receiver_dir} +scp discovery_tmp.json ${receiver_node}:${receiver_dir}/discovery.json +discovery_ip=`resolveip -s ${receiver_node}` +rm discovery_tmp.json #producer_setup producer_node=max-display001 #producer_node=max-wgs producer_dir=~/fullchain_tests scp ../../../cmake-build-release/examples/producer/dummy-data-producer/dummy-data-producer ${producer_node}:${producer_dir} +producer_nthreads=8 + #broker_setup broker_node=max-wgs @@ -84,8 +105,12 @@ ssh ${monitor_node} influx -execute \"create database db_test\" #mongo_start ssh ${broker_node} docker run -d -p 27017:27017 --name mongo mongo +#discovery_start +ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./asapo-discovery -config discovery.json &> ${log_dir}/discovery.log &'" +sleep 0.3 + #receiver_start -ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./receiver settings.json &> ${log_dir}/log.receiver &'" +ssh ${receiver_node} "bash -c 'cd ${receiver_dir}; nohup ./receiver receiver.json &> ${log_dir}/log.receiver &'" sleep 0.3 #broker_start @@ -93,8 +118,9 @@ ssh ${broker_node} "bash -c 'cd ${broker_dir}; nohup ./asapo-broker -config brok sleep 0.3 #producer_start -ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${receiver_ip}:${receiver_port} ${file_size} ${file_num} &> ${producer_dir}/producer.log &'" -sleep 0.3 +ssh ${producer_node} "bash -c 'cd ${producer_dir}; nohup ./dummy-data-producer ${discovery_ip}:${discovery_port} ${file_size} ${file_num} ${producer_nthreads} 0 &> ${log_dir}/producer.log &'" + +sleep 1 #worker_start ssh ${worker_node} ${worker_dir}/getnext_broker ${broker_node}:5005 test_run ${nthreads} diff --git a/tests/manual/performance_producer_receiver/discovery.json b/tests/manual/performance_producer_receiver/discovery.json new file mode 100644 index 0000000000000000000000000000000000000000..476f732bbedad31adc0e4ce4fbbee1ca081cc025 --- /dev/null +++ b/tests/manual/performance_producer_receiver/discovery.json @@ -0,0 +1,7 @@ +{ + "MaxConnections": 32, + "Mode": "static", + "Endpoints":["localhost:4200"], + "Port":5006, + "LogLevel":"info" +} \ No newline at end of file diff --git a/tests/manual/performance_producer_receiver/receiver.json b/tests/manual/performance_producer_receiver/receiver.json index dcb3879550833d7552374b0cc6ddf4f53fe166a2..7cf0d85c122d91a081050a4c7fe648618e84f841 100644 --- a/tests/manual/performance_producer_receiver/receiver.json +++ b/tests/manual/performance_producer_receiver/receiver.json @@ -6,5 +6,6 @@ "ListenPort":4200, "WriteToDisk":true, "WriteToDb":true, - "LogLevel":"info" + "LogLevel":"info", + "Tag": "test_receiver" } diff --git a/tests/manual/performance_producer_receiver/settings_tmp.json b/tests/manual/performance_producer_receiver/settings_tmp.json deleted file mode 100644 index 7b5460157dced99ec9b24104f48b482173cc6d47..0000000000000000000000000000000000000000 --- a/tests/manual/performance_producer_receiver/settings_tmp.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "MonitorDbAddress": "zitpcx27016:8086", - "MonitorDbName": "db_test", - "BrokerDbAddress": "localhost:27017", - "BrokerDbName": "test_run", - "ListenPort": 4201, - "WriteToDisk": true, - "WriteToDb": true -} diff --git a/tests/manual/performance_producer_receiver/test.sh b/tests/manual/performance_producer_receiver/test.sh index 862ae2ec85bafd6df51f610336c10aaa2855e7bc..83a52730d310e50cd533cb3fe85475cef4a8a391 100755 --- a/tests/manual/performance_producer_receiver/test.sh +++ b/tests/manual/performance_producer_receiver/test.sh @@ -2,13 +2,17 @@ set -e +trap Cleanup EXIT + + # starts receiver on $service_node # runs producer with various file sizes from $worker_node and measures performance # a working directory service_node=max-wgs service_ip=`resolveip -s ${service_node}` -service_port=4201 +discovery_port=5006 +receiver_port=4201 monitor_node=zitpcx27016 monitor_port=8086 @@ -28,6 +32,7 @@ ssh ${service_node} mkdir -p ${service_dir}/files ssh ${worker_node} mkdir -p ${worker_dir} scp ../../../cmake-build-release/receiver/receiver ${service_node}:${service_dir} +scp ../../../cmake-build-release/discovery/asapo-discovery ${service_node}:${service_dir} scp ../../../cmake-build-release/examples/producer/dummy-data-producer/dummy-data-producer ${worker_node}:${worker_dir} function do_work { @@ -36,25 +41,41 @@ cat receiver.json | map(if .key == \"MonitorDbAddress\" then . + {value:\"${monitor_node}:${monitor_port}\"} elif .key == \"ListenPort\" - then . + {value:${service_port}} + then . + {value:${receiver_port}} elif .key == \"WriteToDisk\" then . + {value:$1} else . end ) | - from_entries" > settings_tmp.json -scp settings_tmp.json ${service_node}:${service_dir}/settings.json -ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./receiver settings.json &> ${service_dir}/receiver.log &'" + from_entries" > receiver_tmp.json +cat discovery.json | + jq "to_entries | + map(if .key == \"Port\" + then . + {value:${discovery_port}} + elif .key == \"Endpoints\" + then . + {value:[\"${service_node}:${receiver_port}\"]} + else . + end + ) | + from_entries" > discovery_tmp.json + +scp discovery_tmp.json ${service_node}:${service_dir}/discovery.json +scp receiver_tmp.json ${service_node}:${service_dir}/receiver.json +rm discovery_tmp.json receiver_tmp.json +ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./receiver receiver.json &> ${service_dir}/receiver.log &'" +ssh ${service_node} "bash -c 'cd ${service_dir}; nohup ./asapo-discovery -config discovery.json &> ${service_dir}/discovery.log &'" + sleep 0.3 for size in 100 1000 10000 do ssh ${service_node} docker run -d -p 27017:27017 --name mongo mongo echo =================================================================== -ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:${service_port} ${size} 1000 +ssh ${worker_node} ${worker_dir}/dummy-data-producer ${service_ip}:${discovery_port} ${size} 1000 8 0 ssh ${service_node} rm -f ${service_dir}/files/* ssh ${service_node} docker rm -f -v mongo done ssh ${service_node} killall receiver +ssh ${service_node} killall asapo-discovery } echo diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index e00a2daa8a2e7650b701fa0b1c23d1c66a7dc8ef..3db20c6660efb2df45cc8bc13ab256bc6f300737 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -2,13 +2,14 @@ #include <chrono> +#include <json_parser/json_parser.h> + #include "io/io_factory.h" #include "http_client/http_error.h" using std::chrono::high_resolution_clock; - namespace asapo { Error HttpCodeToWorkerError(const HttpCode& code) { @@ -22,12 +23,9 @@ Error HttpCodeToWorkerError(const HttpCode& code) { case HttpCode::InternalServerError: message = WorkerErrorMessage::kErrorReadingSource; break; - case HttpCode::NoContent: + case HttpCode::NotFound: message = WorkerErrorMessage::kNoData; return TextErrorWithType(message, ErrorType::kEndOfFile); - case HttpCode::NotFound: - message = WorkerErrorMessage::kSourceNotFound; - break; default: message = WorkerErrorMessage::kErrorReadingSource; break; @@ -35,10 +33,8 @@ Error HttpCodeToWorkerError(const HttpCode& code) { return Error{new HttpError(message, code)}; } - - ServerDataBroker::ServerDataBroker(const std::string& server_uri, - const std::string& source_name): + const std::string& source_name) : io__{GenerateDefaultIO()}, httpclient__{DefaultHttpClient()}, server_uri_{server_uri}, source_name_{source_name} { } @@ -51,11 +47,19 @@ void ServerDataBroker::SetTimeout(uint64_t timeout_ms) { timeout_ms_ = timeout_ms; } +std::string GetIDFromJson(const std::string& json_string, Error* err) { + JsonStringParser parser(json_string); + uint64_t id; + if ((*err = parser.GetUInt64("id", &id)) != nullptr) { + return ""; + } + return std::to_string(id); +} + Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) { - std::string full_uri = server_uri_ + "/database/" + source_name_ + "/" + operation; Error err; HttpCode code; - + std::string full_uri = server_uri_ + "/database/" + source_name_ + "/" + operation; std::string response; uint64_t elapsed_ms = 0; while (true) { @@ -68,11 +72,19 @@ Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& if (err == nullptr) break; if (err->GetErrorType() != asapo::ErrorType::kEndOfFile) { err->Append(response); -// return err; + return err; + } else { + if (response.find("id") != std::string::npos) { + auto id = GetIDFromJson(response, &err); + if (err) { + return err; + } + full_uri = server_uri_ + "/database/" + source_name_ + "/" + id; + } } if (elapsed_ms >= timeout_ms_) { - err->Append("exit on timeout"); + err = TextErrorWithType("no more data found, exit on timeout", asapo::ErrorType::kTimeOut); return err; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -90,7 +102,7 @@ Error ServerDataBroker::GetNext(FileInfo* info, FileData* data) { return TextError(WorkerErrorMessage::kWrongInput); } - auto err = GetFileInfoFromServer(info, "next"); + auto err = GetFileInfoFromServer(info, "next"); if (err != nullptr) { return err; } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 2424eb9cd9ad48f5d85a2557413cef1b6966ceab..28e2caef439d0a720983785b64dba9575e5db300 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -92,42 +92,46 @@ TEST_F(ServerDataBrokerTests, GetNextUsesCorrectUri) { data_broker->GetNext(&info, nullptr); } -TEST_F(ServerDataBrokerTests, GetNextReturnsErrorFromHttpClient) { - EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll( + +TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( SetArgPointee<1>(HttpCode::NotFound), SetArgPointee<2>(nullptr), - Return(""))); + Return("{\"id\":1}"))); auto err = data_broker->GetNext(&info, nullptr); - ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kSourceNotFound)); - ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kHttpError); - ASSERT_THAT(dynamic_cast<HttpError*>(err.get())->GetCode(), Eq(HttpCode::NotFound)); + ASSERT_THAT(err->Explain(), HasSubstr("timeout")); } -TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) { - EXPECT_CALL(mock_http_client, Get_t(_, _, _)).WillOnce(DoAll( - SetArgPointee<1>(HttpCode::NoContent), +TEST_F(ServerDataBrokerTests, GetNextReturnsWrongResponseFromHttpClient) { + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::NotFound), SetArgPointee<2>(nullptr), - Return(""))); + Return("id"))); auto err = data_broker->GetNext(&info, nullptr); - ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kNoData)); - ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kEndOfFile); + ASSERT_THAT(err->Explain(), HasSubstr("Cannot parse")); } + TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) { - EXPECT_CALL(mock_http_client, Get_t(_, _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( - SetArgPointee<1>(HttpCode::NoContent), + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll( + SetArgPointee<1>(HttpCode::NotFound), SetArgPointee<2>(nullptr), - Return(""))); + Return("{\"id\":1}"))); + + EXPECT_CALL(mock_http_client, Get_t(HasSubstr("1"), _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::NotFound), + SetArgPointee<2>(nullptr), + Return("{\"id\":1}"))); + data_broker->SetTimeout(100); auto err = data_broker->GetNext(&info, nullptr); - ASSERT_THAT(err->Explain(), HasSubstr(asapo::WorkerErrorMessage::kNoData)); - ASSERT_THAT(err->GetErrorType(), asapo::ErrorType::kEndOfFile); + ASSERT_THAT(err->Explain(), HasSubstr("timeout")); }