diff --git a/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp index 11d0f8e5d8c8a63683f5c3f813172b79c9f0c463..b4cac75c1e61600bd5013fd0c6c2c1bf19ff132f 100644 --- a/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp +++ b/common/cpp/src/asapo_fabric/common/fabric_context_impl.cpp @@ -227,24 +227,24 @@ void FabricContextImpl::CompletionThread() { ret = fi_cq_sreadfrom(completion_queue_, &entry, 1, &tmpAddress, nullptr, 10 /*ms*/); switch (ret) { - case -FI_EAGAIN: // No data - std::this_thread::yield(); - break; - case -FI_EAVAIL: // An error is in the queue - CompletionThreadHandleErrorAvailable(&error); - break; - case 1: { // We got 1 data entry back - auto task = (FabricWaitableTask*)(entry.op_context); - if (task) { - task->HandleCompletion(&entry, tmpAddress); - } else { - error = FabricErrorTemplates::kInternalError.Generate("nullptr context from fi_cq_sreadfrom"); - } - break; + case -FI_EAGAIN: // No data + std::this_thread::yield(); + break; + case -FI_EAVAIL: // An error is in the queue + CompletionThreadHandleErrorAvailable(&error); + break; + case 1: { // We got 1 data entry back + auto task = (FabricWaitableTask*)(entry.op_context); + if (task) { + task->HandleCompletion(&entry, tmpAddress); + } else { + error = FabricErrorTemplates::kInternalError.Generate("nullptr context from fi_cq_sreadfrom"); } - default: - error = ErrorFromFabricInternal("Unknown error while fi_cq_readfrom", ret); - break; + break; + } + default: + error = ErrorFromFabricInternal("Unknown error while fi_cq_readfrom", ret); + break; } } @@ -299,7 +299,7 @@ void FabricContextImpl::InternalWait(FabricAddress targetAddress, FabricWaitable } void FabricContextImpl::InternalWaitWithAliveCheck(FabricAddress targetAddress, FabricWaitableTask* task, - Error* error) {// Handle advanced alive check + Error* error) {// Handle advanced alive check bool aliveCheckFailed = false; for (uint32_t i = 0; i < maxTimeoutRetires_ && *error == FabricErrorTemplates::kTimeout; i++) { *error = nullptr; diff --git a/common/cpp/src/asapo_fabric/common/task/fabric_alive_check_response_task.cpp b/common/cpp/src/asapo_fabric/common/task/fabric_alive_check_response_task.cpp index b841b0abe0ab6685e0f496988ceee0af991ef9f6..2875a1a289c230842be85a2b12ef9905fa78c152 100644 --- a/common/cpp/src/asapo_fabric/common/task/fabric_alive_check_response_task.cpp +++ b/common/cpp/src/asapo_fabric/common/task/fabric_alive_check_response_task.cpp @@ -9,7 +9,7 @@ void FabricAliveCheckResponseTask::RequeueSelf() { Error tmpError = nullptr; ParentContext()->HandleRawFiCommand(this, &tmpError, - fi_trecv, nullptr, 0, nullptr, FI_ADDR_UNSPEC, FI_ASAPO_TAG_ALIVE_CHECK, kRecvTaggedExactMatch); + fi_trecv, nullptr, 0, nullptr, FI_ADDR_UNSPEC, FI_ASAPO_TAG_ALIVE_CHECK, kRecvTaggedExactMatch); // Error is ignored } @@ -23,6 +23,6 @@ void FabricAliveCheckResponseTask::OnErrorCompletion(const fi_cq_err_entry*) { } FabricAliveCheckResponseTask::FabricAliveCheckResponseTask(FabricContextImpl* parentContext) -: FabricSelfRequeuingTask(parentContext) { + : FabricSelfRequeuingTask(parentContext) { } diff --git a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp index d1485206c54c9cca9544ee026beda80d6747f363..44ed14d06149e743b8c8723fdbb29362539ac364 100644 --- a/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp +++ b/common/cpp/src/asapo_fabric/server/task/fabric_handshake_accepting_task.cpp @@ -7,7 +7,7 @@ using namespace asapo; using namespace fabric; FabricHandshakeAcceptingTask::FabricHandshakeAcceptingTask(FabricServerImpl* parentServerContext) -: FabricSelfRequeuingTask(parentServerContext) { + : FabricSelfRequeuingTask(parentServerContext) { } FabricServerImpl* FabricHandshakeAcceptingTask::ServerContext() { @@ -17,7 +17,7 @@ FabricServerImpl* FabricHandshakeAcceptingTask::ServerContext() { void FabricHandshakeAcceptingTask::RequeueSelf() { Error ignored; ServerContext()->HandleRawFiCommand(this, &ignored, - fi_recv, &handshake_payload_, sizeof(handshake_payload_), nullptr, FI_ADDR_UNSPEC); + fi_recv, &handshake_payload_, sizeof(handshake_payload_), nullptr, FI_ADDR_UNSPEC); } void FabricHandshakeAcceptingTask::OnCompletion(const fi_cq_tagged_entry*, FabricAddress) { @@ -40,15 +40,15 @@ void FabricHandshakeAcceptingTask::HandleAccept(Error* error) { std::string hostname; uint16_t port; std::tie(hostname, port) = - *(server->io__->SplitAddressToHostnameAndPort(handshake_payload_.hostnameAndPort)); + *(server->io__->SplitAddressToHostnameAndPort(handshake_payload_.hostnameAndPort)); FabricAddress tmpAddr; int ret = fi_av_insertsvc( - server->address_vector_, - hostname.c_str(), - std::to_string(port).c_str(), - &tmpAddr, - 0, - nullptr); + server->address_vector_, + hostname.c_str(), + std::to_string(port).c_str(), + &tmpAddr, + 0, + nullptr); if (ret != 1) { *error = ErrorFromFabricInternal("fi_av_insertsvc", ret); return; @@ -58,7 +58,7 @@ void FabricHandshakeAcceptingTask::HandleAccept(Error* error) { // TODO: This could slow down the whole complete queue process, maybe use another thread? // Send and forget server->HandleRawFiCommand(new FabricSelfDeletingTask(), error, - fi_send, nullptr, 0, nullptr, tmpAddr); + fi_send, nullptr, 0, nullptr, tmpAddr); } void FabricHandshakeAcceptingTask::OnError(const Error* error) {