From d0d765b32c801264714e93cd30df85d137d2191c Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Mon, 24 Feb 2020 14:22:18 +0100
Subject: [PATCH] refactor authorization

---
 common/cpp/include/request/request_handler.h  |  4 +-
 common/cpp/src/request/request_pool.cpp       |  5 +-
 common/cpp/unittests/request/mocking.h        |  2 +-
 examples/pipeline/in_to_out/in_to_out.cpp     |  2 +-
 .../cpp/src/request_handler_filesystem.cpp    |  4 +-
 .../api/cpp/src/request_handler_filesystem.h  |  4 +-
 producer/api/cpp/src/request_handler_tcp.cpp  | 48 +++++----
 producer/api/cpp/src/request_handler_tcp.h    | 12 +--
 .../test_request_handler_filesystem.cpp       | 12 +--
 .../unittests/test_request_handler_tcp.cpp    | 99 ++++++++++++-------
 .../receiver_data_server_request_handler.cpp  |  3 +-
 .../receiver_data_server_request_handler.h    |  2 +-
 receiver/src/request_handler_authorize.cpp    |  9 +-
 receiver/src/request_handler_authorize.h      |  2 +-
 .../test_request_handler.cpp                  | 11 ++-
 15 files changed, 137 insertions(+), 82 deletions(-)

