From 8879da8d98b0711347003abe2db4037c30811625 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Tue, 1 Dec 2020 14:03:59 +0100
Subject: [PATCH] add interrupt_current_operation

---
 CHANGELOG.md                                  |  5 +--
 .../api/cpp/include/consumer/data_broker.h    |  2 ++
 consumer/api/cpp/src/server_data_broker.cpp   | 13 +++++++
 consumer/api/cpp/src/server_data_broker.h     |  6 +++-
 .../api/cpp/unittests/test_server_broker.cpp  | 34 ++++++++++++++++---
 consumer/api/python/asapo_consumer.pxd        |  2 +-
 consumer/api/python/asapo_consumer.pyx.in     |  3 +-
 .../consumer_api_python/consumer_api.py       | 28 +++++++++++++++
 8 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 436778416..64eed736b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,12 @@
-## 20.09.2 (in progress)
+## 20.12.0 (in progress)
 
 FEATURES
 * implemented possibility to send data without writing to database (no need of consecutive indexes, etc. but will not be able to consume such data)
 * allow to return incomplete datasets (wihout error if one sets minimum dataset size, otherwise with "partial data" error)
 
  IMPROVEMENTS
-* Consumer API - change behavior of GetLast/get_last - do not set current pointer after call to the last image  
+* Consumer API - change behavior of GetLast/get_last - do not set current pointer after call to the last image
+* Consumer API - add interrupt_current_operation to allow interrupting (from a separate thread) long consumer operation  
 * Producer API - return original data in callback payload.  
 * Producer API - allow to set queue limits (number of pending requests and/or max memory), reject new requests if reached the limits  
 
diff --git a/consumer/api/cpp/include/consumer/data_broker.h b/consumer/api/cpp/include/consumer/data_broker.h
index e82175856..f5e5f22b1 100644
--- a/consumer/api/cpp/include/consumer/data_broker.h
+++ b/consumer/api/cpp/include/consumer/data_broker.h
@@ -196,6 +196,8 @@ class DataBroker {
     */
     virtual void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) = 0;
 
+  //! Will try to interrupt current long runnung operations (mainly needed to exit waiting loop in C from Python)
+    virtual void InterruptCurrentOperation() = 0;
 
     virtual ~DataBroker() = default; // needed for unique_ptr to delete itself
 };
diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp
index 0d9519cf0..8318d76eb 100644
--- a/consumer/api/cpp/src/server_data_broker.cpp
+++ b/consumer/api/cpp/src/server_data_broker.cpp
@@ -243,6 +243,7 @@ RequestInfo ServerDataBroker::PrepareRequestInfo(std::string api_url, bool datas
 Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string group_id, std::string substream,
                                             GetImageServerOperation op,
                                             bool dataset, uint64_t min_size) {
+    interrupt_flag_= false;
     std::string request_suffix = OpToUriCmd(op);
     std::string request_group = OpToUriCmd(op);
     std::string request_api = "/database/" + source_credentials_.beamtime_id + "/" + source_credentials_.stream
@@ -250,6 +251,10 @@ Error ServerDataBroker::GetRecordFromServer(std::string* response, std::string g
     uint64_t elapsed_ms = 0;
     Error no_data_error;
     while (true) {
+        if (interrupt_flag_) {
+            return ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
+        }
+
         auto start = system_clock::now();
         auto err = DiscoverService(kBrokerServiceName, &current_broker_uri_);
         if (err == nullptr) {
@@ -444,9 +449,14 @@ Error ServerDataBroker::ServiceRequestWithTimeout(const std::string &service_nam
                                                   std::string* service_uri,
                                                   RequestInfo request,
                                                   RequestOutput* response) {
+    interrupt_flag_= false;
     uint64_t elapsed_ms = 0;
     Error err;
     while (elapsed_ms <= timeout_ms_) {
+        if (interrupt_flag_) {
+            err = ConsumerErrorTemplates::kInterruptedTransaction.Generate("interrupted by user request");
+            break;
+        }
         auto start = system_clock::now();
         err = DiscoverService(service_name, service_uri);
         if (err == nullptr) {
@@ -843,5 +853,8 @@ Error ServerDataBroker::NegativeAcknowledge(std::string group_id,
     BrokerRequestWithTimeout(ri, &err);
     return err;
 }
+void ServerDataBroker::InterruptCurrentOperation() {
+    interrupt_flag_= true;
+}
 
 }
diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h
index 53a938813..ffd619532 100644
--- a/consumer/api/cpp/src/server_data_broker.h
+++ b/consumer/api/cpp/src/server_data_broker.h
@@ -3,6 +3,7 @@
 
 #include <common/networking.h>
 #include <mutex>
+#include <atomic>
 #include "consumer/data_broker.h"
 #include "io/io.h"
 #include "http_client/http_client.h"
@@ -113,10 +114,12 @@ class ServerDataBroker final : public asapo::DataBroker {
     StreamInfos GetSubstreamList(std::string from, Error* err) override;
     void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts) override;
 
+    virtual void InterruptCurrentOperation() override;
+
+
     std::unique_ptr<IO> io__; // modified in testings to mock system calls,otherwise do not touch
     std::unique_ptr<HttpClient> httpclient__;
     std::unique_ptr<NetClient> net_client__;
-
     std::mutex net_client_mutex__; // Required for the lazy initialization of net_client
   private:
     Error GetDataFromFileTransferService(FileInfo* info, FileData* data, bool retry_with_new_token);
@@ -167,6 +170,7 @@ class ServerDataBroker final : public asapo::DataBroker {
     bool resend_ = false;
     uint64_t delay_sec_;
     uint64_t resend_attempts_;
+    std::atomic<bool> interrupt_flag_{ false};
 };
 
 }
diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp
index d7b1f9536..bb068946d 100644
--- a/consumer/api/cpp/unittests/test_server_broker.cpp
+++ b/consumer/api/cpp/unittests/test_server_broker.cpp
@@ -1,6 +1,7 @@
 #include <gmock/gmock.h>
 #include <gmock/gmock.h>
 #include "gtest/gtest.h"
+#include <chrono>
 
 #include "consumer/data_broker.h"
 #include "consumer/consumer_error.h"
@@ -411,7 +412,7 @@ TEST_F(ServerDataBrokerTests, GetImageReturnsNoDataAfterTimeoutEvenIfOtherErrorO
             ",\"id_max\":2,\"next_substream\":\"""\"}")));
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
-    + std::to_string(expected_dataset_id) + "?token="
+                                            + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
         SetArgPointee<1>(HttpCode::NotFound),
         SetArgPointee<2>(nullptr),
@@ -724,7 +725,7 @@ TEST_F(ServerDataBrokerTests, GetByIdTimeouts) {
     data_broker->SetTimeout(10);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
-    + std::to_string(expected_dataset_id) + "?token="
+                                            + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
         SetArgPointee<2>(nullptr),
@@ -740,7 +741,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStream) {
     data_broker->SetTimeout(10);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
-    + std::to_string(expected_dataset_id) + "?token="
+                                            + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
         SetArgPointee<2>(nullptr),
@@ -756,7 +757,7 @@ TEST_F(ServerDataBrokerTests, GetByIdReturnsEndOfStreamWhenIdTooLarge) {
     data_broker->SetTimeout(10);
 
     EXPECT_CALL(mock_http_client, Get_t(expected_broker_uri + "/database/beamtime_id/" + expected_stream + "/default/0/"
-    + std::to_string(expected_dataset_id) + "?token="
+                                            + std::to_string(expected_dataset_id) + "?token="
                                             + expected_token, _, _)).WillOnce(DoAll(
         SetArgPointee<1>(HttpCode::Conflict),
         SetArgPointee<2>(nullptr),
@@ -1368,4 +1369,29 @@ TEST_F(ServerDataBrokerTests, NegativeAcknowledgeUsesCorrectUri) {
     ASSERT_THAT(err, Eq(nullptr));
 }
 
+TEST_F(ServerDataBrokerTests, CanInterruptOperation) {
+    EXPECT_CALL(mock_http_client, Get_t(_, _, _)).Times(AtLeast(1)).WillRepeatedly(DoAll(
+        SetArgPointee<1>(HttpCode::NotFound),
+        SetArgPointee<2>(nullptr),
+        Return("")));
+
+    auto start = std::chrono::system_clock::now();
+    asapo::Error err;
+    auto exec = [this,&err]() {
+      data_broker->SetTimeout(10000);
+      err = data_broker->GetNext(&info, "", nullptr);
+    };
+    auto thread = std::thread(exec);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    data_broker->InterruptCurrentOperation();
+
+    thread.join();
+
+    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
+    ASSERT_THAT(elapsed_ms, testing::Lt(1000));
+    ASSERT_THAT(err, Eq(asapo::ConsumerErrorTemplates::kInterruptedTransaction));
+
+}
+
 }
diff --git a/consumer/api/python/asapo_consumer.pxd b/consumer/api/python/asapo_consumer.pxd
index 34450e0e1..7ade66f19 100644
--- a/consumer/api/python/asapo_consumer.pxd
+++ b/consumer/api/python/asapo_consumer.pxd
@@ -81,7 +81,7 @@ cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
         Error RetrieveData(FileInfo* info, FileData* data)
         vector[StreamInfo] GetSubstreamList(string from_substream, Error* err)
         void SetResendNacs(bool resend, uint64_t delay_sec, uint64_t resend_attempts)
-
+        void InterruptCurrentOperation()
 
 cdef extern from "asapo_consumer.h" namespace "asapo" nogil:
     cdef cppclass DataBrokerFactory:
diff --git a/consumer/api/python/asapo_consumer.pyx.in b/consumer/api/python/asapo_consumer.pyx.in
index bb37ba89c..0cc14340b 100644
--- a/consumer/api/python/asapo_consumer.pyx.in
+++ b/consumer/api/python/asapo_consumer.pyx.in
@@ -317,7 +317,8 @@ cdef class PyDataBroker:
         meta = json.loads(_str(meta_str))
         del meta['_id']
         return meta
-
+    def interrupt_current_operation(self):
+        self.c_broker.InterruptCurrentOperation()
 cdef class __PyDataBrokerFactory:
     cdef DataBrokerFactory c_factory
     def __cinit__(self):
diff --git a/tests/automatic/consumer/consumer_api_python/consumer_api.py b/tests/automatic/consumer/consumer_api_python/consumer_api.py
index 98f1d29f9..820b59faf 100644
--- a/tests/automatic/consumer/consumer_api_python/consumer_api.py
+++ b/tests/automatic/consumer/consumer_api_python/consumer_api.py
@@ -3,7 +3,10 @@ from __future__ import print_function
 import asapo_consumer
 import json
 import sys
+import time
+from threading import Thread
 
+thread_res = 0
 
 def exit_on_noerr(name):
     print(name)
@@ -44,6 +47,7 @@ def check_file_transfer_service(broker, group_id):
 
 
 def check_single(broker, group_id):
+    global thread_res
     _, meta = broker.get_next(group_id, meta_only=True)
     assert_metaname(meta, "1", "get next1")
     assert_usermetadata(meta, "get next1")
@@ -210,6 +214,30 @@ def check_single(broker, group_id):
     else:
         exit_on_noerr("should be AsapoWrongInputError")
 
+# interrupt
+    thread_res = 0
+    def long_call(broker):
+        global thread_res
+        try:
+            broker.get_last(meta_only=True)
+            thread_res = 1
+        except asapo_consumer.AsapoInterruptedTransactionError as err:
+            global res
+            print(err)
+            thread_res = 2
+            pass
+        else:
+            print("interrupt test failed")
+            thread_res = 3
+            pass
+
+    broker = asapo_consumer.create_server_broker("bla", path, True, beamtime, "", token, 60000)
+    t = Thread(target =  long_call, args =  (broker,) )
+    t.start()
+    broker.interrupt_current_operation()
+    t.join()
+    assert_eq(thread_res, 2, "long call res")
+
 
 
 def check_dataset(broker, group_id):
-- 
GitLab