diff --git a/3d_party/rapidjson/include/rapidjson/internal/diyfp.h b/3d_party/rapidjson/include/rapidjson/internal/diyfp.h index 743a7fb857bf8dfbb4f817c98abcd6162a995a6f..d47dd284ad5880f8ff60aa5dcddb8aa5daeb1f85 100644 --- a/3d_party/rapidjson/include/rapidjson/internal/diyfp.h +++ b/3d_party/rapidjson/include/rapidjson/internal/diyfp.h @@ -210,15 +210,15 @@ inline DiyFp GetCachedPowerByIndex(size_t index) { }; static const int16_t kCachedPowers_E[] = { -1220, -1193, -1166, -1140, -1113, -1087, -1060, -1034, -1007, -980, - -954, -927, -901, -874, -847, -821, -794, -768, -741, -715, - -688, -661, -635, -608, -582, -555, -529, -502, -475, -449, - -422, -396, -369, -343, -316, -289, -263, -236, -210, -183, - -157, -130, -103, -77, -50, -24, 3, 30, 56, 83, - 109, 136, 162, 189, 216, 242, 269, 295, 322, 348, - 375, 402, 428, 455, 481, 508, 534, 561, 588, 614, - 641, 667, 694, 720, 747, 774, 800, 827, 853, 880, - 907, 933, 960, 986, 1013, 1039, 1066 - }; + -954, -927, -901, -874, -847, -821, -794, -768, -741, -715, + -688, -661, -635, -608, -582, -555, -529, -502, -475, -449, + -422, -396, -369, -343, -316, -289, -263, -236, -210, -183, + -157, -130, -103, -77, -50, -24, 3, 30, 56, 83, + 109, 136, 162, 189, 216, 242, 269, 295, 322, 348, + 375, 402, 428, 455, 481, 508, 534, 561, 588, 614, + 641, 667, 694, 720, 747, 774, 800, 827, 853, 880, + 907, 933, 960, 986, 1013, 1039, 1066 + }; return DiyFp(kCachedPowers_F[index], kCachedPowers_E[index]); } diff --git a/3d_party/rapidjson/include/rapidjson/reader.h b/3d_party/rapidjson/include/rapidjson/reader.h index 206b261417b9bc15594fec165685339123b639c0..9b28b268530ca29101b4e351fd4228899a223e4e 100644 --- a/3d_party/rapidjson/include/rapidjson/reader.h +++ b/3d_party/rapidjson/include/rapidjson/reader.h @@ -1223,8 +1223,7 @@ class GenericReader { } i = i * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i >= 429496729)) { // 2^32 - 1 = 4294967295 if (RAPIDJSON_LIKELY(i != 429496729 || s.Peek() > '5')) { @@ -1265,8 +1264,7 @@ class GenericReader { } i64 = i64 * 10 + static_cast<unsigned>(s.TakePush() - '0'); significandDigit++; - } - else + } else while (RAPIDJSON_LIKELY(s.Peek() >= '0' && s.Peek() <= '9')) { if (RAPIDJSON_UNLIKELY(i64 >= RAPIDJSON_UINT64_C2(0x19999999, 0x99999999))) // 2^64 - 1 = 18446744073709551615 if (RAPIDJSON_LIKELY(i64 != RAPIDJSON_UINT64_C2(0x19999999, 0x99999999) || s.Peek() > '5')) { diff --git a/3d_party/spd_log/include/spdlog/fmt/bundled/format.h b/3d_party/spd_log/include/spdlog/fmt/bundled/format.h index 1e5319fd2c741103c9d7ffa275f0a10298a7b339..b23ff8b45e74d6c9ce015cbf055fecdf7ab6c3f6 100644 --- a/3d_party/spd_log/include/spdlog/fmt/bundled/format.h +++ b/3d_party/spd_log/include/spdlog/fmt/bundled/format.h @@ -2372,19 +2372,19 @@ template <std::size_t N> struct ArgArray<N, true/*IsPacked*/> { typedef Value Type[N > 0 ? N : 1]; -template <typename Formatter, typename T> -static Value make(const T& value) { + template <typename Formatter, typename T> + static Value make(const T& value) { #ifdef __clang__ - Value result = MakeValue<Formatter>(value); - // Workaround a bug in Apple LLVM version 4.2 (clang-425.0.28) of clang: - // https://github.com/fmtlib/fmt/issues/276 - (void)result.custom.format; - return result; + Value result = MakeValue<Formatter>(value); + // Workaround a bug in Apple LLVM version 4.2 (clang-425.0.28) of clang: + // https://github.com/fmtlib/fmt/issues/276 + (void)result.custom.format; + return result; #else - return MakeValue<Formatter>(value); + return MakeValue<Formatter>(value); #endif -} - }; + } +}; template <std::size_t N> struct ArgArray<N, false/*IsPacked*/> { diff --git a/common/cpp/include/http_client/http_error.h b/common/cpp/include/http_client/http_error.h index 79392511ce4b7a85308d90dce2f02f0f90538354..13660260f9395e6334902e0b85ff6e3e0b63eeff 100644 --- a/common/cpp/include/http_client/http_error.h +++ b/common/cpp/include/http_client/http_error.h @@ -15,11 +15,11 @@ using HttpErrorTemplate = ServiceErrorTemplate<HttpErrorType, ErrorType::kHttpEr namespace HttpErrorTemplates { -auto const kTransferError = HttpErrorTemplate{ +auto const kTransferError = HttpErrorTemplate { "possible transfer error", HttpErrorType::kTransferError }; -auto const kConnectionError = HttpErrorTemplate{ +auto const kConnectionError = HttpErrorTemplate { "connection error", HttpErrorType::kConnectionError }; diff --git a/common/cpp/include/io/io.h b/common/cpp/include/io/io.h index b78016f401cbca063e79e00f3478927426065777..dade78385798a06a20ba647f009d2f0b7a5c153b 100644 --- a/common/cpp/include/io/io.h +++ b/common/cpp/include/io/io.h @@ -14,7 +14,7 @@ namespace asapo { -//Need to be "enum" since multiple flags are allowed +// Can't be enum class since multiple flags are allowed enum FileOpenMode : unsigned short { IO_OPEN_MODE_READ = 1, IO_OPEN_MODE_WRITE = 1 << 1, @@ -49,9 +49,15 @@ class IO { public: /* - * Special + * Special thread functions, the name is limited to 15 chars. + * More then 16 will result in a truncation. + * Setting the name is a best effort feature and currently just works on UNIX systems. + * The indexed function will add :<index> as an postfix to the name. */ - virtual std::unique_ptr<std::thread> NewThread (std::function<void()> function) const = 0; + virtual std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void()> function) const = 0; + virtual std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void(uint64_t index)> function, uint64_t index) const = 0; /* * Network diff --git a/common/cpp/include/unittests/MockIO.h b/common/cpp/include/unittests/MockIO.h index c5701792106f447dff4c8658f4f2494864e0c7e8..90204e58c4e4909761e74751214171189711f6ad 100644 --- a/common/cpp/include/unittests/MockIO.h +++ b/common/cpp/include/unittests/MockIO.h @@ -25,11 +25,17 @@ class MockIO : public IO { MOCK_CONST_METHOD1(AddressFromSocket_t, std::string (SocketDescriptor socket)); - std::unique_ptr<std::thread> NewThread(std::function<void()> function) const override { + std::unique_ptr<std::thread> NewThread(const std::string& name, std::function<void()> function) const override { return std::unique_ptr<std::thread>(NewThread_t(function)); } MOCK_CONST_METHOD1(NewThread_t, std::thread * (std::function<void()> function)); + std::unique_ptr<std::thread> NewThread(const std::string& name, std::function<void(uint64_t index)> function, + uint64_t index) const override { + return std::unique_ptr<std::thread>(NewThread_t(function, index)); + } + MOCK_CONST_METHOD2(NewThread_t, std::thread * (std::function<void(uint64_t)> function, uint64_t index)); + ListSocketDescriptors WaitSocketsActivity(SocketDescriptor master_socket, ListSocketDescriptors* sockets_to_listen, std::vector<std::string>* new_connections, Error* err) const override { diff --git a/common/cpp/src/http_client/curl_http_client.cpp b/common/cpp/src/http_client/curl_http_client.cpp index 7698e7f81155926f3d7cdb51be28cfd79531e5f9..ca543c0a69cf7761040cb50699d8465f93f43ee5 100644 --- a/common/cpp/src/http_client/curl_http_client.cpp +++ b/common/cpp/src/http_client/curl_http_client.cpp @@ -111,7 +111,7 @@ Error CurlHttpClient::Command(bool post, CurlDataContainer* data_container, cons FileData AllocateMemory(uint64_t size, Error* err) { FileData data; try { - data = FileData{new uint8_t[(size_t)size +1 ]}; + data = FileData{new uint8_t[(size_t)size + 1 ]}; } catch (...) { *err = ErrorTemplates::kMemoryAllocationError.Generate(); return nullptr; @@ -129,7 +129,7 @@ Error CurlHttpClient::Post(const std::string& uri, Error err; CurlDataContainer data_container; data_container.mode = CurlDataMode::array; - uint64_t extended_size =output_data_size + 10000; // for error messages + uint64_t extended_size = output_data_size + 10000; // for error messages *output_data = AllocateMemory(extended_size, &err); if (err) { return err; @@ -141,7 +141,7 @@ Error CurlHttpClient::Post(const std::string& uri, if (*response_code == HttpCode::OK) { if (output_data_size != data_container.bytes_received) { return HttpErrorTemplates::kTransferError.Generate("received " + - std::to_string(data_container.bytes_received) + ", expected " + std::to_string(output_data_size) + "bytes"); + std::to_string(data_container.bytes_received) + ", expected " + std::to_string(output_data_size) + "bytes"); } (*output_data)[output_data_size] = 0; // for reinterpret cast to string worked } else { diff --git a/common/cpp/src/system_io/system_io.cpp b/common/cpp/src/system_io/system_io.cpp index e3c8b5dc94a667f1e5734d9c23d92176e72499be..3da71da9de4cc1a2ff66e09e8311955a27eadaf9 100644 --- a/common/cpp/src/system_io/system_io.cpp +++ b/common/cpp/src/system_io/system_io.cpp @@ -208,8 +208,17 @@ std::string SystemIO::ReadFileToString(const std::string& fname, Error* err) con return std::string(reinterpret_cast<const char*>(data.get()), (size_t)size); } -std::unique_ptr<std::thread> SystemIO::NewThread(std::function<void()> function) const { - return std::unique_ptr<std::thread>(new std::thread(function)); +std::unique_ptr<std::thread> SystemIO::NewThread(const std::string& name, std::function<void()> function) const { + auto thread = std::unique_ptr<std::thread>(new std::thread(function)); + SetThreadName(thread.get(), name); + return thread; +} + +std::unique_ptr<std::thread> SystemIO::NewThread(const std::string& name, std::function<void(uint64_t index)> function, + uint64_t index) const { + auto thread = std::unique_ptr<std::thread>(new std::thread(function, index)); + SetThreadName(thread.get(), name + ":" + std::to_string(index)); + return thread; } void SystemIO::Skip(SocketDescriptor socket_fd, size_t length, Error* err) const { diff --git a/common/cpp/src/system_io/system_io.h b/common/cpp/src/system_io/system_io.h index 78621e1fce011efa0741a7681dabe0e59793d42c..a97bb1152d465b1d40dcc02b1e4d73ee32d327c3 100644 --- a/common/cpp/src/system_io/system_io.h +++ b/common/cpp/src/system_io/system_io.h @@ -42,6 +42,8 @@ class SystemIO final : public IO { ListSocketDescriptors* sockets_to_listen) const; #endif + void SetThreadName(std::thread* threadHandle, const std::string& name) const; + void ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const; //void CollectFileInformationRecursively(const std::string& path, std::vector<FileInfo>* files, IOErrors* err) const; @@ -88,9 +90,15 @@ class SystemIO final : public IO { public: ~SystemIO(); /* - * Special + * Special thread functions, the name is limited to 15 chars. + * More then 16 will result in a truncation. + * Setting the name is a best effort feature and currently just works on UNIX systems. + * The indexed function will add :<index> as an postfix to the name. */ - std::unique_ptr<std::thread> NewThread(std::function<void()> function) const override; + std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void()> function) const override; + std::unique_ptr<std::thread> NewThread(const std::string& name, + std::function<void(uint64_t index)> function, uint64_t index) const override; // this is not standard function - to be implemented differently in windows and linux diff --git a/common/cpp/src/system_io/system_io_linux_mac.cpp b/common/cpp/src/system_io/system_io_linux_mac.cpp index aa8d193ae60102b55081cd37c278e94bf12b0063..56fb401e7dc54427cb3c2d6ecbb45d80674f25e4 100644 --- a/common/cpp/src/system_io/system_io_linux_mac.cpp +++ b/common/cpp/src/system_io/system_io_linux_mac.cpp @@ -201,6 +201,11 @@ void SystemIO::CollectFileInformationRecursively(const std::string& path, closedir(dir); } +void SystemIO::SetThreadName(std::thread* threadHandle, const std::string& name) const { + // If the length of name is greater than 15 characters, the excess characters are ignored. + pthread_setname_np(threadHandle->native_handle(), name.c_str()); +} + void SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const { //TODO: Need to change network layer code, so everything can be NonBlocking int flag = 1; diff --git a/common/cpp/src/system_io/system_io_windows.cpp b/common/cpp/src/system_io/system_io_windows.cpp index 2ff541f628ba8ecf7e4e7d0991a71e104e6c7954..1fb5c03250d0a2e4a56e99bb29158e3d5a78f9d1 100644 --- a/common/cpp/src/system_io/system_io_windows.cpp +++ b/common/cpp/src/system_io/system_io_windows.cpp @@ -231,6 +231,10 @@ void SystemIO::CollectFileInformationRecursively(const std::string& path, } +void SystemIO::SetThreadName(std::thread* threadHandle, const std::string& name) const { + // Not supported +} + void asapo::SystemIO::ApplyNetworkOptions(SocketDescriptor socket_fd, Error* err) const { //TODO: Seeing issues when using these settings - need further investigation //Event if NonBlockingIO is set, it seems that _recv is a blocking call :/ diff --git a/consumer/api/cpp/include/consumer/consumer_error.h b/consumer/api/cpp/include/consumer/consumer_error.h index 4f50ae3b0fb835cb25e8263d19de0b8252b0d883..2cb3929c5054416000bd109d952e869bcfd6e83f 100644 --- a/consumer/api/cpp/include/consumer/consumer_error.h +++ b/consumer/api/cpp/include/consumer/consumer_error.h @@ -29,31 +29,31 @@ class ConsumerErrorData : public CustomErrorData { namespace ConsumerErrorTemplates { -auto const kLocalIOError = ConsumerErrorTemplate{ +auto const kLocalIOError = ConsumerErrorTemplate { "local i/o error", ConsumerErrorType::kLocalIOError }; -auto const kStreamFinished = ConsumerErrorTemplate{ +auto const kStreamFinished = ConsumerErrorTemplate { "stream finished", ConsumerErrorType::kStreamFinished }; -auto const kEndOfStream = ConsumerErrorTemplate{ +auto const kEndOfStream = ConsumerErrorTemplate { "no data - end of stream", ConsumerErrorType::kEndOfStream }; -auto const kNoData = ConsumerErrorTemplate{ +auto const kNoData = ConsumerErrorTemplate { "no data", ConsumerErrorType::kNoData }; -auto const kWrongInput = ConsumerErrorTemplate{ +auto const kWrongInput = ConsumerErrorTemplate { "wrong input", ConsumerErrorType::kWrongInput }; -auto const kInterruptedTransaction = ConsumerErrorTemplate{ +auto const kInterruptedTransaction = ConsumerErrorTemplate { "error from broker server", ConsumerErrorType::kInterruptedTransaction }; -auto const kUnavailableService = ConsumerErrorTemplate{ +auto const kUnavailableService = ConsumerErrorTemplate { "service unavailable", ConsumerErrorType::kUnavailableService }; diff --git a/consumer/api/cpp/src/server_data_broker.cpp b/consumer/api/cpp/src/server_data_broker.cpp index 0a54c141dc93f7847e224ac89956c75f3230087e..883f32b1e45d86414b6e8c114b30305178b85cce 100644 --- a/consumer/api/cpp/src/server_data_broker.cpp +++ b/consumer/api/cpp/src/server_data_broker.cpp @@ -69,7 +69,7 @@ Error ConsumerErrorFromHttpCode(const RequestOutput* response, const HttpCode& c Error ConsumerErrorFromServerError(const Error& server_err) { if (server_err == HttpErrorTemplates::kTransferError) { return ConsumerErrorTemplates::kInterruptedTransaction.Generate( - "error processing request: " + server_err->Explain()); + "error processing request: " + server_err->Explain()); } else { return ConsumerErrorTemplates::kUnavailableService.Generate("error processing request: " + server_err->Explain()); } @@ -105,27 +105,27 @@ std::string ServerDataBroker::RequestWithToken(std::string uri) { return std::move(uri) + "?token=" + source_credentials_.user_token; } -Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { Error err; switch (request.output_mode) { - case OutputDataMode::string: - response->string_output = - httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, - request.cookie, - request.body, - code, - &err); - break; - case OutputDataMode::array: - err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, - request.body, &response->data_output, response->data_output_size, code); - break; + case OutputDataMode::string: + response->string_output = + httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, + request.cookie, + request.body, + code, + &err); + break; + case OutputDataMode::array: + err = httpclient__->Post(RequestWithToken(request.host + request.api) + request.extra_params, request.cookie, + request.body, &response->data_output, response->data_output_size, code); + break; } return err; } -Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code) { +Error ServerDataBroker::ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code) { Error err; response->string_output = httpclient__->Get(RequestWithToken(request.host + request.api) + request.extra_params, code, &err); @@ -137,9 +137,9 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf Error err; HttpCode code; if (request.post) { - err = ProcessPostRequest(request,response,&code); + err = ProcessPostRequest(request, response, &code); } else { - err = ProcessGetRequest(request,response,&code); + err = ProcessGetRequest(request, response, &code); } if (err && service_uri) { service_uri->clear(); @@ -147,7 +147,7 @@ Error ServerDataBroker::ProcessRequest(RequestOutput* response, const RequestInf return ProcessRequestResponce(err, response, code); } -Error ServerDataBroker::DiscoverService(const std::string& service_name , std::string* uri_to_set) { +Error ServerDataBroker::DiscoverService(const std::string& service_name, std::string* uri_to_set) { if (!uri_to_set->empty()) { return nullptr; } @@ -600,7 +600,8 @@ RequestInfo ServerDataBroker::CreateFolderTokenRequest() const { ri.host = endpoint_; ri.api = "/authorizer/folder"; ri.post = true; - ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + ri.body = "{\"Folder\":\"" + source_path_ + "\",\"BeamtimeId\":\"" + source_credentials_.beamtime_id + "\",\"Token\":\"" + + source_credentials_.user_token + "\"}"; return ri; } diff --git a/consumer/api/cpp/src/server_data_broker.h b/consumer/api/cpp/src/server_data_broker.h index fb5111ebca6091f85f98711e1697761a250717cb..702632c65b2de5cc0c8c398aa42dd3ce9dd3ef05 100644 --- a/consumer/api/cpp/src/server_data_broker.h +++ b/consumer/api/cpp/src/server_data_broker.h @@ -120,8 +120,8 @@ class ServerDataBroker final : public asapo::DataBroker { std::string BrokerRequestWithTimeout(RequestInfo request, Error* err); Error FtsRequestWithTimeout(const FileInfo* info, FileData* data); Error RequestDataFromFts(const FileInfo* info, FileData* data); - Error ProcessPostRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); - Error ProcessGetRequest(const RequestInfo& request,RequestOutput* response, HttpCode* code); + Error ProcessPostRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); + Error ProcessGetRequest(const RequestInfo& request, RequestOutput* response, HttpCode* code); DataSet DecodeDatasetFromResponse(std::string response, Error* err); RequestInfo PrepareRequestInfo(std::string api_url, bool dataset); diff --git a/consumer/api/cpp/unittests/test_server_broker.cpp b/consumer/api/cpp/unittests/test_server_broker.cpp index 2821fcd1562b9f191e5edd3a6e74f970343e6f7d..aba9bfe5ec1833e97f44af4de54150b8eade8247 100644 --- a/consumer/api/cpp/unittests/test_server_broker.cpp +++ b/consumer/api/cpp/unittests/test_server_broker.cpp @@ -1050,7 +1050,8 @@ void ServerDataBrokerTests::MockBeforeFTS(FileData* data) { } void ServerDataBrokerTests::ExpectFolderToken() { - std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" + expected_beamtime_id + std::string expected_folder_query_string = "{\"Folder\":\"" + expected_path + "\",\"BeamtimeId\":\"" + + expected_beamtime_id + "\",\"Token\":\"" + expected_token + "\"}"; EXPECT_CALL(mock_http_client, Post_t(HasSubstr(expected_server_uri + "/authorizer/folder"), diff --git a/examples/pipeline/in_to_out/in_to_out.cpp b/examples/pipeline/in_to_out/in_to_out.cpp index b5813df8032e5d14159db08ea0eea294d30c19c2..3721cfd5d790a1afb08559bb43b3db80fa1e2a31 100644 --- a/examples/pipeline/in_to_out/in_to_out.cpp +++ b/examples/pipeline/in_to_out/in_to_out.cpp @@ -165,7 +165,7 @@ std::vector<std::thread> StartConsumerThreads(const Args& args, const ProducerPt return threads; } -int ProcessAllData(const Args& args, const ProducerPtr& producer , uint64_t* duration_ms, int* nerrors) { +int ProcessAllData(const Args& args, const ProducerPtr& producer, uint64_t* duration_ms, int* nerrors) { asapo::FileInfo fi; system_clock::time_point t1 = system_clock::now(); diff --git a/producer/api/cpp/src/producer_impl.h b/producer/api/cpp/src/producer_impl.h index 371277f97d3f22d5ed6590effac664a0e9c74daf..001cca6a55d99d26cfc749eefc3588f7d627c0bb 100644 --- a/producer/api/cpp/src/producer_impl.h +++ b/producer/api/cpp/src/producer_impl.h @@ -31,11 +31,11 @@ class ProducerImpl : public Producer { void EnableLocalLog(bool enable) override; void EnableRemoteLog(bool enable) override; Error SendData(const EventHeader& event_header, FileData data, uint64_t ingest_mode, RequestCallback callback) override; - Error SendData__(const EventHeader& event_header, void* data , uint64_t ingest_mode, + Error SendData__(const EventHeader& event_header, void* data, uint64_t ingest_mode, RequestCallback callback) override; Error SendData(const EventHeader& event_header, std::string substream, FileData data, uint64_t ingest_mode, RequestCallback callback) override; - Error SendData__(const EventHeader& event_header, std::string substream, void* data , uint64_t ingest_mode, + Error SendData__(const EventHeader& event_header, std::string substream, void* data, uint64_t ingest_mode, RequestCallback callback) override; void StopThreads__() override; Error SendFile(const EventHeader& event_header, std::string full_path, uint64_t ingest_mode, diff --git a/producer/api/cpp/unittests/test_producer_impl.cpp b/producer/api/cpp/unittests/test_producer_impl.cpp index 1442d96d04f1d469a36c0d41a6ef24ad19aa0cd2..aff2e2ffc311beb155aa812e05d4f6c2ac0ed58f 100644 --- a/producer/api/cpp/unittests/test_producer_impl.cpp +++ b/producer/api/cpp/unittests/test_producer_impl.cpp @@ -73,7 +73,7 @@ class ProducerImplTests : public testing::Test { std::string expected_next_substream = "next_substream"; asapo::SourceCredentials expected_credentials{"beamtime_id", "beamline", "subname", "token" - }; + }; asapo::SourceCredentials expected_default_credentials{ "beamtime_id", "", "", "token" }; diff --git a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp index da7be9fba592edbd80c11cf97cf591a09d16b293..3b6eb7cf7f856277c53a75013acce9a52637063b 100644 --- a/producer/api/cpp/unittests/test_request_handler_filesystem.cpp +++ b/producer/api/cpp/unittests/test_request_handler_filesystem.cpp @@ -54,14 +54,14 @@ class RequestHandlerFilesystemTests : public testing::Test { asapo::Opcode expected_op_code = asapo::kOpcodeTransferData; asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name}; + expected_meta_size, expected_file_name}; bool called = false; asapo::GenericRequestHeader callback_header; asapo::ProducerRequest request{"", header, nullptr, "", "", [this](asapo::GenericRequestHeader header, asapo::Error err) { - called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; asapo::ProducerRequest request_nocallback{"", header, nullptr, "", "", nullptr, true, 0}; asapo::ProducerRequest request_filesend{"", header, nullptr, "", expected_origin_fullpath, nullptr, true, 0}; diff --git a/producer/api/cpp/unittests/test_request_handler_tcp.cpp b/producer/api/cpp/unittests/test_request_handler_tcp.cpp index b8f480184955b5c1c49ab2dc161e7b63fd3e98d8..c3cd919951130a4a12673f67d7ca6a351013e06e 100644 --- a/producer/api/cpp/unittests/test_request_handler_tcp.cpp +++ b/producer/api/cpp/unittests/test_request_handler_tcp.cpp @@ -65,26 +65,26 @@ class RequestHandlerTcpTests : public testing::Test { asapo::Error callback_err; asapo::GenericRequestHeader header{expected_op_code, expected_file_id, expected_file_size, - expected_meta_size, expected_file_name, expected_substream}; + expected_meta_size, expected_file_name, expected_substream}; asapo::GenericRequestHeader header_fromfile{expected_op_code, expected_file_id, 0, expected_meta_size, - expected_file_name, expected_substream}; + expected_file_name, expected_substream}; bool callback_called = false; asapo::GenericRequestHeader callback_header; asapo::ProducerRequest request{expected_beamtime_id, header, nullptr, expected_metadata, "", [this](asapo::GenericRequestHeader header, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + callback_called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; std::string expected_origin_fullpath = std::string("origin/") + expected_file_name; asapo::ProducerRequest request_filesend{expected_beamtime_id, header_fromfile, nullptr, expected_metadata, - expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { - callback_called = true; - callback_err = std::move(err); - callback_header = header; - }, true, 0}; + expected_origin_fullpath, [this](asapo::GenericRequestHeader header, asapo::Error err) { + callback_called = true; + callback_err = std::move(err); + callback_header = header; + }, true, 0}; asapo::ProducerRequest request_nocallback{expected_beamtime_id, header, nullptr, expected_metadata, "", nullptr, true, 0}; diff --git a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp index b3a17bcc62f26f0989368b12fc2af988aacffd88..37f7bd418ef24d01c16c305a0f41a34a0565ec6f 100644 --- a/producer/event_monitor_producer/src/system_folder_watch_windows.cpp +++ b/producer/event_monitor_producer/src/system_folder_watch_windows.cpp @@ -11,7 +11,7 @@ namespace asapo { Error SystemFolderWatch::StartFolderMonitor(const std::string& root_folder, const std::vector<std::string>& monitored_folders) { for (auto& folder : monitored_folders ) { - auto thread = io__->NewThread([root_folder, folder, this] { + auto thread = io__->NewThread("FolderMonitor", [root_folder, folder, this] { auto folder_watch = std::unique_ptr<SingleFolderWatch>(new SingleFolderWatch(root_folder, folder, &event_list_)); while (true) { auto err = folder_watch->Watch(); diff --git a/receiver/src/receiver.cpp b/receiver/src/receiver.cpp index 6ea50c2507c1acb0a2c328a7560f55c0680add52..a5c98ce9e3c4bc8e23f38ee3f66a9ffdc4fee85e 100644 --- a/receiver/src/receiver.cpp +++ b/receiver/src/receiver.cpp @@ -55,7 +55,8 @@ void Receiver::ProcessConnections(Error* err) { void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, const std::string& address) { log__->Info("new connection from " + address); - auto thread = io__->NewThread([connection_socket_fd, address, this] { + auto thread = io__->NewThread("ConFd:" + std::to_string(connection_socket_fd), + [connection_socket_fd, address, this] { auto connection = std::unique_ptr<Connection>(new Connection(connection_socket_fd, address, cache_, GetReceiverConfig()->tag)); connection->Listen(); }); @@ -63,7 +64,6 @@ void Receiver::StartNewConnectionInSeparateThread(int connection_socket_fd, cons if (thread) { threads_.emplace_back(std::move(thread)); } - return; } -} \ No newline at end of file +} diff --git a/receiver/src/receiver_error.h b/receiver/src/receiver_error.h index 9a2fc4874ba8bd0e8696a0acce7462d602a49262..40c7537a0823d6a9ee0768bf14b91f14aa86cbe8 100644 --- a/receiver/src/receiver_error.h +++ b/receiver/src/receiver_error.h @@ -19,29 +19,29 @@ using ReceiverErrorTemplate = ServiceErrorTemplate<ReceiverErrorType, ErrorType: namespace ReceiverErrorTemplates { -auto const kWarningDuplicatedRequest = ReceiverErrorTemplate{ +auto const kWarningDuplicatedRequest = ReceiverErrorTemplate { "Duplicated request, possible due to retry", ReceiverErrorType::kWarningDuplicatedRequest }; -auto const kInvalidOpCode = ReceiverErrorTemplate{ +auto const kInvalidOpCode = ReceiverErrorTemplate { "Invalid Opcode", ReceiverErrorType::kInvalidOpCode }; -auto const kInternalServerError = ReceiverErrorTemplate{ +auto const kInternalServerError = ReceiverErrorTemplate { "server error", ReceiverErrorType::kInternalServerError }; -auto const kBadRequest = ReceiverErrorTemplate{ +auto const kBadRequest = ReceiverErrorTemplate { "Bad request", ReceiverErrorType::kBadRequest }; -auto const kAuthorizationFailure = ReceiverErrorTemplate{ +auto const kAuthorizationFailure = ReceiverErrorTemplate { "authorization failure", ReceiverErrorType::kAuthorizationFailure }; -auto const kReAuthorizationFailure = ReceiverErrorTemplate{ +auto const kReAuthorizationFailure = ReceiverErrorTemplate { "reauthorization for auto beamtime failed", ReceiverErrorType::kReAuthorizationFailure }; diff --git a/receiver/src/request_handler_receive_data.cpp b/receiver/src/request_handler_receive_data.cpp index 28b59048f952dfee03189f3dd8db6c0cff75ab7f..26e78d3caba7a59a5333f28f71be07428a4082a3 100644 --- a/receiver/src/request_handler_receive_data.cpp +++ b/receiver/src/request_handler_receive_data.cpp @@ -25,7 +25,7 @@ Error RequestHandlerReceiveData::ProcessRequest(Request* request) const { return err; } -RequestHandlerReceiveData::RequestHandlerReceiveData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { +RequestHandlerReceiveData::RequestHandlerReceiveData() : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } diff --git a/receiver/src/request_handler_receive_metadata.cpp b/receiver/src/request_handler_receive_metadata.cpp index 93c8d283cb648389c42016a5da4074e7e55c5154..974a906e45ca243075beeb4983e9fde0ceece25f 100644 --- a/receiver/src/request_handler_receive_metadata.cpp +++ b/receiver/src/request_handler_receive_metadata.cpp @@ -24,7 +24,7 @@ Error RequestHandlerReceiveMetaData::ProcessRequest(Request* request) const { return nullptr; } -RequestHandlerReceiveMetaData::RequestHandlerReceiveMetaData() : io__{GenerateDefaultIO()} , log__{GetDefaultReceiverLogger()} { +RequestHandlerReceiveMetaData::RequestHandlerReceiveMetaData() : io__{GenerateDefaultIO()}, log__{GetDefaultReceiverLogger()} { } diff --git a/receiver/unittests/receiver_data_server/test_request_handler.cpp b/receiver/unittests/receiver_data_server/test_request_handler.cpp index d67c63cfa8bb2f01f39ba3621b13cbf16b076f32..fa5b8d8dd1dd9e38d42eb20eb39d982e68b75564 100644 --- a/receiver/unittests/receiver_data_server/test_request_handler.cpp +++ b/receiver/unittests/receiver_data_server/test_request_handler.cpp @@ -59,7 +59,7 @@ class RequestHandlerTests : public Test { uint64_t expected_source_id = 11; bool retry; asapo::GenericRequestHeader header{asapo::kOpcodeGetBufferData, expected_buf_id, expected_data_size, - expected_meta_size, ""}; + expected_meta_size, ""}; asapo::ReceiverDataServerRequest request{std::move(header), expected_source_id}; uint8_t tmp; void SetUp() override { diff --git a/receiver/unittests/test_requests_dispatcher.cpp b/receiver/unittests/test_requests_dispatcher.cpp index 5b887b046e264f1a549221d18cae26a04414bb8e..8818e5dce99ddae3568ebeca10881cb4af5302f6 100644 --- a/receiver/unittests/test_requests_dispatcher.cpp +++ b/receiver/unittests/test_requests_dispatcher.cpp @@ -86,7 +86,7 @@ class MockRequestFactory: public asapo::RequestFactory { } MOCK_CONST_METHOD4(GenerateRequest_t, Request * (const GenericRequestHeader&, - SocketDescriptor , std::string , + SocketDescriptor, std::string, ErrorInterface**)); }; diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp index 31517dec86243b196fee64c4c7cdac2cdd38d99d..2c3dea1c339af3b61eec490f24af4d8ed327d552 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv/ip_tcp_network.cpp @@ -36,7 +36,7 @@ void ExitIfErrIsNotOk(Error* err, int exit_number) { } std::unique_ptr<std::thread> CreateEchoServerThread() { - return io->NewThread([&] { + return io->NewThread("EchoServer", [&] { Error err; FileDescriptor socket = io->CreateSocket(AddressFamilies::INET, SocketTypes::STREAM, SocketProtocols::IP, &err); ExitIfErrIsNotOk(&err, 100); diff --git a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp index 37e5af9be97177eaae7c3c30f061a2cf3206736a..12a9fbc2ec52495ced87c95a065b01650d7035ee 100644 --- a/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp +++ b/tests/automatic/system_io/ip_tcp_network/client_serv_multicon/multicon.cpp @@ -36,7 +36,7 @@ void ExitIfErrIsNotOk(Error* err, int exit_number) { } std::unique_ptr<std::thread> CreateEchoServerThread() { - return io->NewThread([&] { + return io->NewThread("EchoServer", [&] { Error err; SocketDescriptor master_socket = io->CreateAndBindIPTCPSocketListener(kListenAddress, 3, &err); std::cout << "[SERVER] master socket " << master_socket << std::endl; @@ -113,13 +113,13 @@ int main(int argc, char* argv[]) { kThreadStarted.get_future().get();//Make sure that the server is started std::cout << "Check" << std::endl; - auto thread1 = io->NewThread([&] { + auto thread1 = io->NewThread("CheckNormal 1", [&] { CheckNormal(30); }); - auto thread2 = io->NewThread([&] { + auto thread2 = io->NewThread("CheckNormal 2", [&] { CheckNormal(30); }); - auto thread3 = io->NewThread([&] { + auto thread3 = io->NewThread("CheckNormal 3", [&] { CheckNormal(30); });