From 5495d033bc2172ec9ced68f9921eb8fc4f76a372 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 4 Feb 2020 17:02:16 +0100
Subject: [PATCH] update timout processing

---
 common/cpp/include/http_client/http_error.h   | 26 +++++++----
 .../cpp/src/http_client/curl_http_client.cpp  |  7 ++-
 consumer/api/cpp/src/server_data_broker.cpp   | 12 +++--
 .../api/cpp/unittests/test_server_broker.cpp  | 28 ++++++++++--
 consumer/api/python/asapo_consumer.pyx.in     |  2 +
 .../consumer_api_python/check_linux.sh        | 45 ++-----------------
 .../consumer_api_python/consumer_api.py       | 24 +++++-----
 7 files changed, 77 insertions(+), 67 deletions(-)

diff --git a/common/cpp/include/http_client/http_error.h b/common/cpp/include/http_client/http_error.h
index e0746ad1c..79392511c 100644
--- a/common/cpp/include/http_client/http_error.h
+++ b/common/cpp/include/http_client/http_error.h
@@ -6,17 +6,25 @@
 
 namespace asapo {
 
-class HttpError: public SimpleError {
-  public:
-    HttpError(const std::string& error, HttpCode http_code): SimpleError{error, ErrorType::kHttpError}, http_code_{http_code} {
-    }
-    HttpCode GetCode() const {
-        return http_code_;
-    }
-  private:
-    HttpCode http_code_;
+enum class HttpErrorType {
+    kTransferError,
+    kConnectionError
 };
 
+using HttpErrorTemplate = ServiceErrorTemplate<HttpErrorType, ErrorType::kHttpError>;
+
+namespace HttpErrorTemplates {
+
+auto const kTransferError = HttpErrorTemplate{
+    "possible transfer error", HttpErrorType::kTransferError
+};
+
+auto const kConnectionError = HttpErrorTemplate{
+    "connection error", HttpErrorType::kConnectionError
+};
+
+}
+
 }
 
 #endif //ASAPO_HTTP_ERROR_H
diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp
index 8e80571f6..cf335c73a 100644
--- a/common/cpp/src/http_client/curl_http_client.cpp
+++ b/common/cpp/src/http_client/curl_http_client.cpp
@@ -1,6 +1,7 @@
 #include "curl_http_client.h"
 
 #include <cstring>
+#include "http_client/http_error.h"
 
 namespace asapo {
 
@@ -64,7 +65,11 @@ Error ProcessCurlResponse(CURL* curl, CURLcode res, const char* errbuf,
         return nullptr;
     } else {
         *buffer = GetCurlError(curl, res, errbuf);
-        return TextError("Curl client error: " + *buffer);
+        if (res == CURLE_COULDNT_CONNECT || res == CURLE_COULDNT_RESOLVE_HOST) {
+            return HttpErrorTemplates::kConnectionError.Generate(*buffer);
+        } else {
+            return HttpErrorTemplates::kTransferError.Generate(*buffer);
+        }
     }
 }
 
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index 96809340e..c642c66e8 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -96,7 +96,11 @@ Error ServerDataBroker::ProcessRequest(std::string* response, const RequestInfo&
     }
     if (err != nullptr) {
         current_broker_uri_ = "";
-        return ConsumerErrorTemplates::kInterruptedTransaction.Generate("error processing request: " + err->Explain());
+        if (err == HttpErrorTemplates::kTransferError) {
+            return ConsumerErrorTemplates::kInterruptedTransaction.Generate("error processing request: " + err->Explain());
+        } else {
+            return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + err->Explain());
+        }
     }
     return ErrorFromServerResponce(*response, code);
 }
@@ -153,6 +157,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
     uint64_t elapsed_ms = 0;
     Error no_data_error;
     while (true) {
+        auto start = system_clock::now();
         auto err = GetBrokerUri();
         if (err == nullptr) {
             auto  ri = PrepareRequestInfo(request_api + request_suffix, dataset);
@@ -175,7 +180,7 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
             return no_data_error ? std::move(no_data_error) : std::move(err);
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        elapsed_ms += 100;
+        elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - start).count();
     }
     return nullptr;
 }
