From cbbb95e3edc798f42a1679131b037b44235094a1 Mon Sep 17 00:00:00 2001 From: Sergey Yakubov <sergey.yakubov@desy.de> Date: Mon, 23 Apr 2018 18:16:32 +0200 Subject: [PATCH] full chain, test, checkl max_id --- 3d_party/rapidjson/include/rapidjson/reader.h | 6 +- broker/src/hidra2_broker/database/mongodb.go | 67 +++++++++++++--- .../hidra2_broker/database/mongodb_test.go | 5 +- common/cpp/include/common/error.h | 8 +- common/cpp/include/common/io_error.h | 80 +++++++++---------- .../dummy_data_producer.cpp | 2 +- examples/worker/getnext_broker/CMakeLists.txt | 1 + .../worker/getnext_broker/getnext_broker.cpp | 1 + .../api/include/producer/producer_error.h | 20 ++--- receiver/src/receiver_error.h | 8 +- tests/automatic/CMakeLists.txt | 4 +- .../automatic/broker/get_next/check_linux_.sh | 7 ++ tests/automatic/full_chain/CMakeLists.txt | 1 + .../full_chain/simple_chain/CMakeLists.txt | 9 +++ .../full_chain/simple_chain/check_linux.sh | 41 ++++++++++ .../transfer_single_file/check_linux.sh | 2 +- .../transfer_single_file/check_windows.bat | 2 +- worker/api/cpp/include/worker/data_broker.h | 2 + worker/api/cpp/src/folder_data_broker.h | 2 +- worker/api/cpp/src/server_data_broker.cpp | 42 +++++++--- worker/api/cpp/src/server_data_broker.h | 2 + .../api/cpp/unittests/test_server_broker.cpp | 14 ++++ 22 files changed, 236 insertions(+), 90 deletions(-) create mode 100755 tests/automatic/broker/get_next/check_linux_.sh create mode 100644 tests/automatic/full_chain/CMakeLists.txt create mode 100644 tests/automatic/full_chain/simple_chain/CMakeLists.txt create mode 100644 tests/automatic/full_chain/simple_chain/check_linux.sh diff --git a/3d_party/rapidjson/include/rapidjson/reader.h b/3d_party/rapidjson/include/rapidjson/reader.h index 9b28b2685..206b26141 100644 --- a/3d_party/rapidjson/include/rapidjson/reader.h +++ b/3d_party/rapidjson/include/rapidjson/reader.h @@ -1223,7 +1223,8 @@ class GenericReader { } i = i * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } else + } + else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i >= 429496729)) { // 2^32 - 1 = 4294967295 if (RAPIDJSON_LIKELY(i != 429496729 || s.Peek() > '5')) { @@ -1264,7 +1265,8 @@ class GenericReader { } i64 = i64 * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } else + } + else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i64 >= RAPIDJSON_UINT64_C2(0x19999999, 0x99999999))) // 2^64 - 1 = 18446744073709551615 if (RAPIDJSON_LIKELY(i64 != RAPIDJSON_UINT64_C2(0x19999999, 0x99999999) || s.Peek() > '5')) { diff --git a/broker/src/hidra2_broker/database/mongodb.go b/broker/src/hidra2_broker/database/mongodb.go index c6e54099e..2942d4335 100644 --- a/broker/src/hidra2_broker/database/mongodb.go +++ b/broker/src/hidra2_broker/database/mongodb.go @@ -19,19 +19,21 @@ const data_collection_name = "data" const pointer_collection_name = "current_location" const pointer_field_name = "current_pointer" const no_session_msg = "database session not created" +const wrong_id_type = "wrong id type" const already_connected_msg = "already connected" type Mongodb struct { - main_session *mgo.Session - timeout time.Duration - databases []string + main_session *mgo.Session + timeout time.Duration + databases []string + db_pointers_created map[string]bool } func (db *Mongodb) Copy() Agent { - new_db:= new(Mongodb) + new_db := new(Mongodb) new_db.main_session = db.main_session.Copy() - new_db.databases = make([]string,len(db.databases)) - copy(new_db.databases,db.databases) + new_db.databases = make([]string, len(db.databases)) + copy(new_db.databases, db.databases) return new_db } @@ -102,16 +104,41 @@ func (db *Mongodb) InsertRecord(dbname string, s interface{}) error { return c.Insert(s) } -func (db *Mongodb) incrementField(dbname string, res interface{}) (err error) { +func (db *Mongodb) getMaxIndex(dbname string) (max_id int, err error) { + c := db.main_session.DB(dbname).C(data_collection_name) + var id Pointer + err = c.Find(nil).Sort("-_id").Select(bson.M{"_id": 1}).One(&id) + if err != nil { + return 0, nil + } + return id.ID, nil +} + +func (db *Mongodb) createField(dbname string) (err error) { change := mgo.Change{ - Update: bson.M{"$inc": bson.M{pointer_field_name: 1}}, - Upsert: true, - ReturnNew: true, + Update: bson.M{"$inc": bson.M{pointer_field_name: 0}}, + Upsert: true, } q := bson.M{"_id": 0} c := db.main_session.DB(dbname).C(pointer_collection_name) - _, err = c.Find(q).Apply(change, res) + var res map[string]interface{} + _, err = c.Find(q).Apply(change, &res) + return err +} +func (db *Mongodb) incrementField(dbname string, max_ind int, res interface{}) (err error) { + update := bson.M{"$inc": bson.M{pointer_field_name: 1}} + change := mgo.Change{ + Update: update, + Upsert: false, + ReturnNew: true, + } + q := bson.M{"$and": []bson.M{{"_id": 0}, {pointer_field_name: bson.M{"$lt": max_ind}}}} + c := db.main_session.DB(dbname).C(pointer_collection_name) + _, err = c.Find(q).Apply(change, res) + if err == mgo.ErrNotFound { + return &DBError{utils.StatusNoData, err.Error()} + } return err } @@ -121,7 +148,7 @@ func (db *Mongodb) getRecordByID(dbname string, id int) (interface{}, error) { c := db.main_session.DB(dbname).C(data_collection_name) err := c.Find(q).One(&res) if err == mgo.ErrNotFound { - return nil, &DBError{utils.StatusNoData, err.Error()} + return nil, &DBError{utils.StatusWrongInput, err.Error()} } return &res, err } @@ -134,12 +161,25 @@ func (db *Mongodb) checkDatabaseOperationPrerequisites(db_name string) error { if err := db.dataBaseExist(db_name); err != nil { return &DBError{utils.StatusWrongInput, err.Error()} } + + if !db.db_pointers_created[db_name] { + if db.db_pointers_created == nil { + db.db_pointers_created = make(map[string]bool) + } + db.db_pointers_created[db_name] = true + db.createField(db_name) + } + return nil } func (db *Mongodb) getCurrentPointer(db_name string) (Pointer, error) { + max_ind, err := db.getMaxIndex(db_name) + if err != nil { + return Pointer{}, err + } var curPointer Pointer - err := db.incrementField(db_name, &curPointer) + err = db.incrementField(db_name, max_ind, &curPointer) if err != nil { return Pointer{}, err } @@ -148,6 +188,7 @@ func (db *Mongodb) getCurrentPointer(db_name string) (Pointer, error) { } func (db *Mongodb) GetNextRecord(db_name string) ([]byte, error) { + if err := db.checkDatabaseOperationPrerequisites(db_name); err != nil { return nil, err } diff --git a/broker/src/hidra2_broker/database/mongodb_test.go b/broker/src/hidra2_broker/database/mongodb_test.go index 484e2020f..dcad1b703 100644 --- a/broker/src/hidra2_broker/database/mongodb_test.go +++ b/broker/src/hidra2_broker/database/mongodb_test.go @@ -27,6 +27,7 @@ var rec2_expect, _ = json.Marshal(rec2) func cleanup() { db.DeleteAllRecords(dbname) + db.db_pointers_created = nil db.Close() } @@ -59,10 +60,8 @@ func TestMongoDBGetNextErrorWhenWrongDatabasename(t *testing.T) { func TestMongoDBGetNextErrorWhenEmptyCollection(t *testing.T) { db.Connect(dbaddress) + db.databases = append(db.databases, dbname) defer cleanup() - var curPointer Pointer - db.incrementField(dbname, &curPointer) - _, err := db.GetNextRecord(dbname) assert.Equal(t, utils.StatusNoData, err.(*DBError).Code) } diff --git a/common/cpp/include/common/error.h b/common/cpp/include/common/error.h index 71c32b6e8..e4602afd5 100644 --- a/common/cpp/include/common/error.h +++ b/common/cpp/include/common/error.h @@ -146,11 +146,11 @@ inline Error TextErrorWithType(const std::string& error, ErrorType error_type) { namespace ErrorTemplates { auto const kMemoryAllocationError = SimpleErrorTemplate { - "kMemoryAllocationError", ErrorType::kMemoryAllocationError - }; + "kMemoryAllocationError", ErrorType::kMemoryAllocationError +}; auto const kEndOfFile = SimpleErrorTemplate { - "End of file", ErrorType::kEndOfFile - }; + "End of file", ErrorType::kEndOfFile +}; } diff --git a/common/cpp/include/common/io_error.h b/common/cpp/include/common/io_error.h index db3606b3b..40c9d4dd7 100644 --- a/common/cpp/include/common/io_error.h +++ b/common/cpp/include/common/io_error.h @@ -72,68 +72,68 @@ static inline std::ostream& operator<<(std::ostream& os, const IOErrorTemplate& namespace IOErrorTemplates { auto const kUnknownIOError = IOErrorTemplate { - "Unknown Error", IOErrorType::kUnknownIOError - }; + "Unknown Error", IOErrorType::kUnknownIOError +}; auto const kFileNotFound = IOErrorTemplate { - "No such file or directory", IOErrorType::kFileNotFound - }; + "No such file or directory", IOErrorType::kFileNotFound +}; auto const kReadError = IOErrorTemplate { - "Read error", IOErrorType::kReadError - }; + "Read error", IOErrorType::kReadError +}; auto const kBadFileNumber = IOErrorTemplate { - "Bad file number", IOErrorType::kBadFileNumber - }; + "Bad file number", IOErrorType::kBadFileNumber +}; auto const kResourceTemporarilyUnavailable = IOErrorTemplate { - "Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable - }; + "Resource temporarily unavailable", IOErrorType::kResourceTemporarilyUnavailable +}; auto const kPermissionDenied = IOErrorTemplate { - "Permission denied", IOErrorType::kPermissionDenied - }; + "Permission denied", IOErrorType::kPermissionDenied +}; auto const kUnsupportedAddressFamily = IOErrorTemplate { - "Unsupported address family", IOErrorType::kUnsupportedAddressFamily - }; + "Unsupported address family", IOErrorType::kUnsupportedAddressFamily +}; auto const kInvalidAddressFormat = IOErrorTemplate { - "Invalid address format", IOErrorType::kInvalidAddressFormat - }; + "Invalid address format", IOErrorType::kInvalidAddressFormat +}; auto const kAddressAlreadyInUse = IOErrorTemplate { - "Address already in use", IOErrorType::kAddressAlreadyInUse - }; + "Address already in use", IOErrorType::kAddressAlreadyInUse +}; auto const kConnectionRefused = IOErrorTemplate { - "Connection refused", IOErrorType::kConnectionRefused - }; + "Connection refused", IOErrorType::kConnectionRefused +}; auto const kConnectionResetByPeer = IOErrorTemplate { - "kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer - }; + "kConnectionResetByPeer", IOErrorType::kConnectionResetByPeer +}; auto const kTimeout = IOErrorTemplate { - "kTimeout", IOErrorType::kTimeout - }; + "kTimeout", IOErrorType::kTimeout +}; auto const kFileAlreadyExists = IOErrorTemplate { - "kFileAlreadyExists", IOErrorType::kFileAlreadyExists - }; + "kFileAlreadyExists", IOErrorType::kFileAlreadyExists +}; auto const kNoSpaceLeft = IOErrorTemplate { - "kNoSpaceLeft", IOErrorType::kNoSpaceLeft - }; + "kNoSpaceLeft", IOErrorType::kNoSpaceLeft +}; auto const kSocketOperationOnNonSocket = IOErrorTemplate { - "kSocketOperationOnNonSocket", IOErrorType::kSocketOperationOnNonSocket - }; + "kSocketOperationOnNonSocket", IOErrorType::kSocketOperationOnNonSocket +}; auto const kInvalidMemoryAddress = IOErrorTemplate { - "kInvalidMemoryAddress", IOErrorType::kInvalidMemoryAddress - }; + "kInvalidMemoryAddress", IOErrorType::kInvalidMemoryAddress +}; auto const kUnableToResolveHostname = IOErrorTemplate { - "kUnableToResolveHostname", IOErrorType::kUnableToResolveHostname - }; + "kUnableToResolveHostname", IOErrorType::kUnableToResolveHostname +}; auto const kSocketOperationUnknownAtLevel = IOErrorTemplate { - "kSocketOperationUnknownAtLevel", IOErrorType::kSocketOperationUnknownAtLevel - }; + "kSocketOperationUnknownAtLevel", IOErrorType::kSocketOperationUnknownAtLevel +}; auto const kSocketOperationValueOutOfBound = IOErrorTemplate { - "kSocketOperationValueOutOfBound", IOErrorType::kSocketOperationValueOutOfBound - }; + "kSocketOperationValueOutOfBound", IOErrorType::kSocketOperationValueOutOfBound +}; auto const kAddressNotValid = IOErrorTemplate { - "Address not valid", IOErrorType::kAddressNotValid - }; + "Address not valid", IOErrorType::kAddressNotValid +}; } diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index c7bae8553..dd1e4cf48 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -30,7 +30,7 @@ bool SendDummyData(hidra2::Producer* producer, size_t number_of_byte, uint64_t i for(uint64_t i = 0; i < iterations; i++) { // std::cerr << "Send file " << i + 1 << "/" << iterations << std::endl; - auto err = producer->Send(i, buffer.get(), number_of_byte); + auto err = producer->Send(i + 1, buffer.get(), number_of_byte); if (err) { std::cerr << "File was not successfully send: " << err << std::endl; diff --git a/examples/worker/getnext_broker/CMakeLists.txt b/examples/worker/getnext_broker/CMakeLists.txt index 903ff9db5..03483d371 100644 --- a/examples/worker/getnext_broker/CMakeLists.txt +++ b/examples/worker/getnext_broker/CMakeLists.txt @@ -3,6 +3,7 @@ set(SOURCE_FILES getnext_broker.cpp) add_executable(${TARGET_NAME} ${SOURCE_FILES}) target_link_libraries(${TARGET_NAME} hidra2-worker) + #use expression generator to get rid of VS adding Debug/Release folders set_target_properties(${TARGET_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}$<$<CONFIG:Debug>:> diff --git a/examples/worker/getnext_broker/getnext_broker.cpp b/examples/worker/getnext_broker/getnext_broker.cpp index 3c5a43d06..fc00c718f 100644 --- a/examples/worker/getnext_broker/getnext_broker.cpp +++ b/examples/worker/getnext_broker/getnext_broker.cpp @@ -32,6 +32,7 @@ std::vector<std::thread> StartThreads(const std::string& server, const std::stri hidra2::FileInfo fi; Error err; auto broker = hidra2::DataBrokerFactory::CreateServerBroker(server, run_name, &err); + broker->SetTimeout(1000); while ((err = broker->GetNext(&fi, nullptr)) == nullptr) { (*nfiles)[i] ++; } diff --git a/producer/api/include/producer/producer_error.h b/producer/api/include/producer/producer_error.h index decf91083..cdcf69a74 100644 --- a/producer/api/include/producer/producer_error.h +++ b/producer/api/include/producer/producer_error.h @@ -53,23 +53,23 @@ class ProducerErrorTemplate : public SimpleErrorTemplate { namespace ProducerErrorTemplates { auto const kAlreadyConnected = ProducerErrorTemplate { - "Already connected", ProducerErrorType::kAlreadyConnected - }; + "Already connected", ProducerErrorType::kAlreadyConnected +}; auto const kConnectionNotReady = ProducerErrorTemplate { - "Connection not ready", ProducerErrorType::kConnectionNotReady - }; + "Connection not ready", ProducerErrorType::kConnectionNotReady +}; auto const kFileTooLarge = ProducerErrorTemplate { - "File too large", ProducerErrorType::kFileTooLarge - }; + "File too large", ProducerErrorType::kFileTooLarge +}; auto const kFileIdAlreadyInUse = ProducerErrorTemplate { - "File already in use", ProducerErrorType::kFileIdAlreadyInUse - }; + "File already in use", ProducerErrorType::kFileIdAlreadyInUse +}; auto const kUnknownServerError = ProducerErrorTemplate { - "Unknown server error", ProducerErrorType::kUnknownServerError - }; + "Unknown server error", ProducerErrorType::kUnknownServerError +}; }; diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h index 63da34f6f..42eb039d8 100644 --- a/receiver/src/receiver_error.h +++ b/receiver/src/receiver_error.h @@ -50,11 +50,11 @@ class ReceiverErrorTemplate : public SimpleErrorTemplate { namespace ReceiverErrorTemplates { auto const kInvalidOpCode = ReceiverErrorTemplate { - "Invalid Opcode", ReceiverErrorType::kInvalidOpCode - }; + "Invalid Opcode", ReceiverErrorType::kInvalidOpCode +}; auto const kBadRequest = ReceiverErrorTemplate { - "Bad request", ReceiverErrorType::kBadRequest - }; + "Bad request", ReceiverErrorType::kBadRequest +}; }; } diff --git a/tests/automatic/CMakeLists.txt b/tests/automatic/CMakeLists.txt index b0a661944..f041b5fe8 100644 --- a/tests/automatic/CMakeLists.txt +++ b/tests/automatic/CMakeLists.txt @@ -20,4 +20,6 @@ add_subdirectory(worker) add_subdirectory(curl_http_client) -add_subdirectory(producer_receiver) \ No newline at end of file +add_subdirectory(producer_receiver) + +add_subdirectory(full_chain) \ No newline at end of file diff --git a/tests/automatic/broker/get_next/check_linux_.sh b/tests/automatic/broker/get_next/check_linux_.sh new file mode 100755 index 000000000..a701ba97b --- /dev/null +++ b/tests/automatic/broker/get_next/check_linux_.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +database_name=data + +echo "db.data.insert({"_id":2})" | mongo ${database_name} +echo "db.data.insert({"_id":1})" | mongo ${database_name} + diff --git a/tests/automatic/full_chain/CMakeLists.txt b/tests/automatic/full_chain/CMakeLists.txt new file mode 100644 index 000000000..24928ac77 --- /dev/null +++ b/tests/automatic/full_chain/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(simple_chain) diff --git a/tests/automatic/full_chain/simple_chain/CMakeLists.txt b/tests/automatic/full_chain/simple_chain/CMakeLists.txt new file mode 100644 index 000000000..5198c49bb --- /dev/null +++ b/tests/automatic/full_chain/simple_chain/CMakeLists.txt @@ -0,0 +1,9 @@ +set(TARGET_NAME full_chain_simple_chain) + +################################ +# Testing +################################ +configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/receiver.json receiver.json COPYONLY) +configure_file(${CMAKE_SOURCE_DIR}/tests/automatic/settings/broker_settings.json broker.json COPYONLY) + +add_script_test("${TARGET_NAME}" "$<TARGET_FILE:dummy-data-producer> $<TARGET_FILE:receiver-bin> $<TARGET_PROPERTY:hidra2-broker,EXENAME> $<TARGET_FILE:getnext_broker>" nomem) diff --git a/tests/automatic/full_chain/simple_chain/check_linux.sh b/tests/automatic/full_chain/simple_chain/check_linux.sh new file mode 100644 index 000000000..f1e136565 --- /dev/null +++ b/tests/automatic/full_chain/simple_chain/check_linux.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash + +set -e + +trap Cleanup EXIT + +broker_database_name=test_run +monitor_database_name=db_test +broker_address=127.0.0.1:5005 + +Cleanup() { + echo cleanup + rm -rf files + kill -9 $receiverid + kill -9 $brokerid + #kill -9 $producerrid + echo "db.dropDatabase()" | mongo ${broker_database_name} + influx -execute "drop database ${monitor_database_name}" +} + +influx -execute "create database ${monitor_database_name}" + + +#receiver +$2 receiver.json & +sleep 0.3 +receiverid=`echo $!` + +#broker +$3 broker.json & +sleep 0.3 +brokerid=`echo $!` + + +#producer +mkdir files +$1 localhost:4200 100 10 & +#producerrid=`echo $!` +sleep 0.1 + +$4 ${broker_address} ${broker_database_name} 2 | grep "Processed 10 file(s)" diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh index 0c38e3337..48d4ff3dc 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh +++ b/tests/automatic/producer_receiver/transfer_single_file/check_linux.sh @@ -21,4 +21,4 @@ mkdir files $1 localhost:4200 100 1 -ls -ln files/0.bin | awk '{ print $5 }'| grep 102400 +ls -ln files/1.bin | awk '{ print $5 }'| grep 102400 diff --git a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat index 988e6881f..8a470b403 100644 --- a/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat +++ b/tests/automatic/producer_receiver/transfer_single_file/check_windows.bat @@ -11,7 +11,7 @@ mkdir files ping 1.0.0.0 -n 1 -w 100 > nul -FOR /F "usebackq" %%A IN ('files\0.bin') DO set size=%%~zA +FOR /F "usebackq" %%A IN ('files\1.bin') DO set size=%%~zA if %size% NEQ 102400 goto :error diff --git a/worker/api/cpp/include/worker/data_broker.h b/worker/api/cpp/include/worker/data_broker.h index e0859dcae..a7baaa2a4 100644 --- a/worker/api/cpp/include/worker/data_broker.h +++ b/worker/api/cpp/include/worker/data_broker.h @@ -29,6 +29,8 @@ class DataBroker { //! Connect to the data source - will scan file folders or connect to the database. // TODO: do we need this? virtual Error Connect() = 0; + //! Set timeout for broker operations. Default - no timeout + virtual void SetTimeout(uint64_t timeout_ms) = 0; //! Receive next image. /*! \param info - where to store image metadata. Can be set to nullptr only image data is needed. diff --git a/worker/api/cpp/src/folder_data_broker.h b/worker/api/cpp/src/folder_data_broker.h index f388e5b1f..15bc208fd 100644 --- a/worker/api/cpp/src/folder_data_broker.h +++ b/worker/api/cpp/src/folder_data_broker.h @@ -15,9 +15,9 @@ class FolderDataBroker final : public hidra2::DataBroker { explicit FolderDataBroker(const std::string& source_name); Error Connect() override; Error GetNext(FileInfo* info, FileData* data) override; + void SetTimeout(uint64_t timeout_ms) override {}; // to timeout in this case std::unique_ptr<hidra2::IO> io__; // modified in testings to mock system calls,otherwise do not touch - private: std::string base_path_; bool is_connected_; diff --git a/worker/api/cpp/src/server_data_broker.cpp b/worker/api/cpp/src/server_data_broker.cpp index 0b739e6e7..fcbd1d449 100644 --- a/worker/api/cpp/src/server_data_broker.cpp +++ b/worker/api/cpp/src/server_data_broker.cpp @@ -1,9 +1,17 @@ #include "server_data_broker.h" + + +#include <chrono> + + #include "io/io_factory.h" #include "http_client/curl_http_client.h" #include "http_client/http_error.h" +using std::chrono::high_resolution_clock; + + namespace hidra2 { Error HttpCodeToWorkerError(const HttpCode& code) { @@ -42,20 +50,36 @@ Error ServerDataBroker::Connect() { return nullptr; } +void ServerDataBroker::SetTimeout(uint64_t timeout_ms) { + timeout_ms_ = timeout_ms; +} + Error ServerDataBroker::GetFileInfoFromServer(FileInfo* info, const std::string& operation) { std::string full_uri = server_uri_ + "/database/" + source_name_ + "/" + operation; Error err; HttpCode code; - auto response = httpclient__->Get(full_uri, &code, &err); - if (err != nullptr) { - return err; - } - - err = HttpCodeToWorkerError(code); - if (err != nullptr) { - err->Append(response); - return err; + std::string response; + uint64_t elapsed_ms = 0; + while (true) { + response = httpclient__->Get(full_uri, &code, &err); + if (err != nullptr) { + return err; + } + + err = HttpCodeToWorkerError(code); + if (err == nullptr) break; + if (err->GetErrorType() != hidra2::ErrorType::kEndOfFile) { + err->Append(response); +// return err; + } + + if (elapsed_ms >= timeout_ms_) { + err->Append("exit on timeout"); + return err; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + elapsed_ms += 100; } if (!info->SetFromJson(response)) { diff --git a/worker/api/cpp/src/server_data_broker.h b/worker/api/cpp/src/server_data_broker.h index 5be1b4236..94a818b07 100644 --- a/worker/api/cpp/src/server_data_broker.h +++ b/worker/api/cpp/src/server_data_broker.h @@ -15,12 +15,14 @@ class ServerDataBroker final : public hidra2::DataBroker { explicit ServerDataBroker(const std::string& server_uri, const std::string& source_name); Error Connect() override; Error GetNext(FileInfo* info, FileData* data) override; + void SetTimeout(uint64_t timeout_ms) override; std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch std::unique_ptr<HttpClient> httpclient__; private: Error GetFileInfoFromServer(FileInfo* info, const std::string& operation); std::string server_uri_; std::string source_name_; + uint64_t timeout_ms_ = 0; }; } diff --git a/worker/api/cpp/unittests/test_server_broker.cpp b/worker/api/cpp/unittests/test_server_broker.cpp index 7987769fd..13d85ba3e 100644 --- a/worker/api/cpp/unittests/test_server_broker.cpp +++ b/worker/api/cpp/unittests/test_server_broker.cpp @@ -117,6 +117,20 @@ TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClient) { ASSERT_THAT(err->GetErrorType(), hidra2::ErrorType::kEndOfFile); } +TEST_F(ServerDataBrokerTests, GetNextReturnsEOFFromHttpClientUntilTimeout) { + EXPECT_CALL(mock_http_client, Get_t(_, _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll( + SetArgPointee<1>(HttpCode::NoContent), + SetArgPointee<2>(nullptr), + Return(""))); + + data_broker->SetTimeout(100); + auto err = data_broker->GetNext(&info, nullptr); + + ASSERT_THAT(err->Explain(), HasSubstr(hidra2::WorkerErrorMessage::kNoData)); + ASSERT_THAT(err->GetErrorType(), hidra2::ErrorType::kEndOfFile); +} + + FileInfo CreateFI() { FileInfo fi; -- GitLab