From b791eb9a00c087b46e23c5af2c5855d405641cf5 Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Fri, 7 May 2021 12:29:24 +0200
Subject: [PATCH] fix deadlock in producer

---
 common/cpp/include/asapo/request/request_handler.h            | 2 +-
 common/cpp/src/request/request_pool.cpp                       | 4 +++-
 common/cpp/unittests/request/mocking.h                        | 3 ++-
 producer/api/cpp/src/request_handler_filesystem.cpp           | 2 +-
 producer/api/cpp/src/request_handler_filesystem.h             | 2 +-
 producer/api/cpp/src/request_handler_tcp.cpp                  | 2 +-
 producer/api/cpp/src/request_handler_tcp.h                    | 2 +-
 producer/api/cpp/unittests/test_request_handler_tcp.cpp       | 2 +-
 .../request_handler/receiver_data_server_request_handler.cpp  | 2 +-
 .../request_handler/receiver_data_server_request_handler.h    | 2 +-
 10 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/common/cpp/include/asapo/request/request_handler.h b/common/cpp/include/asapo/request/request_handler.h
index 6a5289bd1..9257f3b96 100644
--- a/common/cpp/include/asapo/request/request_handler.h
+++ b/common/cpp/include/asapo/request/request_handler.h
@@ -14,7 +14,7 @@ class RequestHandler {
     virtual void PrepareProcessingRequestLocked()  = 0;
     virtual void TearDownProcessingRequestLocked(bool success)  = 0;
     virtual bool ProcessRequestUnlocked(GenericRequest* request, bool* retry)  = 0;
-    virtual void ProcessRequestTimeout(GenericRequest* request)  = 0;
+    virtual void ProcessRequestTimeoutUnlocked(GenericRequest* request)  = 0;
     virtual bool ReadyProcessRequest() = 0;
     virtual ~RequestHandler() = default;
 };
diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp
index dc2573b67..32d0cd999 100644
--- a/common/cpp/src/request/request_pool.cpp
+++ b/common/cpp/src/request/request_pool.cpp
@@ -107,7 +107,9 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler> &request_
                                  ThreadInformation* thread_info) {
     auto request = GetRequestFromQueue();
     if (request->TimedOut()) {
-        request_handler->ProcessRequestTimeout(request.get());
+        thread_info->lock.unlock();
+        request_handler->ProcessRequestTimeoutUnlocked(request.get());
+        thread_info->lock.lock();
         return;
     }
     request_handler->PrepareProcessingRequestLocked();
diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h
index d3687f946..d462be46e 100644
--- a/common/cpp/unittests/request/mocking.h
+++ b/common/cpp/unittests/request/mocking.h
@@ -16,7 +16,8 @@ class MockRequestHandler : public RequestHandler {
     MOCK_METHOD0(ReadyProcessRequest, bool());
     MOCK_METHOD1(TearDownProcessingRequestLocked, void(bool processing_succeeded));
     MOCK_METHOD2(ProcessRequestUnlocked_t, bool (const GenericRequest* request, bool* retry));
-    MOCK_METHOD1(ProcessRequestTimeout, void(GenericRequest* request));
+    MOCK_METHOD1(ProcessRequestTimeoutUnlocked, void(GenericRequest* request))GenericRequest*
+  ;
     uint64_t retry_counter = 0;
     bool ProcessRequestUnlocked(GenericRequest* request, bool* retry)  override {
         retry_counter = request->GetRetryCounter();
diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp
index 39a8c3d69..968c68b34 100644
--- a/producer/api/cpp/src/request_handler_filesystem.cpp
+++ b/producer/api/cpp/src/request_handler_filesystem.cpp
@@ -38,7 +38,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, b
     return true;
 }
 
-void RequestHandlerFilesystem::ProcessRequestTimeout(GenericRequest* request) {
+void RequestHandlerFilesystem::ProcessRequestTimeoutUnlocked(GenericRequest* request) {
     log__->Error("request timeout, id:" + std::to_string(request->header.data_id) + " to " + request->header.stream +
                  " stream");
 }
diff --git a/producer/api/cpp/src/request_handler_filesystem.h b/producer/api/cpp/src/request_handler_filesystem.h
index bbb5250c7..15dc03e56 100644
--- a/producer/api/cpp/src/request_handler_filesystem.h
+++ b/producer/api/cpp/src/request_handler_filesystem.h
@@ -23,7 +23,7 @@ class RequestHandlerFilesystem: public RequestHandler {
     };
     void PrepareProcessingRequestLocked()  override {};
     void TearDownProcessingRequestLocked(bool request_processed_successfully)  override {};
-    void ProcessRequestTimeout(GenericRequest* request)  override;
+    void ProcessRequestTimeoutUnlocked(GenericRequest* request)  override;
 
     virtual ~RequestHandlerFilesystem() = default;
     std::unique_ptr<IO> io__;
diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp
index 85895cfd0..9be95ddd2 100644
--- a/producer/api/cpp/src/request_handler_tcp.cpp
+++ b/producer/api/cpp/src/request_handler_tcp.cpp
@@ -307,7 +307,7 @@ void RequestHandlerTcp::TearDownProcessingRequestLocked(bool request_processed_s
     }
 }
 
-void RequestHandlerTcp::ProcessRequestTimeout(GenericRequest* request) {
+void RequestHandlerTcp::ProcessRequestTimeoutUnlocked(GenericRequest* request) {
     auto producer_request = static_cast<ProducerRequest*>(request);
     auto err_string = "request id:" + std::to_string(request->header.data_id) + ", opcode: " + std::to_string(
         request->header.op_code) + " for " + request->header.stream +
diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h
index a891a81d5..7b8e61874 100644
--- a/producer/api/cpp/src/request_handler_tcp.h
+++ b/producer/api/cpp/src/request_handler_tcp.h
@@ -23,7 +23,7 @@ class RequestHandlerTcp: public RequestHandler {
     bool ReadyProcessRequest() override;
     void PrepareProcessingRequestLocked()  override;
     void TearDownProcessingRequestLocked(bool request_processed_successfully)  override;
-    void ProcessRequestTimeout(GenericRequest* request)  override;
+    void ProcessRequestTimeoutUnlocked(GenericRequest* request)  override;
 
     virtual ~RequestHandlerTcp() = default;
     std::unique_ptr<IO> io__;
diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
index 1d7b812cd..9066b9044 100644
--- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
@@ -974,7 +974,7 @@ TEST_F(RequestHandlerTcpTests, TimeoutCallsCallback) {
         HasSubstr("stream"))
     ));
 
-    request_handler.ProcessRequestTimeout(&request);
+    request_handler.ProcessRequestTimeoutUnlocked(&request);
 
     ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kTimeout));
     ASSERT_THAT(callback_called, Eq(true));
diff --git a/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp
index 0f8f387fa..1529dc7ff 100644
--- a/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp
+++ b/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.cpp
@@ -86,7 +86,7 @@ void ReceiverDataServerRequestHandler::TearDownProcessingRequestLocked(bool /*pr
 // do nothing
 }
 
-void ReceiverDataServerRequestHandler::ProcessRequestTimeout(GenericRequest* /*request*/) {
+void ReceiverDataServerRequestHandler::ProcessRequestTimeoutUnlocked(GenericRequest* /*request*/) {
 // do nothing
 }
 
diff --git a/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.h
index 18fc5937a..952cb1cb7 100644
--- a/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.h
+++ b/receiver/src/receiver_data_server/request_handler/receiver_data_server_request_handler.h
@@ -17,7 +17,7 @@ class ReceiverDataServerRequestHandler: public RequestHandler {
     bool ReadyProcessRequest() override;
     void PrepareProcessingRequestLocked()  override;
     void TearDownProcessingRequestLocked(bool processing_succeeded)  override;
-    void ProcessRequestTimeout(GenericRequest* request)  override;
+    void ProcessRequestTimeoutUnlocked(GenericRequest* request)  override;
 
     const AbstractLogger* log__;
     Statistics* statistics__;
-- 
GitLab