@@ -278,6 +283,7 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
     uint64_t elapsed_ms = 0;
     std::string response;
     while (elapsed_ms <= timeout_ms_) {
+        auto start = system_clock::now();
         *err = GetBrokerUri();
         if (*err == nullptr) {
             request.host = current_broker_uri_;
@@ -287,7 +293,7 @@ std::string ServerDataBroker::BrokerRequestWithTimeout(RequestInfo request, Erro
             }
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        elapsed_ms += 100;
+        elapsed_ms += std::chrono::duration_cast<std::chrono::milliseconds>( system_clock::now() - start).count();
     }
     return "";
 }
diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp
index ff8c6b4db..be8c853c8 100644
--- a/consumer/api/cpp/unittests/test_server_broker.cpp
+++ b/consumer/api/cpp/unittests/test_server_broker.cpp
@@ -23,7 +23,6 @@ using asapo::MockIO;
 using asapo::MockHttpClient;
 using asapo::MockNetClient;
 using asapo::HttpCode;
-using asapo::HttpError;
 using asapo::SimpleError;
 
 using ::testing::AtLeast;
@@ -335,12 +334,12 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
 }
 
 
-TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) {
+TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnTransferError) {
     MockGetBrokerUri();
 
     EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).WillOnce(DoAll(
                 SetArgPointee<1>(HttpCode::InternalServerError),
-                SetArgPointee<2>(asapo::IOErrorTemplates::kSocketOperationOnNonSocket.Generate("sss").release()),
+                SetArgPointee<2>(asapo::HttpErrorTemplates::kTransferError.Generate("sss").release()),
                 Return("")));
 
     data_broker->SetTimeout(300);
@@ -348,7 +347,30 @@ TEST_F(ServerDataBrokerTests, GetNextImageReturnsImmediatelyOnServerError) {
 
     ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
     ASSERT_THAT(err->Explain(), HasSubstr("sss"));
+}
+
+
+ACTION(AssignArg2) {
+    *arg2 = asapo::HttpErrorTemplates::kConnectionError.Generate().release();
+}
+
+
+TEST_F(ServerDataBrokerTests, GetNextRetriesIfConnectionHttpClientErrorUntilTimeout) {
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr(expected_server_uri + "/discovery/broker"), _,
+                                        _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
+                                                    SetArgPointee<1>(HttpCode::OK),
+                                                    SetArgPointee<2>(nullptr),
+                                                    Return(expected_broker_uri)));
+
+    EXPECT_CALL(mock_http_client, Get_t(HasSubstr("next"), _, _)).Times(AtLeast(2)).WillRepeatedly(DoAll(
+                SetArgPointee<1>(HttpCode::Conflict),
+                AssignArg2(),
+                Return("")));
+
+    data_broker->SetTimeout(300);
+    auto err = data_broker->GetNext(&info, expected_group_id, nullptr);
 