diff --git a/common/cpp/include/request/request_handler.h b/common/cpp/include/request/request_handler.h
index 60fbd767b..06cd2a59b 100644
--- a/common/cpp/include/request/request_handler.h
+++ b/common/cpp/include/request/request_handler.h
@@ -12,8 +12,8 @@ namespace  asapo {
 class RequestHandler {
   public:
     virtual void PrepareProcessingRequestLocked()  = 0;
-    virtual void TearDownProcessingRequestLocked(bool processing_succeeded)  = 0;
-    virtual bool ProcessRequestUnlocked(GenericRequest* request)  = 0;
+    virtual void TearDownProcessingRequestLocked(bool success)  = 0;
+    virtual bool ProcessRequestUnlocked(GenericRequest* request, bool* retry)  = 0;
     virtual void ProcessRequestTimeout(GenericRequest* request)  = 0;
     virtual bool ReadyProcessRequest() = 0;
     virtual ~RequestHandler() = default;
diff --git a/common/cpp/src/request/request_pool.cpp b/common/cpp/src/request/request_pool.cpp
index de91b3f15..f49f28c75 100644
--- a/common/cpp/src/request/request_pool.cpp
+++ b/common/cpp/src/request/request_pool.cpp
@@ -50,11 +50,12 @@ void RequestPool::ProcessRequest(const std::unique_ptr<RequestHandler>& request_
     request_handler->PrepareProcessingRequestLocked();
     requests_in_progress_++;
     thread_info->lock.unlock();
-    auto success = request_handler->ProcessRequestUnlocked(request.get());
+    bool retry;
+    auto success = request_handler->ProcessRequestUnlocked(request.get(), &retry);
     thread_info->lock.lock();
     requests_in_progress_--;
     request_handler->TearDownProcessingRequestLocked(success);
-    if (!success) {
+    if (retry) {
         PutRequestBackToQueue(std::move(request));
         thread_info->lock.unlock();
         condition_.notify_all();
diff --git a/common/cpp/unittests/request/mocking.h b/common/cpp/unittests/request/mocking.h
index d37fdd79d..c4ed7dfd5 100644
--- a/common/cpp/unittests/request/mocking.h
+++ b/common/cpp/unittests/request/mocking.h
@@ -18,7 +18,7 @@ class MockRequestHandler : public RequestHandler {
     MOCK_METHOD1(ProcessRequestUnlocked_t, bool (const GenericRequest* request));
     MOCK_METHOD1(ProcessRequestTimeout, void(GenericRequest* request));
     uint64_t retry_counter = 0;
-    bool ProcessRequestUnlocked(GenericRequest* request)  override {
+    bool ProcessRequestUnlocked(GenericRequest* request, bool* retry)  override {
         retry_counter = request->GetRetryCounter();
         std::this_thread::sleep_for(std::chrono::milliseconds(50));
         return ProcessRequestUnlocked_t(request);
diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp
index 714e704c2..97d5234e4 100644
--- a/examples/pipeline/in_to_out/in_to_out.cpp
+++ b/examples/pipeline/in_to_out/in_to_out.cpp
@@ -39,7 +39,7 @@ struct Args {
 };
 
 void ProcessAfterSend(asapo::GenericRequestHeader header, asapo::Error err) {
-    if (err && err!=asapo::ProducerErrorTemplates::kServerWarning) {
+    if (err && err != asapo::ProducerErrorTemplates::kServerWarning) {
         std::cerr << "Data was not successfully send: " << err << std::endl;
         return;
     }
diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp
index fd75e003f..5e1437d25 100644
--- a/producer/api/cpp/src/request_handler_filesystem.cpp
+++ b/producer/api/cpp/src/request_handler_filesystem.cpp
@@ -16,13 +16,14 @@ RequestHandlerFilesystem::RequestHandlerFilesystem(std::string destination_folde
     thread_id_{thread_id} {
 }
 
-bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) {
+bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request, bool* retry) {
     auto producer_request = static_cast<ProducerRequest*>(request);
     Error err;
     if (producer_request->DataFromFile()) {
         producer_request->data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size,
                                                        &err);
         if (err) {
+            *retry = true;
             return false;
         }
     }
@@ -32,6 +33,7 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) {
     if (producer_request->callback) {
         producer_request->callback(request->header, std::move(err));
     }
+    *retry = false;
     return true;
 }
 
diff --git a/producer/api/cpp/src/request_handler_filesystem.h b/producer/api/cpp/src/request_handler_filesystem.h
index 72fb21061..66b1ca980 100644
--- a/producer/api/cpp/src/request_handler_filesystem.h
+++ b/producer/api/cpp/src/request_handler_filesystem.h
@@ -17,12 +17,12 @@ namespace asapo {
 class RequestHandlerFilesystem: public RequestHandler {
   public:
     explicit RequestHandlerFilesystem(std::string destination_folder, uint64_t thread_id);
-    bool ProcessRequestUnlocked(GenericRequest* request) override;
+    bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override;
     bool ReadyProcessRequest() override {
         return true;
     };
     void PrepareProcessingRequestLocked()  override {};
-    void TearDownProcessingRequestLocked(bool processing_succeeded)  override {};
+    void TearDownProcessingRequestLocked(bool request_processed_successfully)  override {};
     void ProcessRequestTimeout(GenericRequest* request)  override;
 
     virtual ~RequestHandlerFilesystem() = default;
diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp
index a1f25ffb5..9baccb887 100644
--- a/producer/api/cpp/src/request_handler_tcp.cpp
+++ b/producer/api/cpp/src/request_handler_tcp.cpp
@@ -12,8 +12,8 @@ RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service
     ncurrent_connections_{shared_counter} {
 }
 
-Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) {
-    GenericRequestHeader header{kOpcodeAuthorize, 0, 0, 0, beamtime_id.c_str()};
+Error RequestHandlerTcp::Authorize(const std::string& source_credentials) {
+    GenericRequestHeader header{kOpcodeAuthorize, 0, 0, 0, source_credentials.c_str()};
     Error err;
     io__->Send(sd_, &header, sizeof(header), &err);
     if(err) {
@@ -23,7 +23,7 @@ Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) {
 }
 
 
-Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address) {
+Error RequestHandlerTcp::ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address) {
     Error err;
 
     sd_ = io__->CreateAndConnectIPTCPSocket(receiver_address, &err);
@@ -34,7 +34,7 @@ Error RequestHandlerTcp::ConnectToReceiver(const std::string& beamtime_id, const
     log__->Debug("connected to receiver at " + receiver_address);
 
     connected_receiver_uri_ = receiver_address;
-    err = Authorize(beamtime_id);
+    err = Authorize(source_credentials);
     if (err != nullptr) {
         log__->Error("authorization failed at " + receiver_address + " - " + err->Explain());
         Disconnect();
@@ -206,31 +206,44 @@ bool RequestHandlerTcp::ProcessErrorFromReceiver(const Error& error,
 }
 
 
-bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request) {
+void RequestHandlerTcp::ProcessRequestCallback(Error err, ProducerRequest* request,bool* retry) {
+    if (request->callback) {
+        request->callback(request->header, std::move(err));
+    }
+    *retry = false;
+}
+
+
+bool RequestHandlerTcp::SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry) {
     for (auto receiver_uri : receivers_list_) {
         if (Disconnected()) {
             auto err = ConnectToReceiver(request->source_credentials, receiver_uri);
-            if (err != nullptr ) continue;
+            if (err == ProducerErrorTemplates::kWrongInput) {
+                ProcessRequestCallback(std::move(err),request,retry);
+                return false;
+            } else {
+                if (err != nullptr ) continue;
+            }
         }
 
         auto err = TrySendToReceiver(request);
-        auto retry = ProcessErrorFromReceiver(err, request, receiver_uri);
-        if (retry)  {
+        bool server_error_can_retry = ProcessErrorFromReceiver(err, request, receiver_uri);
+        if (server_error_can_retry)  {
             continue;
         }
 
-        if (request->callback) {
-            request->callback(request->header, std::move(err));
-        }
-        return true;
+        bool success = err && err != ProducerErrorTemplates::kServerWarning ? false : true;
+        ProcessRequestCallback(std::move(err),request,retry);
+        return success;
     }
     log__->Warning("put back to the queue, request opcode: " + std::to_string(request->header.op_code) +
                    ", id: " + std::to_string(request->header.data_id));
+    *retry = true;
     return false;
 }
 
 
-bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) {
+bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request, bool* retry) {
     auto producer_request = static_cast<ProducerRequest*>(request);
 
     auto err = producer_request->UpdateDataSizeFromFileIfNeeded(io__.get());
@@ -238,14 +251,15 @@ bool RequestHandlerTcp::ProcessRequestUnlocked(GenericRequest* request) {
         if (producer_request->callback) {
             producer_request->callback(producer_request->header, std::move(err));
         }
-        return true;
+        *retry = false;
+        return false;
     }
 
 
     if (NeedRebalance()) {
         CloseConnectionToPeformRebalance();
     }
-    return SendDataToOneOfTheReceivers(producer_request);
+    return SendDataToOneOfTheReceivers(producer_request, retry);
 }
 
 bool RequestHandlerTcp::Connected() {
@@ -264,8 +278,8 @@ void RequestHandlerTcp::PrepareProcessingRequestLocked() {
     UpdateIfNewConnection();
 }
 
-void RequestHandlerTcp::TearDownProcessingRequestLocked(bool processing_succeeded) {
-    if (!processing_succeeded) {
+void RequestHandlerTcp::TearDownProcessingRequestLocked(bool request_processed_successfully) {
+    if (!request_processed_successfully) {
         (*ncurrent_connections_)--;
     }
 }
diff --git a/producer/api/cpp/src/request_handler_tcp.h b/producer/api/cpp/src/request_handler_tcp.h
index 0cba33e5d..fa2d39b71 100644
--- a/producer/api/cpp/src/request_handler_tcp.h
+++ b/producer/api/cpp/src/request_handler_tcp.h
@@ -19,10 +19,10 @@ namespace asapo {
 class RequestHandlerTcp: public RequestHandler {
   public:
     explicit RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id, uint64_t* shared_counter);
-    bool ProcessRequestUnlocked(GenericRequest* request) override;
+    bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override;
     bool ReadyProcessRequest() override;
     void PrepareProcessingRequestLocked()  override;
-    void TearDownProcessingRequestLocked(bool processing_succeeded)  override;
+    void TearDownProcessingRequestLocked(bool request_processed_successfully)  override;
     void ProcessRequestTimeout(GenericRequest* request)  override;
 
     virtual ~RequestHandlerTcp() = default;
@@ -30,9 +30,9 @@ class RequestHandlerTcp: public RequestHandler {
     const AbstractLogger* log__;
     ReceiverDiscoveryService* discovery_service__;
   private:
-    Error Authorize(const std::string& beamtime_id);
-    Error ConnectToReceiver(const std::string& beamtime_id, const std::string& receiver_address);
-    bool SendDataToOneOfTheReceivers(ProducerRequest* request);
+    Error Authorize(const std::string& source_credentials);
+    Error ConnectToReceiver(const std::string& source_credentials, const std::string& receiver_address);
+    bool SendDataToOneOfTheReceivers(ProducerRequest* request, bool* retry);
     Error SendRequestContent(const ProducerRequest* request);
     Error ReceiveResponse(const GenericRequestHeader& request_header);
     Error TrySendToReceiver(const ProducerRequest* request);
@@ -50,7 +50,7 @@ class RequestHandlerTcp: public RequestHandler {
     bool ProcessErrorFromReceiver(const Error& error, const ProducerRequest* request, const std::string& receiver_uri);
     ReceiversList receivers_list_;
     system_clock::time_point last_receivers_uri_update_;
-
+    void ProcessRequestCallback(Error err, ProducerRequest* request, bool* retry);
     uint64_t thread_id_;
     uint64_t* ncurrent_connections_;
     std::string connected_receiver_uri_;
diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
index 4e443b971..da7be9fba 100644
--- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp
@@ -50,7 +50,7 @@ class RequestHandlerFilesystemTests : public testing::Test {
     std::string  expected_destination = "destination";
     std::string expected_fullpath = expected_destination + "/" + expected_file_name;
     std::string expected_origin_fullpath = std::string("origin/") + expected_file_name;
-
+    bool retry;
     asapo::Opcode expected_op_code = asapo::kOpcodeTransferData;
     asapo::Error callback_err;
     asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size,
@@ -104,7 +104,7 @@ TEST_F(RequestHandlerFilesystemTests, CallBackErrorIfCannotSaveFile) {
     );
 
 
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(callback_err, Eq(asapo::IOErrorTemplates::kUnknownIOError));
     ASSERT_THAT(called, Eq(true));
@@ -119,7 +119,7 @@ TEST_F(RequestHandlerFilesystemTests, WorksWithemptyCallback) {
     );
 
 
-    auto success = request_handler.ProcessRequestUnlocked(&request_nocallback);
+    auto success = request_handler.ProcessRequestUnlocked(&request_nocallback, &retry);
 
     ASSERT_THAT(called, Eq(false));
     ASSERT_THAT(success, Eq(true));
@@ -135,7 +135,7 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestErrorOnReadData) {
             Return(nullptr)
         ));
 
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
     ASSERT_THAT(success, Eq(false));
 }
 
@@ -154,7 +154,7 @@ TEST_F(RequestHandlerFilesystemTests, FileRequestOK) {
         Return(nullptr)
     );
 
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
     ASSERT_THAT(success, Eq(true));
 }
 
@@ -169,7 +169,7 @@ TEST_F(RequestHandlerFilesystemTests, TransferOK) {
     );
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
     ASSERT_THAT(callback_err, Eq(nullptr));
diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
index ab5cebbc6..3d898b6a1 100644
--- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
@@ -101,6 +101,7 @@ class RequestHandlerTcpTests : public testing::Test {
 
     std::vector<asapo::SocketDescriptor> expected_sds{83942, 83943};
 
+    bool retry;
     Sequence seq_receive;
     void ExpectFailConnect(bool only_once = false);
     void ExpectFailAuthorize(bool only_once = false);
@@ -339,7 +340,6 @@ void RequestHandlerTcpTests::ExpectFailSend(uint64_t expected_size, bool only_on
     EXPECT_CALL(mock_logger, Warning(HasSubstr("put back")));
 }
 
-
 void RequestHandlerTcpTests::ExpectFailSendData(bool only_once) {
     ExpectFailSend(expected_file_size, only_once);
 }
@@ -495,7 +495,7 @@ void RequestHandlerTcpTests::DoSingleSend(bool connect, bool success) {
     }
 
     request_handler.PrepareProcessingRequestLocked();
-    request_handler.ProcessRequestUnlocked(&request);
+    request_handler.ProcessRequestUnlocked(&request, &retry);
 
     Mock::VerifyAndClearExpectations(&mock_io);
     Mock::VerifyAndClearExpectations(&mock_logger);
@@ -531,11 +531,9 @@ TEST_F(RequestHandlerTcpTests, DoesNotGetsUriIfAlreadyConnected) {
 
 TEST_F(RequestHandlerTcpTests, ReduceConnectionNumberAtTearDownIfError) {
     n_connections = 1;
-
     request_handler.TearDownProcessingRequestLocked(false);
 
     ASSERT_THAT(n_connections, Eq(0));
-
 }
 
 TEST_F(RequestHandlerTcpTests, DoNotReduceConnectionNumberAtTearDownIfNoError) {
@@ -551,21 +549,27 @@ TEST_F(RequestHandlerTcpTests, TriesConnectWhenNotConnected) {
     ExpectFailConnect();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
 }
 
 TEST_F(RequestHandlerTcpTests, FailsWhenCannotAuthorize) {
-    ExpectOKConnect();
-    ExpectFailAuthorize();
+    ExpectOKConnect(true);
+    ExpectFailAuthorize(true);
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
     request_handler.TearDownProcessingRequestLocked(success);
 
-    ASSERT_THAT(success, Eq(false));
     ASSERT_THAT(n_connections, Eq(0));
+    ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kWrongInput));
+    ASSERT_THAT(callback_called, Eq(true));
+    ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(false));
+
+
 }
 
 
@@ -581,11 +585,12 @@ TEST_F(RequestHandlerTcpTests, DoesNotTryConnectWhenConnected) {
 
     ExpectFailSendHeader(true);
 
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
-}
+    ASSERT_THAT(retry, Eq(true));
 
+}
 
 
 TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) {
@@ -595,9 +600,11 @@ TEST_F(RequestHandlerTcpTests, DoNotCloseWhenNotConnected) {
     ExpectFailSendHeader();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 
@@ -610,9 +617,11 @@ TEST_F(RequestHandlerTcpTests, CloseConnectionWhenRebalance) {
 
     EXPECT_CALL(mock_io, CloseSocket_t(_, _));
 
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 
@@ -623,9 +632,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendHeader) {
     ExpectFailSendHeader();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 
@@ -637,9 +648,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendData) {
     ExpectFailSendData();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) {
@@ -649,9 +662,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendMetaData) {
     ExpectFailSendMetaData();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) {
@@ -664,9 +679,11 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotReceiveData) {
     ExpectFailReceive(true);
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode error_code,
@@ -696,10 +713,12 @@ void RequestHandlerTcpTests::AssertImmediatelyCallBack(asapo::NetworkErrorCode e
                                   ));
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
     ASSERT_THAT(callback_err, Eq(err_template));
     ASSERT_THAT(callback_called, Eq(true));
-    ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(success, Eq(false));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 void RequestHandlerTcpTests::ExpectGetFileSize(bool ok) {
@@ -732,10 +751,11 @@ TEST_F(RequestHandlerTcpTests, SendEmptyCallBack) {
     ExpectOKReceive();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request_nocallback);
+    auto success = request_handler.ProcessRequestUnlocked(&request_nocallback, &retry);
 
     ASSERT_THAT(success, Eq(true));
     ASSERT_THAT(callback_called, Eq(false));
+    ASSERT_THAT(retry, Eq(false));
 }
 
 TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) {
@@ -747,11 +767,13 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithReadError) {
     ExpectFailSendFile(asapo::ProducerErrorTemplates::kLocalIOError, true);
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
 
-    ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(success, Eq(false));
     ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kLocalIOError));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) {
@@ -763,10 +785,12 @@ TEST_F(RequestHandlerTcpTests, ErrorWhenCannotSendFileWithServerError) {
     ExpectFailSendFile(asapo::ProducerErrorTemplates::kInternalServerError);
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
 
     ASSERT_THAT(success, Eq(false));
     ASSERT_THAT(callback_called, Eq(false));
+    ASSERT_THAT(retry, Eq(true));
+
 }
 
 TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) {
@@ -774,10 +798,12 @@ TEST_F(RequestHandlerTcpTests, FileRequestErrorGettingFileSize) {
 
     request_handler.PrepareProcessingRequestLocked();
 
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
-    ASSERT_THAT(success, Eq(true));
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
+    ASSERT_THAT(success, Eq(false));
     ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kLocalIOError));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 
@@ -793,10 +819,12 @@ TEST_F(RequestHandlerTcpTests, FileRequestOK) {
 
     request_handler.PrepareProcessingRequestLocked();
 
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
     ASSERT_THAT(success, Eq(true));
     ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(callback_err, Eq(nullptr));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 
@@ -808,9 +836,10 @@ TEST_F(RequestHandlerTcpTests, SendOK) {
     ExpectOKReceive();
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(retry, Eq(false));
     ASSERT_THAT(callback_err, Eq(nullptr));
     ASSERT_THAT(callback_called, Eq(true));
     ASSERT_THAT(callback_header.data_size, Eq(header.data_size));
@@ -832,9 +861,11 @@ TEST_F(RequestHandlerTcpTests, SendMetadataIgnoresIngestMode) {
     request.header.op_code = asapo::kOpcodeTransferMetaData;
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 
@@ -849,9 +880,10 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) {
 
     request.header.custom_data[asapo::kPosIngestMode] = ingest_mode;
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(retry, Eq(false));
     ASSERT_THAT(callback_header.custom_data[asapo::kPosIngestMode], Eq(ingest_mode));
 }
 
@@ -870,8 +902,10 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) {
     auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly;
 
     request_filesend.header.custom_data[asapo::kPosIngestMode] = ingest_mode;
-    auto success = request_handler.ProcessRequestUnlocked(&request_filesend);
+    auto success = request_handler.ProcessRequestUnlocked(&request_filesend, &retry);
     ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(retry, Eq(false));
+
 }
 
 
@@ -902,9 +936,10 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) {
 
 
     request_handler.PrepareProcessingRequestLocked();
-    auto success = request_handler.ProcessRequestUnlocked(&request);
+    auto success = request_handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
+    ASSERT_THAT(retry, Eq(false));
     ASSERT_THAT(callback_err, Eq(asapo::ProducerErrorTemplates::kServerWarning));
     ASSERT_THAT(callback_err->Explain(), HasSubstr(expected_warning));
     ASSERT_THAT(callback_called, Eq(true));
@@ -914,6 +949,4 @@ TEST_F(RequestHandlerTcpTests, SendWithWarning) {
     ASSERT_THAT(std::string{callback_header.message}, Eq(std::string{header.message}));
 }
 
-
-
 }
diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp
index f3d1672a8..03d8ae764 100644
--- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp
+++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.cpp
@@ -46,7 +46,8 @@ void* ReceiverDataServerRequestHandler::GetSlot(const ReceiverDataServerRequest*
 }
 
 
-bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request) {
+bool ReceiverDataServerRequestHandler::ProcessRequestUnlocked(GenericRequest* request, bool* retry) {
+    *retry = false;
     auto receiver_request = dynamic_cast<ReceiverDataServerRequest*>(request);
     if (!CheckRequest(receiver_request)) {
         SendResponce(receiver_request, kNetErrorWrongRequest);
diff --git a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h
index 0ca1a46b5..4bae2ee45 100644
--- a/receiver/src/receiver_data_server/receiver_data_server_request_handler.h
+++ b/receiver/src/receiver_data_server/receiver_data_server_request_handler.h
@@ -13,7 +13,7 @@ namespace asapo {
 class ReceiverDataServerRequestHandler: public RequestHandler {
   public:
     explicit ReceiverDataServerRequestHandler(const NetServer* server, DataCache* data_cache, Statistics* statistics);
-    bool ProcessRequestUnlocked(GenericRequest* request) override;
+    bool ProcessRequestUnlocked(GenericRequest* request, bool* retry) override;
     bool ReadyProcessRequest() override;
     void PrepareProcessingRequestLocked()  override;
     void TearDownProcessingRequestLocked(bool processing_succeeded)  override;
diff --git a/receiver/src/request_handler_authorize.cpp b/receiver/src/request_handler_authorize.cpp
index 0aed9549d..d11573d2b 100644
--- a/receiver/src/request_handler_authorize.cpp
+++ b/receiver/src/request_handler_authorize.cpp
@@ -15,10 +15,13 @@ std::string RequestHandlerAuthorize::GetRequestString(const Request* request, co
     return request_string;
 }
 
-Error RequestHandlerAuthorize::ErrorFromServerResponse(const Error& err, HttpCode code) const {
+Error RequestHandlerAuthorize::ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const {
     if (err) {
         return asapo::ReceiverErrorTemplates::kInternalServerError.Generate("cannot authorize request: " + err->Explain());
     } else {
+        if (code != HttpCode::Unauthorized) {
+            return asapo::ReceiverErrorTemplates::kInternalServerError.Generate("return code " + std::to_string(int(code)));
+        }
         return asapo::ReceiverErrorTemplates::kAuthorizationFailure.Generate("return code " + std::to_string(int(code)));
     }
 }
@@ -31,7 +34,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr
     auto response = http_client__->Post(GetReceiverConfig()->authorization_server + "/authorize", request_string, &code,
                                         &err);
     if (err || code != HttpCode::OK) {
-        auto auth_error = ErrorFromServerResponse(err, code);
+        auto auth_error = ErrorFromAuthorizationServerResponse(err, code);
         log__->Error("failure authorizing at " + GetReceiverConfig()->authorization_server + " request: " + request_string +
                      " - " +
                      auth_error->Explain());
@@ -45,7 +48,7 @@ Error RequestHandlerAuthorize::Authorize(Request* request, const char* source_cr
     (err = parser.GetString("Year", &beamtime_year_)) ||
     (err = parser.GetString("Beamline", &beamline_));
     if (err) {
-        return ErrorFromServerResponse(err, code);
+        return ErrorFromAuthorizationServerResponse(err, code);
     } else {
         log__->Debug(std::string("authorized connection from ") + request->GetOriginUri() + " beamline: " +
                      beamline_ + ", beamtime id: " + beamtime_id_ + ", stream: " + stream_);
diff --git a/receiver/src/request_handler_authorize.h b/receiver/src/request_handler_authorize.h
index 95c788026..7f46d3eba 100644
--- a/receiver/src/request_handler_authorize.h
+++ b/receiver/src/request_handler_authorize.h
@@ -30,7 +30,7 @@ class RequestHandlerAuthorize final: public ReceiverRequestHandler {
     Error ProcessAuthorizationRequest(Request* request) const;
     Error ProcessOtherRequest(Request* request) const;
     Error Authorize(Request* request, const char* source_credentials) const;
-    Error ErrorFromServerResponse(const Error& err, HttpCode code) const;
+    Error ErrorFromAuthorizationServerResponse(const Error& err, HttpCode code) const;
 
     std::string GetRequestString(const Request* request, const char* source_credentials) const;
 };
diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp
index 6b56beb2b..d67c63cfa 100644
--- a/receiver/unittests/receiver_data_server/test_request_handler.cpp
+++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp
@@ -57,6 +57,7 @@ class RequestHandlerTests : public Test {
     uint64_t expected_meta_size = 100;
     uint64_t expected_buf_id = 12345;
     uint64_t expected_source_id = 11;
+    bool retry;
     asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size,
                                        expected_meta_size, ""};
     asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id};
@@ -97,7 +98,7 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) {
 
     EXPECT_CALL(mock_logger, Error(HasSubstr("wrong request")));
 
-    auto success = handler.ProcessRequestUnlocked(&request);
+    auto success = handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
 }
@@ -105,7 +106,7 @@ TEST_F(RequestHandlerTests, ProcessRequest_WronOpCode) {
 TEST_F(RequestHandlerTests, ProcessRequestReturnsNoDataWhenCacheNotUsed) {
     MockSendResponce(asapo::kNetErrorNoData, true);
 
-    auto success  = handler_no_cache.ProcessRequestUnlocked(&request);
+    auto success  = handler_no_cache.ProcessRequestUnlocked(&request, &retry);
     EXPECT_CALL(mock_logger, Debug(_)).Times(0);
 
     ASSERT_THAT(success, Eq(true));
@@ -116,7 +117,7 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotReturnsNull) {
     MockSendResponce(asapo::kNetErrorNoData, true);
     EXPECT_CALL(mock_logger, Debug(HasSubstr("not found")));
 
-    auto success = handler.ProcessRequestUnlocked(&request);
+    auto success = handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
 }
@@ -128,7 +129,7 @@ TEST_F(RequestHandlerTests, ProcessRequestReadSlotErrorSendingResponce) {
     EXPECT_CALL(mock_net, SendData_t(expected_source_id, &tmp, expected_data_size)).Times(0);
     EXPECT_CALL(mock_cache, UnlockSlot(_));
 
-    auto success  = handler.ProcessRequestUnlocked(&request);
+    auto success  = handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
 }
@@ -145,7 +146,7 @@ TEST_F(RequestHandlerTests, ProcessRequestOk) {
     EXPECT_CALL(mock_logger, Debug(HasSubstr("sending")));
     EXPECT_CALL(mock_stat, IncreaseRequestCounter_t());
     EXPECT_CALL(mock_stat, IncreaseRequestDataVolume_t(expected_data_size));
-    auto success  = handler.ProcessRequestUnlocked(&request);
+    auto success  = handler.ProcessRequestUnlocked(&request, &retry);
 
     ASSERT_THAT(success, Eq(true));
 }
-- 
GitLab