From 8272d801cafa1baae2a22f8767820b163ded961d Mon Sep 17 00:00:00 2001 From: Carsten Patzke <carsten.patzke@desy.de> Date: Wed, 18 Mar 2020 15:52:19 +0100 Subject: [PATCH] Introduced named threads for better debugging --- .../include/rapidjson/internal/diyfp.h | 18 ++++----- 3d_party/rapidjson/include/rapidjson/reader.h | 6 +-- .../include/spdlog/fmt/bundled/format.h | 20 +++++----- common/cpp/include/http_client/http_error.h | 4 +- common/cpp/include/io/io.h | 12 ++++-- common/cpp/include/unittests/MockIO.h | 8 +++- .../cpp/src/http_client/curl_http_client.cpp | 6 +-- common/cpp/src/system_io/system_io.cpp | 13 ++++++- common/cpp/src/system_io/system_io.h | 12 +++++- .../cpp/src/system_io/system_io_linux_mac.cpp | 5 +++ .../cpp/src/system_io/system_io_windows.cpp | 4 ++ .../api/cpp/include/consumer/consumer_error.h | 14 +++---- consumer/api/cpp/src/server_data_broker.cpp | 39 ++++++++++--------- consumer/api/cpp/src/server_data_broker.h | 4 +- .../api/cpp/unittests/test_server_broker.cpp | 3 +- examples/pipeline/in_to_out/in_to_out.cpp | 2 +- producer/api/cpp/src/producer_impl.h | 4 +- .../api/cpp/unittests/test_producer_impl.cpp | 2 +- .../test_request_handler_filesystem.cpp | 10 ++--- .../unittests/test_request_handler_tcp.cpp | 22 +++++------ .../src/system_folder_watch_windows.cpp | 2 +- receiver/src/receiver.cpp | 6 +-- receiver/src/receiver_error.h | 12 +++--- receiver/src/request_handler_receive_data.cpp | 2 +- .../src/request_handler_receive_metadata.cpp | 2 +- .../test_request_handler.cpp | 2 +- .../unittests/test_requests_dispatcher.cpp | 2 +- .../client_serv/ip_tcp_network.cpp | 2 +- .../client_serv_multicon/multicon.cpp | 8 ++-- 29 files changed, 142 insertions(+), 104 deletions(-) diff --git a/3d_party/rapidjson/include/rapidjson/internal/diyfp.h b/3d_party/rapidjson/include/rapidjson/internal/diyfp.h index 743a7fb85..d47dd284a 100644 --- a/3d_party/rapidjson/include/rapidjson/internal/diyfp.h +++ b/3d_party/rapidjson/include/rapidjson/internal/diyfp.h @@ -210,15 +210,15 @@ inline DiyFp GetCachedPowerByIndex(size_t index) { }; static const int16_t kCachedPowers_E[] = { -1220, -1193, -1166, -1140, -1113, -1087, -1060, -1034, -1007, -980, - -954, -927, -901, -874, -847, -821, -794, -768, -741, -715, - -688, -661, -635, -608, -582, -555, -529, -502, -475, -449, - -422, -396, -369, -343, -316, -289, -263, -236, -210, -183, - -157, -130, -103, -77, -50, -24, 3, 30, 56, 83, - 109, 136, 162, 189, 216, 242, 269, 295, 322, 348, - 375, 402, 428, 455, 481, 508, 534, 561, 588, 614, - 641, 667, 694, 720, 747, 774, 800, 827, 853, 880, - 907, 933, 960, 986, 1013, 1039, 1066 - }; + -954, -927, -901, -874, -847, -821, -794, -768, -741, -715, + -688, -661, -635, -608, -582, -555, -529, -502, -475, -449, + -422, -396, -369, -343, -316, -289, -263, -236, -210, -183, + -157, -130, -103, -77, -50, -24, 3, 30, 56, 83, + 109, 136, 162, 189, 216, 242, 269, 295, 322, 348, + 375, 402, 428, 455, 481, 508, 534, 561, 588, 614, + 641, 667, 694, 720, 747, 774, 800, 827, 853, 880, + 907, 933, 960, 986, 1013, 1039, 1066 + }; return DiyFp(kCachedPowers_F[index], kCachedPowers_E[index]); } diff --git a/3d_party/rapidjson/include/rapidjson/reader.h b/3d_party/rapidjson/include/rapidjson/reader.h index 206b26141..9b28b2685 100644 --- a/3d_party/rapidjson/include/rapidjson/reader.h +++ b/3d_party/rapidjson/include/rapidjson/reader.h @@ -1223,8 +1223,7 @@ class GenericReader { } i = i * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i >= 429496729)) { // 2^32 - 1 = 4294967295 if (RAPIDJSON_LIKELY(i != 429496729 || s.Peek() > '5')) { @@ -1265,8 +1264,7 @@ class GenericReader { } i64 = i64 * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i64 >= RAPIDJSON_UINT64_C2(0x19999999, 0x99999999))) // 2^64 - 1 = 18446744073709551615 if (RAPIDJSON_LIKELY(i64 != RAPIDJSON_UINT64_C2(0x19999999, 0x99999999) || s.Peek() > '5')) { diff --git a/3d_party/spd_log/include/spdlog/fmt/bundled/format.h b/3d_party/spd_log/include/spdlog/fmt/bundled/format.h index 1e5319fd2..b23ff8b45 100644 --- a/3d_party/spd_log/include/spdlog/fmt/bundled/format.h +++ b/3d_party/spd_log/include/spdlog/fmt/bundled/format.h @@ -2372,19 +2372,19 @@ template <std::size_t N> struct ArgArray<N, true/*IsPacked*/> { typedef Value Type[N > 0 ? N : 1]; -template <typename Formatter, typename T> -static Value make(const T& value) { + template <typename Formatter, typename T> + static Value make(const T& value) { #ifdef __clang__ - Value result = MakeValue<Formatter>(value); - // Workaround a bug in Apple LLVM version 4.2 (clang-425.0.28) of clang: - // https://github.com/fmtlib/fmt/issues/276 - (void)result.custom.format; - return result; + Value result = MakeValue<Formatter>(value); + // Workaround a bug in Apple LLVM version 4.2 (clang-425.0.28) of clang: + // https://github.com/fmtlib/fmt/issues/276 + (void)result.custom.format; + return result; #else - return MakeValue<Formatter>(value); + return MakeValue<Formatter>(value); #endif -} - }; + } +}; template <std::size_t N> struct ArgArray<N, false/*IsPacked*/> { diff --git a/common/cpp/include/http_client/http_error.h b/common/cpp/include/http_client/http_error.h index 79392511c..13660260f 100644 --- a/common/cpp/include/http_client/http_error.h +++ b/common/cpp/include/http_client/http_error.h @@ -15,11 +15,11 @@ using HttpErrorTemplate = ServiceErrorTemplate<HttpErrorType, ErrorType::kHttpEr namespace HttpErrorTemplates { -auto const kTransferError = HttpErrorTemplate{ +auto const kTransferError = HttpErrorTemplate { "possible transfer error", HttpErrorType::kTransferError }; -auto const kConnectionError = HttpErrorTemplate{ +auto const kConnectionError = HttpErrorTemplate { "connection error", HttpErrorType::kConnectionError }; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index b78016f40..dade78385 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -14,7 +14,7 @@ namespace asapo { -//Need to be "enum" since multiple flags are allowed +// Can't be enum class since multiple flags are allowed enum FileOpenMode : unsigned short { IO_OPEN_MODE_READ = 1, IO_OPEN_MODE_WRITE = 1 << 1, @@ -49,9 +49,15 @@ class IO { public: /* - * Special + * Special thread functions, the name is limited to 15 chars. + * More then 16 will result in a truncation. + * Setting the name is a best effort feature and currently just works on UNIX systems. + * The indexed function will add :<index> as an postfix to the name. */ - virtual std::unique_ptr<std::thread> NewThread (std::function<void()> function) const = 0; + virtual std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void()> function) const = 0; + virtual std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void(uint64_t index)> function, uint64_t index) const = 0; /* * Network diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index c57017921..90204e58c 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -25,11 +25,17 @@ class MockIO : public IO { MOCK_CONST_METHOD1(AddressFromSocket_t, std::string (SocketDescriptor socket)); - std::unique_ptr<std::thread> NewThread(std::function<void()> function) const override { + std::unique_ptr<std::thread> NewThread(const std::string& name, std::function<void()> function) const override { return std::unique_ptr<std::thread>(NewThread_t(function)); } MOCK_CONST_METHOD1(NewThread_t, std::thread * (std::function<void()> function)); + std::unique_ptr<std::thread> NewThread(const std::string& name, std::function<void(uint64_t index)> function, + uint64_t index) const override { + return std::unique_ptr<std::thread>(NewThread_t(function, index)); + } + MOCK_CONST_METHOD2(NewThread_t, std::thread * (std::function<void(uint64_t)> function, uint64_t index)); + ListSocketDescriptors WaitSocketsActivity(SocketDescriptor master_socket, ListSocketDescriptors* sockets_to_listen, std::vector<std::string>* new_connections, Error* err) const override { diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index 7698e7f81..ca543c0a6 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -111,7 +111,7 @@ Error CurlHttpClient::Command(bool post, CurlDataContainer* data_container, cons FileData AllocateMemory(uint64_t size, Error* err) { FileData data; try { - data = FileData{new uint8_t[(size_t)size +1 ]}; + data = FileData{new uint8_t[(size_t)size + 1 ]}; } catch (...) { *err = ErrorTemplates::kMemoryAllocationError.Generate(); return nullptr; @@ -129,7 +129,7 @@ Error CurlHttpClient::Post(const std::string& uri, Error err; CurlDataContainer data_container; data_container.mode = CurlDataMode::array; - uint64_t extended_size =output_data_size + 10000; // for error messages + uint64_t extended_size = output_data_size + 10000; // for error messages *output_data = AllocateMemory(extended_size, &err); if (err) { return err; @@ -141,7 +141,7 @@ Error CurlHttpClient::Post(const std::string& uri, if (*response_code == HttpCode::OK) { if (output_data_size != data_container.bytes_received) { return HttpErrorTemplates::kTransferError.Generate("received " + - std::to_string(data_container.bytes_received) + ", expected " + std::to_string(output_data_size) + "bytes"); + std::to_string(data_container.bytes_received) + ", expected " + std::to_string(output_data_size) + "bytes"); } (*output_data)[output_data_size] = 0; // for reinterpret cast to string worked } else { diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index e3c8b5dc9..3da71da9d 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -208,8 +208,17 @@ std::string SystemIO::ReadFileToString(const std::string& fname, Error* err) con return std::string(reinterpret_cast<const char*>(data.get()), (size_t)size); } -std::unique_ptr<std::thread> SystemIO::NewThread(std::function<void()> function) const { - return std::unique_ptr<std::thread>(new std::thread(function)); +std::unique_ptr<std::thread> SystemIO::NewThread(const std::string& name, std::function<void()> function) const { + auto thread = std::unique_ptr<std::thread>(new std::thread(function)); + SetThreadName(thread.get(), name); + return thread; +} + +std::unique_ptr<std::thread> SystemIO::NewThread(const std::string& name, std::function<void(uint64_t index)> function, + uint64_t index) const { + auto thread = std::unique_ptr<std::thread>(new std::thread(function, index)); + SetThreadName(thread.get(), name + ":" + std::to_string(index)); + return thread; } void SystemIO::Skip(SocketDescriptor socket_fd, size_t length, Error* err) const { diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 78621e1fc..a97bb1152 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -42,6 +42,8 @@ class SystemIO final : public IO { ListSocketDescriptors* sockets_to_listen) const; #endif + void SetThreadName(std::thread* threadHandle, const std::string& name) const; + void ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const; //void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, IOErrors* err) const; @@ -88,9 +90,15 @@ class SystemIO final : public IO { public: ~SystemIO(); /* - * Special + * Special thread functions, the name is limited to 15 chars. + * More then 16 will result in a truncation. + * Setting the name is a best effort feature and currently just works on UNIX systems. + * The indexed function will add :<index> as an postfix to the name. */ - std::unique_ptr<std::thread> NewThread(std::function<void()> function) const override; + std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void()> function) const override; + std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void(uint64_t index)> function, uint64_t index) const override; // this is not standard function - to be implemented differently in windows and linux diff --git a/common/cpp/src/system_io/system_io_linux_mac.cpp b/common/cpp/src/system_io/system_io_linux_mac.cpp index aa8d193ae..56fb401e7 100644 --- a/common/cpp/src/system_io/system_io_linux_mac.cpp +++ b/common/cpp/src/system_io/system_io_linux_mac.cpp @@ -201,6 +201,11 @@ void SystemIO::CollectFileInformationRecursively(const std::string& path, closedir(dir); } +void SystemIO::SetThreadName(std::thread* threadHandle, const std::string& name) const { + // If the length of name is greater than 15 characters, the excess characters are ignored. + pthread_setname_np(threadHandle->native_handle(), name.c_str()); +} + void SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const { //TODO: Need to change network layer code, so everything can be NonBlocking int flag = 1; diff --git a/common/cpp/src/system_io/system_io_windows.cpp b/common/cpp/src/system_io/system_io_windows.cpp index 2ff541f62..1fb5c0325 100644 --- a/common/cpp/src/system_io/system_io_windows.cpp +++ b/common/cpp/src/system_io/system_io_windows.cpp @@ -231,6 +231,10 @@ void SystemIO::CollectFileInformationRecursively(const std::string& path, } +void SystemIO::SetThreadName(std::thread* threadHandle, const std::string& name) const { + // Not supported +} + void asapo::SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const { //TODO: Seeing issues when using these settings - need further investigation //Event if NonBlockingIO is set, it seems that _recv is a blocking call :/ diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h index 4f50ae3b0..2cb3929c5 100644 --- a/consumer/api/cpp/include/consumer/consumer_error.h +++ b/consumer/api/cpp/include/consumer/consumer_error.h @@ -29,31 +29,31 @@ class ConsumerErrorData : public CustomErrorData { namespace ConsumerErrorTemplates { -auto const kLocalIOError = ConsumerErrorTemplate{ +auto const kLocalIOError = ConsumerErrorTemplate { "local i/o error", ConsumerErrorType::kLocalIOError }; -auto const kStreamFinished = ConsumerErrorTemplate{ +auto const kStreamFinished = ConsumerErrorTemplate { "stream finished", ConsumerErrorType::kStreamFinished }; -auto const kEndOfStream = ConsumerErrorTemplate{ +auto const kEndOfStream = ConsumerErrorTemplate { "no data - end of stream", ConsumerErrorType::kEndOfStream }; -auto const kNoData = ConsumerErrorTemplate{ +auto const kNoData = ConsumerErrorTemplate { "no data", ConsumerErrorType::kNoData }; -auto const kWrongInput = ConsumerErrorTemplate{ +auto const kWrongInput = ConsumerErrorTemplate { "wrong input", ConsumerErrorType::kWrongInput }; -auto const kInterruptedTransaction = ConsumerErrorTemplate{ +auto const kInterruptedTransaction = ConsumerErrorTemplate { "error from broker server", ConsumerErrorType::kInterruptedTransaction }; -auto const kUnavailableService = ConsumerErrorTemplate{ +auto const kUnavailableService = ConsumerErrorTemplate { "service unavailable", ConsumerErrorType::kUnavailableService }; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 0a54c141d..883f32b1e 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -69,7 +69,7 @@ Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& c Error ConsumerErrorFromServerError(const Error& server_err) { if (server_err == HttpErrorTemplates::kTransferError) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate( - "error processing request: " + server_err->Explain()); + "error processing request: " + server_err->Explain()); } else { return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + server_err->Explain()); } @@ -105,27 +105,27 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + source_credentials_.user_token; } -Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { Error err; switch (request.output_mode) { - case OutputDataMode::string: - response->string_output = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, - request.cookie, - request.body, - code, - &err); - break; - case OutputDataMode::array: - err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, - request.body, &response->data_output, response->data_output_size, code); - break; + case OutputDataMode::string: + response->string_output = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, + request.cookie, + request.body, + code, + &err); + break; + case OutputDataMode::array: + err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, + request.body, &response->data_output, response->data_output_size, code); + break; } return err; } -Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { Error err; response->string_output = httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); @@ -137,9 +137,9 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf Error err; HttpCode code; if (request.post) { - err = ProcessPostRequest(request,response,&code); + err = ProcessPostRequest(request, response, &code); } else { - err = ProcessGetRequest(request,response,&code); + err = ProcessGetRequest(request, response, &code); } if (err && service_uri) { service_uri->clear(); @@ -147,7 +147,7 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf return ProcessRequestResponce(err, response, code); } -Error ServerDataBroker::DiscoverService(const std::string& service_name , std::string* uri_to_set) { +Error ServerDataBroker::DiscoverService(const std::string& service_name, std::string* uri_to_set) { if (!uri_to_set->empty()) { return nullptr; } @@ -600,7 +600,8 @@ RequestInfo ServerDataBroker::CreateFolderTokenRequest() const { ri.host = endpoint_; ri.api = "/authorizer/folder"; ri.post = true; - ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + source_credentials_.user_token + "\"}"; return ri; } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index fb5111ebc..702632c65 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -120,8 +120,8 @@ class ServerDataBroker final : public asapo::DataBroker { std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); Error FtsRequestWithTimeout(const FileInfo* info, FileData* data); Error RequestDataFromFts(const FileInfo* info, FileData* data); - Error ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); - Error ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); + Error ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); + Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); DataSet DecodeDatasetFromResponse(std::string response, Error* err); RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 2821fcd15..aba9bfe5e 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1050,7 +1050,8 @@ void ServerDataBrokerTests::MockBeforeFTS(FileData* data) { } void ServerDataBrokerTests::ExpectFolderToken() { - std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" + expected_beamtime_id + std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" + + expected_beamtime_id + "\",\"Token\":\"" + expected_token + "\"}"; EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/authorizer/folder"), diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index b5813df80..3721cfd5d 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -165,7 +165,7 @@ std::vector<std::thread> StartConsumerThreads(const Args& args, const ProducerPt return threads; } -int ProcessAllData(const Args& args, const ProducerPtr& producer , uint64_t* duration_ms, int* nerrors) { +int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* duration_ms, int* nerrors) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 371277f97..001cca6a5 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -31,11 +31,11 @@ class ProducerImpl : public Producer { void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override; - Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode, + Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode, RequestCallback callback) override; Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode, RequestCallback callback) override; - Error SendData__(const EventHeader& event_header, std::string substream, void* data , uint64_t ingest_mode, + Error SendData__(const EventHeader& event_header, std::string substream, void* data, uint64_t ingest_mode, RequestCallback callback) override; void StopThreads__() override; Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 1442d96d0..aff2e2ffc 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -73,7 +73,7 @@ class ProducerImplTests : public testing::Test { std::string expected_next_substream = "next_substream"; asapo::SourceCredentials expected_credentials{"beamtime_id", "beamline", "subname", "token" - }; + }; asapo::SourceCredentials expected_default_credentials{ "beamtime_id", "", "", "token" }; diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index da7be9fba..3b6eb7cf7 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -54,14 +54,14 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name}; + expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::GenericRequestHeader header, asapo::Error err) { - called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr, true, 0}; asapo::ProducerRequest request_filesend{"", header, nullptr, "", expected_origin_fullpath, nullptr, true, 0}; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index b8f480184..c3cd91995 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -65,26 +65,26 @@ class RequestHandlerTcpTests : public testing::Test { asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name, expected_substream}; + expected_meta_size, expected_file_name, expected_substream}; asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, - expected_file_name, expected_substream}; + expected_file_name, expected_substream}; bool callback_called = false; asapo::GenericRequestHeader callback_header; asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + callback_called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; asapo::ProducerRequest request_filesend{expected_beamtime_id, header_fromfile, nullptr, expected_metadata, - expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; diff --git a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp index b3a17bcc6..37f7bd418 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp @@ -11,7 +11,7 @@ namespace asapo { Error SystemFolderWatch::StartFolderMonitor(const std::string& root_folder, const std::vector<std::string>& monitored_folders) { for (auto& folder : monitored_folders ) { - auto thread = io__->NewThread([root_folder, folder, this] { + auto thread = io__->NewThread("FolderMonitor", [root_folder, folder, this] { auto folder_watch = std::unique_ptr<SingleFolderWatch>(new SingleFolderWatch(root_folder, folder, &event_list_)); while (true) { auto err = folder_watch->Watch(); diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 6ea50c250..a5c98ce9e 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -55,7 +55,8 @@ void Receiver::ProcessConnections(Error* err) { void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { log__->Info("new connection from " + address); - auto thread = io__->NewThread([connection_socket_fd, address, this] { + auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd), + [connection_socket_fd, address, this] { auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag)); connection->Listen(); }); @@ -63,7 +64,6 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons if (thread) { threads_.emplace_back(std::move(thread)); } - return; } -} \ No newline at end of file +} diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h index 9a2fc4874..40c7537a0 100644 --- a/receiver/src/receiver_error.h +++ b/receiver/src/receiver_error.h @@ -19,29 +19,29 @@ using ReceiverErrorTemplate = ServiceErrorTemplate<ReceiverErrorType, ErrorType: namespace ReceiverErrorTemplates { -auto const kWarningDuplicatedRequest = ReceiverErrorTemplate{ +auto const kWarningDuplicatedRequest = ReceiverErrorTemplate { "Duplicated request, possible due to retry", ReceiverErrorType::kWarningDuplicatedRequest }; -auto const kInvalidOpCode = ReceiverErrorTemplate{ +auto const kInvalidOpCode = ReceiverErrorTemplate { "Invalid Opcode", ReceiverErrorType::kInvalidOpCode }; -auto const kInternalServerError = ReceiverErrorTemplate{ +auto const kInternalServerError = ReceiverErrorTemplate { "server error", ReceiverErrorType::kInternalServerError }; -auto const kBadRequest = ReceiverErrorTemplate{ +auto const kBadRequest = ReceiverErrorTemplate { "Bad request", ReceiverErrorType::kBadRequest }; -auto const kAuthorizationFailure = ReceiverErrorTemplate{ +auto const kAuthorizationFailure = ReceiverErrorTemplate { "authorization failure", ReceiverErrorType::kAuthorizationFailure }; -auto const kReAuthorizationFailure = ReceiverErrorTemplate{ +auto const kReAuthorizationFailure = ReceiverErrorTemplate { "reauthorization for auto beamtime failed", ReceiverErrorType::kReAuthorizationFailure }; diff --git a/receiver/src/request_handler_receive_data.cpp b/receiver/src/request_handler_receive_data.cpp index 28b59048f..26e78d3ca 100644 --- a/receiver/src/request_handler_receive_data.cpp +++ b/receiver/src/request_handler_receive_data.cpp @@ -25,7 +25,7 @@ Error RequestHandlerReceiveData::ProcessRequest(Request* request) const { return err; } -RequestHandlerReceiveData::RequestHandlerReceiveData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { +RequestHandlerReceiveData::RequestHandlerReceiveData() : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } diff --git a/receiver/src/request_handler_receive_metadata.cpp b/receiver/src/request_handler_receive_metadata.cpp index 93c8d283c..974a906e4 100644 --- a/receiver/src/request_handler_receive_metadata.cpp +++ b/receiver/src/request_handler_receive_metadata.cpp @@ -24,7 +24,7 @@ Error RequestHandlerReceiveMetaData::ProcessRequest(Request* request) const { return nullptr; } -RequestHandlerReceiveMetaData::RequestHandlerReceiveMetaData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { +RequestHandlerReceiveMetaData::RequestHandlerReceiveMetaData() : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index d67c63cfa..fa5b8d8dd 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -59,7 +59,7 @@ class RequestHandlerTests : public Test { uint64_t expected_source_id = 11; bool retry; asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, - expected_meta_size, ""}; + expected_meta_size, ""}; asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id}; uint8_t tmp; void SetUp() override { diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index 5b887b046..8818e5dce 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -86,7 +86,7 @@ class MockRequestFactory: public asapo::RequestFactory { } MOCK_CONST_METHOD4(GenerateRequest_t, Request * (const GenericRequestHeader&, - SocketDescriptor , std::string , + SocketDescriptor, std::string, ErrorInterface**)); }; diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp index 31517dec8..2c3dea1c3 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp @@ -36,7 +36,7 @@ void ExitIfErrIsNotOk(Error* err, int exit_number) { } std::unique_ptr<std::thread> CreateEchoServerThread() { - return io->NewThread([&] { + return io->NewThread("EchoServer", [&] { Error err; FileDescriptor socket = io->CreateSocket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &err); ExitIfErrIsNotOk(&err, 100); diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp index 37e5af9be..12a9fbc2e 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp @@ -36,7 +36,7 @@ void ExitIfErrIsNotOk(Error* err, int exit_number) { } std::unique_ptr<std::thread> CreateEchoServerThread() { - return io->NewThread([&] { + return io->NewThread("EchoServer", [&] { Error err; SocketDescriptor master_socket = io->CreateAndBindIPTCPSocketListener(kListenAddress, 3, &err); std::cout << "[SERVER] master socket " << master_socket << std::endl; @@ -113,13 +113,13 @@ int main(int argc, char* argv[]) { kThreadStarted.get_future().get();//Make sure that the server is started std::cout << "Check" << std::endl; - auto thread1 = io->NewThread([&] { + auto thread1 = io->NewThread("CheckNormal 1", [&] { CheckNormal(30); }); - auto thread2 = io->NewThread([&] { + auto thread2 = io->NewThread("CheckNormal 2", [&] { CheckNormal(30); }); - auto thread3 = io->NewThread([&] { + auto thread3 = io->NewThread("CheckNormal 3", [&] { CheckNormal(30); }); -- GitLab