+    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kUnavailableService));
 }
 
 TEST_F(ServerDataBrokerTests, GetImageReturnsFileInfo) {
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index b4d3d413c..48f22fad8 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -140,6 +140,8 @@ cdef class PyDataBroker:
         if err:
             throw_exception(err)
         return size
+    def set_timeout(self,timeout):
+        self.c_broker.SetTimeout(timeout)
     def set_lastread_marker(self,value,group_id):
         cdef string b_group_id = _bytes(group_id)
         cdef Error err
diff --git a/tests/automatic/consumer/consumer_api_python/check_linux.sh b/tests/automatic/consumer/consumer_api_python/check_linux.sh
index ce498bbf5..2832a9e89 100644
--- a/tests/automatic/consumer/consumer_api_python/check_linux.sh
+++ b/tests/automatic/consumer/consumer_api_python/check_linux.sh
@@ -9,47 +9,16 @@ set -e
 
 trap Cleanup EXIT
 
-function wait_mongo {
-NEXT_WAIT_TIME=0
-until mongo --port 27016 --eval "db.version()" | tail -2 | grep version || [ $NEXT_WAIT_TIME -eq 30 ]; do
-  echo "Wait for mongo"
-  NEXT_WAIT_TIME=$(( NEXT_WAIT_TIME++ ))
-  sleep 1
-done
-if (( NEXT_WAIT_TIME == 30 )); then
-    echo "Timeout"
-    exit -1
-fi
-}
-
-
-function kill_mongo {
-    kill -2 `ps xa | grep mongod | grep 27016 | awk '{print $1;}'`
-}
-
-
-function start_mongo {
-    mongod --dbpath /tmp/mongo --port 27016 --logpath /tmp/mongolog --fork
-}
-
-
 Cleanup() {
     set +e
     nomad stop nginx >/dev/null
     nomad run nginx_kill.nmd  && nomad stop -yes -purge nginx_kill
     nomad stop discovery >/dev/null
     nomad stop broker >/dev/null
-	echo "db.dropDatabase()" | mongo --port 27016 ${database_name} >/dev/null
+	echo "db.dropDatabase()" | mongo ${database_name} >/dev/null
 	rm 1 1_1
-    kill_mongo
 }
 
-sed -i 's/27017/27016/g' discovery.json.tpl
-
-
-start_mongo
-wait_mongo
-
 nomad run nginx.nmd
 nomad run discovery.nmd
 nomad run broker.nmd
@@ -59,24 +28,18 @@ echo hello1 > 1_1
 
 for i in `seq 1 5`;
 do
-	echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo --port 27016 ${database_name} >/dev/null
+	echo 'db.data.insert({"_id":'$i',"size":6,"name":"'$i'","lastchange":1,"source":"none","buf_id":0,"meta":{"test":10}})' | mongo ${database_name} >/dev/null
 done
 
 sleep 1
 
 export PYTHONPATH=$1:${PYTHONPATH}
 
-#kill_mongo
-#python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run broker_server_error
-#sleep 1
-#start_mongo
-#wait_mongo
 python consumer_api.py 127.0.0.1:8400 $source_path $beamtime_id $token_test_run single
 
 
-
 #check datasets
-echo "db.dropDatabase()" | mongo --port 27016 ${database_name} > /dev/null
+echo "db.dropDatabase()" | mongo ${database_name} > /dev/null
 
 sleep 1
 
@@ -88,7 +51,7 @@ do
 		images="$images,{"_id":$j,"size":6,"name":'${i}_${j}',"lastchange":1,"source":'none',"buf_id":0,"meta":{"test":10}}"
 	done
 	images=${images#?}
-	echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo --port 27016 ${database_name} >/dev/null
+	echo 'db.data.insert({"_id":'$i',"size":3,"images":['$images']})' | mongo ${database_name} >/dev/null
 done
 
 
diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py
index 0f7b0db05..42669134a 100644
--- a/tests/automatic/consumer/consumer_api_python/consumer_api.py
+++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py
@@ -10,6 +10,7 @@ def exit_on_noerr(name):
 
 
 def assert_metaname(meta,compare,name):
+    print ("asserting meta for "+name)
     if meta['name'] != compare:
         print ("error at "+name)
         print ('meta: ', json.dumps(meta, indent=4, sort_keys=True))
@@ -45,6 +46,8 @@ def check_single(broker,group_id_new):
     assert_metaname(meta,"1","get next1")
     assert_usermetadata(meta,"get next1")
 
+    broker.set_timeout(1000)
+
     data = broker.retrieve_data(meta)
     assert_eq(data.tostring().decode("utf-8"),"hello1","retrieve_data data")
 
@@ -136,16 +139,14 @@ def check_single(broker,group_id_new):
     else:
         exit_on_noerr("wrong query")
 
-#    broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,60000)
-#    try:
-#        broker.get_last(group_id_new, meta_only=True)
-#    except asapo_consumer.AsapoUnavailableServiceError as err:
-#        print(err)
-#        pass
-#    else:
-#        exit_on_noerr("AsapoBrokerServersNotFound")
-
-
+    broker = asapo_consumer.create_server_broker("bla",path, beamtime,"",token,1000)
+    try:
+        broker.get_last(group_id_new, meta_only=True)
+    except asapo_consumer.AsapoUnavailableServiceError as err:
+        print(err)
+        pass
+    else:
+        exit_on_noerr("AsapoBrokerServersNotFound")
 
 def check_dataset(broker,group_id_new):
     id, metas = broker.get_next_dataset(group_id_new)
@@ -154,6 +155,9 @@ def check_dataset(broker,group_id_new):
     assert_metaname(metas[1],"1_2","get nextdataset1 name2")
     assert_usermetadata(metas[0],"get nextdataset1 meta")
 
+    broker.set_timeout(1000)
+
+
     data = broker.retrieve_data(metas[0])
     assert_eq(data.tostring().decode("utf-8"),"hello1","retrieve_data from dataset data")
 
-- 
GitLab