From 20ed827a951c2bc80bd3ba7388e2d76385b6176f Mon Sep 17 00:00:00 2001
From: Sergey Yakubov <sergey.yakubov@desy.de>
Date: Wed, 6 Nov 2019 18:06:41 +0100
Subject: [PATCH] use read/send instead of sendfile

---
 common/cpp/src/system_io/system_io.cpp        |  4 +-
 common/cpp/src/system_io/system_io.h          |  2 +
 .../cpp/src/system_io/system_io_linux_mac.cpp | 37 ++++++----
 .../cpp/src/system_io/system_io_windows.cpp   | 72 ++++++++++---------
 producer/api/cpp/src/producer_request.cpp     |  2 +-
 .../cpp/src/request_handler_filesystem.cpp    |  3 +-
 producer/api/cpp/src/request_handler_tcp.cpp  |  2 -
 .../unittests/test_request_handler_tcp.cpp    |  4 +-
 8 files changed, 70 insertions(+), 56 deletions(-)

diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp
index 8a71217e9..9b72ff7a3 100644
--- a/common/cpp/src/system_io/system_io.cpp
+++ b/common/cpp/src/system_io/system_io.cpp
@@ -27,6 +27,8 @@ namespace asapo {
 const int SystemIO::kNetBufferSize = 1024 * 1024;
 const int SystemIO::kWaitTimeoutMs = 1000;
 const size_t SystemIO::kMaxTransferChunkSize = size_t(1024) * size_t(1024) * size_t(1024) * size_t(2); //2GiByte
+const size_t SystemIO::kReadBufSize = size_t(1024) * 1024 * 50; //50MiByte
+
 
 
 /*******************************************************************************
@@ -70,7 +72,7 @@ std::unique_ptr<std::tuple<std::string, uint16_t>> SystemIO::SplitAddressToHostn
     }
 }
 
-uint8_t* AllocateArray(uint64_t fsize, Error* err) {
+uint8_t* SystemIO::AllocateArray(uint64_t fsize, Error* err) const {
     uint8_t* data_array = nullptr;
     try {
         data_array = new uint8_t[(size_t)fsize];
diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h
index c2ef4d362..0c8813a2f 100644
--- a/common/cpp/src/system_io/system_io.h
+++ b/common/cpp/src/system_io/system_io.h
@@ -26,6 +26,7 @@ class SystemIO final : public IO {
   private:
     static const int kNetBufferSize;//TODO: need to set by config
     static const size_t kMaxTransferChunkSize;
+    static const size_t kReadBufSize;
 
     static const int kWaitTimeoutMs;
 
@@ -77,6 +78,7 @@ class SystemIO final : public IO {
     Error CreateEpoolIfNeeded(SocketDescriptor master_socket) const;
     Error ProcessNewConnection(SocketDescriptor master_socket, std::vector<std::string>* new_connections,
                                ListSocketDescriptors* sockets_to_listen) const;
+    uint8_t* AllocateArray(uint64_t fsize, Error* err) const;
 
 #endif
   public:
diff --git a/common/cpp/src/system_io/system_io_linux_mac.cpp b/common/cpp/src/system_io/system_io_linux_mac.cpp
index f124dcc91..490fc0d94 100644
--- a/common/cpp/src/system_io/system_io_linux_mac.cpp
+++ b/common/cpp/src/system_io/system_io_linux_mac.cpp
@@ -295,27 +295,36 @@ std::string SystemIO::AddressFromSocket(SocketDescriptor socket) const noexcept
 
 Error SystemIO::SendFile(SocketDescriptor socket_fd, const std::string& fname, size_t length) const {
 
-    ssize_t bytes_sent, total_bytes_sent = 0;
-    off_t offset = 0;
+    size_t total_bytes_sent = 0;
 
-    int in_fd = _open(fname.c_str(), O_RDONLY);
-    if (in_fd < 0) {
-        return GetLastError();
+    size_t buf_size = std::min(length, kReadBufSize);
+
+    Error err;
+    auto fd = Open(fname, IO_OPEN_MODE_READ, &err);
+    if (err != nullptr) {
+        return err;
+    }
+
+    auto data_array = std::unique_ptr<uint8_t> {AllocateArray(buf_size, &err)};
+    if (err != nullptr) {
+        return err;
     }
 
     while (total_bytes_sent < length) {
-        if ((bytes_sent = sendfile(socket_fd, in_fd, &offset,
-                                   std::min(kMaxTransferChunkSize, length - total_bytes_sent))) <= 0) {
-            if (errno == EINTR || errno == EAGAIN) {
-                // Interrupted system call/try again, just skip to the top of the loop and try again
-                continue;
-            }
-            close(in_fd);
-            return GetLastError();
+        auto bytes_read = Read(fd, data_array.get(), buf_size, &err);
+        if (err != nullptr && err != ErrorTemplates::kEndOfFile) {
+            Close(fd, nullptr);
+            return err;
+        }
+        auto bytes_sent = Send(socket_fd, data_array.get(), bytes_read, &err);
+        if (err != nullptr) {
+            Close(fd, nullptr);
+            return err;
         }
         total_bytes_sent += bytes_sent;
     }
-    close(in_fd);
+
+    Close(fd, nullptr);
     return nullptr;
 }
 
diff --git a/common/cpp/src/system_io/system_io_windows.cpp b/common/cpp/src/system_io/system_io_windows.cpp
index e8bc87528..5c059643d 100644
--- a/common/cpp/src/system_io/system_io_windows.cpp
+++ b/common/cpp/src/system_io/system_io_windows.cpp
@@ -8,6 +8,8 @@
 #include <fcntl.h>
 #include <iostream>
 #include <direct.h>
+#include <mswsock.h>
+
 
 using std::string;
 using std::vector;
@@ -39,34 +41,34 @@ IOInstance::~IOInstance() {
 Error IOErrorFromGetLastError() {
     DWORD last_error = GetLastError();
     switch (last_error) {
-        case ERROR_SUCCESS:
-            return nullptr;
-        case ERROR_PATH_NOT_FOUND:
-        case ERROR_FILE_NOT_FOUND:
-            return IOErrorTemplates::kFileNotFound.Generate();
-        case ERROR_ACCESS_DENIED:
-            return IOErrorTemplates::kPermissionDenied.Generate();
-        case ERROR_CONNECTION_REFUSED:
-            return IOErrorTemplates::kConnectionRefused.Generate();
-        case WSAEFAULT:
-            return IOErrorTemplates::kInvalidMemoryAddress.Generate();
-        case WSAECONNRESET:
-            return IOErrorTemplates::kConnectionResetByPeer.Generate();
-        case WSAENOTSOCK:
-            return IOErrorTemplates::kSocketOperationOnNonSocket.Generate();
-        case WSAEWOULDBLOCK:
-            return IOErrorTemplates::kResourceTemporarilyUnavailable.Generate();
-        case WSAEADDRNOTAVAIL:
-            return IOErrorTemplates::kAddressNotValid.Generate();
-        case WSAECONNREFUSED:
-            return IOErrorTemplates::kConnectionRefused.Generate();
-        case ERROR_FILE_EXISTS:
-            return IOErrorTemplates::kFileAlreadyExists.Generate();
-        default:
-            std::cout << "[IOErrorFromGetLastError] Unknown error code: " << last_error << std::endl;
-            Error err = IOErrorTemplates::kUnknownIOError.Generate();
-            (*err).Append("Unknown error code: " + std::to_string(last_error));
-            return err;
+    case ERROR_SUCCESS:
+        return nullptr;
+    case ERROR_PATH_NOT_FOUND:
+    case ERROR_FILE_NOT_FOUND:
+        return IOErrorTemplates::kFileNotFound.Generate();
+    case ERROR_ACCESS_DENIED:
+        return IOErrorTemplates::kPermissionDenied.Generate();
+    case ERROR_CONNECTION_REFUSED:
+        return IOErrorTemplates::kConnectionRefused.Generate();
+    case WSAEFAULT:
+        return IOErrorTemplates::kInvalidMemoryAddress.Generate();
+    case WSAECONNRESET:
+        return IOErrorTemplates::kConnectionResetByPeer.Generate();
+    case WSAENOTSOCK:
+        return IOErrorTemplates::kSocketOperationOnNonSocket.Generate();
+    case WSAEWOULDBLOCK:
+        return IOErrorTemplates::kResourceTemporarilyUnavailable.Generate();
+    case WSAEADDRNOTAVAIL:
+        return IOErrorTemplates::kAddressNotValid.Generate();
+    case WSAECONNREFUSED:
+        return IOErrorTemplates::kConnectionRefused.Generate();
+    case ERROR_FILE_EXISTS:
+        return IOErrorTemplates::kFileAlreadyExists.Generate();
+    default:
+        std::cout << "[IOErrorFromGetLastError] Unknown error code: " << last_error << std::endl;
+        Error err = IOErrorTemplates::kUnknownIOError.Generate();
+        (*err).Append("Unknown error code: " + std::to_string(last_error));
+        return err;
     }
 }
 
@@ -110,10 +112,10 @@ std::chrono::system_clock::time_point FileTime2TimePoint(const FILETIME& ft, Err
     auto nsec = GetLinuxNanosecFromWindowsEpoch(ull);
 
     std::chrono::nanoseconds d = std::chrono::nanoseconds{nsec} +
-        std::chrono::seconds{sec};
+                                 std::chrono::seconds{sec};
 
     auto tp = system_clock::time_point
-        {std::chrono::duration_cast<std::chrono::system_clock::duration>(d)};
+    {std::chrono::duration_cast<std::chrono::system_clock::duration>(d)};
 
     *err = nullptr;
     return tp;
@@ -121,8 +123,8 @@ std::chrono::system_clock::time_point FileTime2TimePoint(const FILETIME& ft, Err
 
 bool IsDirectory(const WIN32_FIND_DATA f) {
     return (f.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) &&
-        strstr(f.cFileName, "..") == nullptr &&
-        strstr(f.cFileName, ".") == nullptr;
+           strstr(f.cFileName, "..") == nullptr &&
+           strstr(f.cFileName, ".") == nullptr;
 }
 
 FileInfo GetFileInfo_win(const WIN32_FIND_DATA& f, const string& name, Error* err) {
@@ -313,9 +315,9 @@ std::string SystemIO::AddressFromSocket(SocketDescriptor socket) const noexcept
 }
 
 ListSocketDescriptors SystemIO::WaitSocketsActivity(SocketDescriptor master_socket,
-                                                    ListSocketDescriptors* sockets_to_listen,
-                                                    std::vector<std::string>* new_connections,
-                                                    Error* err) const {
+        ListSocketDescriptors* sockets_to_listen,
+        std::vector<std::string>* new_connections,
+        Error* err) const {
     fd_set readfds;
     ListSocketDescriptors active_sockets;
     bool client_activity = false;
diff --git a/producer/api/cpp/src/producer_request.cpp b/producer/api/cpp/src/producer_request.cpp
index 521ff7e81..53f589e0f 100644
--- a/producer/api/cpp/src/producer_request.cpp
+++ b/producer/api/cpp/src/producer_request.cpp
@@ -43,7 +43,7 @@ ProducerRequest::~ProducerRequest() {
     }
 }
 Error ProducerRequest::UpdateDataSizeFromFileIfNeeded(const IO* io) {
-    if (data != nullptr || original_filepath.empty() || header.data_size > 0) {
+    if (!DataFromFile() || header.data_size > 0) {
         return nullptr;
     }
 
diff --git a/producer/api/cpp/src/request_handler_filesystem.cpp b/producer/api/cpp/src/request_handler_filesystem.cpp
index 580e5c380..7c1ff56d4 100644
--- a/producer/api/cpp/src/request_handler_filesystem.cpp
+++ b/producer/api/cpp/src/request_handler_filesystem.cpp
@@ -20,7 +20,8 @@ bool RequestHandlerFilesystem::ProcessRequestUnlocked(GenericRequest* request) {
     auto producer_request = static_cast<ProducerRequest*>(request);
     Error err;
     if (producer_request->DataFromFile()) {
-        auto data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size, &err);
+        producer_request->data = io__->GetDataFromFile(producer_request->original_filepath, &producer_request->header.data_size,
+                                                       &err);
         if (err) {
             return false;
         }
diff --git a/producer/api/cpp/src/request_handler_tcp.cpp b/producer/api/cpp/src/request_handler_tcp.cpp
index f408b7539..3fc32f4fb 100644
--- a/producer/api/cpp/src/request_handler_tcp.cpp
+++ b/producer/api/cpp/src/request_handler_tcp.cpp
@@ -6,12 +6,10 @@
 
 namespace asapo {
 
-
 RequestHandlerTcp::RequestHandlerTcp(ReceiverDiscoveryService* discovery_service, uint64_t thread_id,
                                      uint64_t* shared_counter):
     io__{GenerateDefaultIO()}, log__{GetDefaultProducerLogger()}, discovery_service__{discovery_service}, thread_id_{thread_id},
     ncurrent_connections_{shared_counter} {
-
 }
 
 Error RequestHandlerTcp::Authorize(const std::string& beamtime_id) {
diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
index a5a962fb0..36e3d0cd1 100644
--- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp
+++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp
@@ -844,7 +844,7 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyOK) {
 }
 
 TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) {
-    ExpectGetFileSize(true);
+    expected_file_size = 0;
     ExpectOKConnect(true);
     ExpectOKAuthorize(true);
     ExpectOKSendHeader(true);
@@ -854,7 +854,7 @@ TEST_F(RequestHandlerTcpTests, SendMetaOnlyForFileReadOK) {
     request_handler.PrepareProcessingRequestLocked();
 
     EXPECT_CALL(mock_io, SendFile_t(_, _, _)).Times(0);
-
+    EXPECT_CALL(mock_io, GetFileInfo_t(_, _)).Times(0);
     auto ingest_mode = asapo::IngestModeFlags::kTransferMetaDataOnly;
 
     request_filesend.header.custom_data[asapo::kPosIngestMode] = ingest_mode;
-- 
GitLab