From e6c60d4381c193c3246d5f0ec22e46bd7f2ee0c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Hannappel?= Date: Fri, 29 Apr 2022 13:11:03 +0200 Subject: [PATCH 1/3] simplify thread handling, no more specal case for 1 thread --- src/ewmscp.cpp | 48 +++++++++++++++--------------------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/src/ewmscp.cpp b/src/ewmscp.cpp index b1dded2..b1e501b 100644 --- a/src/ewmscp.cpp +++ b/src/ewmscp.cpp @@ -544,22 +544,21 @@ int main(int argc, const char* argv[]) { outputHandler::base::newHandler(outputHandlerName) ); - if ((nThreads > 1) || reqProv->isFollowMode()) { - for (unsigned int i = 0; i < nThreads; i++) { - qt.workers.emplace_front(std::thread(copyRequest::base::processQueue, - std::ref(qt.requests), - std::ref(qt.results), - std::ref(qt.delayedRequests))); - } - - qt.printer = std::thread(printResults, std::ref(qt.results), std::ref(qt.requests), - std::ref(hashStream), std::ref(logStream), std::ref(statStream)); - std::signal(SIGUSR1, sigUsrHandler); - std::signal(SIGUSR2, sigUsrHandler); - stopRequest::instantiate(reqProv->isFollowMode() ? - stopRequest::handlerIdType::requestProvider : - stopRequest::handlerIdType::processLoop); + for (unsigned int i = 0; i < nThreads; i++) { + qt.workers.emplace_front(std::thread(copyRequest::base::processQueue, + std::ref(qt.requests), + std::ref(qt.results), + std::ref(qt.delayedRequests))); } + + qt.printer = std::thread(printResults, std::ref(qt.results), std::ref(qt.requests), + std::ref(hashStream), std::ref(logStream), std::ref(statStream)); + std::signal(SIGUSR1, sigUsrHandler); + std::signal(SIGUSR2, sigUsrHandler); + stopRequest::instantiate(reqProv->isFollowMode() ? + stopRequest::handlerIdType::requestProvider : + stopRequest::handlerIdType::processLoop); + if (reqProv->isFollowMode()) { continueOnError = true; } @@ -570,24 +569,7 @@ int main(int argc, const char* argv[]) { reqProv->printMappings(logStream); } reqProv->processSources(sources); - if (reqProv->isFollowMode()) { - copyRequest::fileInWork::waitForAllInstancesGone(); - } else { - if (qt.workers.empty()) { - copyRequest::perThreadData threadData; - while (copyRequest::base::checkForInstances()) { - auto request = qt.requests.dequeue(); - request->process(threadData); - request->printResults(hashStream, logStream); - copyRequest::base::retry(request, qt.delayedRequests); - } - if (printStatAnyway) { - copyRequest::base::getStatPrinter()(statStream); - } - } else { - copyRequest::base::waitForAllInstancesGone(); - } - } + copyRequest::fileInWork::waitForAllInstancesGone(); stopRequest::ThrowUpReasonably(); } catch (const std::exception& e) { errMsg::emit(errMsg::level::crit, errMsg::location(), -- GitLab From 296f415ed60fe13f26450c7117764034e65e83a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Hannappel?= Date: Fri, 29 Apr 2022 17:23:27 +0200 Subject: [PATCH 2/3] fix wait, depends on request provider type --- src/copyRequest.cpp | 4 ---- src/copyRequest.h | 1 - src/ewmscp.cpp | 3 +++ 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/copyRequest.cpp b/src/copyRequest.cpp index a7a4551..fb2bc14 100644 --- a/src/copyRequest.cpp +++ b/src/copyRequest.cpp @@ -1424,10 +1424,6 @@ void copyRequest::base::waitForAllInstancesGone() { objectCountCondVar.wait(lock); } } -bool copyRequest::base::checkForInstances() { - std::unique_lock lock(objectCountMutex); - return (objectCount > 0); -} void copyRequest::base::getSuffix(const std::string& path, std::string& suffix) { diff --git a/src/copyRequest.h b/src/copyRequest.h index b591d6c..ede596e 100644 --- a/src/copyRequest.h +++ b/src/copyRequest.h @@ -365,7 +365,6 @@ namespace copyRequest { static bool retry(std::unique_ptr& request, timedQueue& delayedRequests); static void waitForAllInstancesGone(); - static bool checkForInstances(); private: static std::mutex objectCountMutex; static unsigned objectCount; diff --git a/src/ewmscp.cpp b/src/ewmscp.cpp index b1e501b..2f9cb5a 100644 --- a/src/ewmscp.cpp +++ b/src/ewmscp.cpp @@ -569,6 +569,9 @@ int main(int argc, const char* argv[]) { reqProv->printMappings(logStream); } reqProv->processSources(sources); + if (! reqProv->isFollowMode()) { + copyRequest::base::waitForAllInstancesGone(); + } copyRequest::fileInWork::waitForAllInstancesGone(); stopRequest::ThrowUpReasonably(); } catch (const std::exception& e) { -- GitLab From 56421619fabd9d744cc37bee2adf3decfd075c9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrgen=20Hannappel?= Date: Fri, 29 Apr 2022 17:49:17 +0200 Subject: [PATCH 3/3] fix error handling: throw stop requests up --- src/ewmscp.cpp | 72 ++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/src/ewmscp.cpp b/src/ewmscp.cpp index 2f9cb5a..e7a4609 100644 --- a/src/ewmscp.cpp +++ b/src/ewmscp.cpp @@ -535,44 +535,46 @@ int main(int argc, const char* argv[]) { dstRef = target; } - queuesAndThreads qt; + { + queuesAndThreads qt; - auto reqProv = requestProvider::newProvider(requestProviderName, - qt.requests, qt.delayedRequests, parents, - inputHandler::base::newHandler(inputHandlerName), - outputHandler::base::newHandler(outputHandlerName) - ); - - for (unsigned int i = 0; i < nThreads; i++) { - qt.workers.emplace_front(std::thread(copyRequest::base::processQueue, - std::ref(qt.requests), - std::ref(qt.results), - std::ref(qt.delayedRequests))); - } - - qt.printer = std::thread(printResults, std::ref(qt.results), std::ref(qt.requests), - std::ref(hashStream), std::ref(logStream), std::ref(statStream)); - std::signal(SIGUSR1, sigUsrHandler); - std::signal(SIGUSR2, sigUsrHandler); - stopRequest::instantiate(reqProv->isFollowMode() ? - stopRequest::handlerIdType::requestProvider : - stopRequest::handlerIdType::processLoop); - - if (reqProv->isFollowMode()) { - continueOnError = true; - } - qt.delayer = std::thread(delayRequest, - std::ref(qt.delayedRequests), std::ref(qt.requests)); - reqProv->prepareMappings(sources, destination); - if (verbose) { - reqProv->printMappings(logStream); - } - reqProv->processSources(sources); - if (! reqProv->isFollowMode()) { - copyRequest::base::waitForAllInstancesGone(); + auto reqProv = requestProvider::newProvider(requestProviderName, + qt.requests, qt.delayedRequests, parents, + inputHandler::base::newHandler(inputHandlerName), + outputHandler::base::newHandler(outputHandlerName) + ); + + for (unsigned int i = 0; i < nThreads; i++) { + qt.workers.emplace_front(std::thread(copyRequest::base::processQueue, + std::ref(qt.requests), + std::ref(qt.results), + std::ref(qt.delayedRequests))); + } + + qt.printer = std::thread(printResults, std::ref(qt.results), std::ref(qt.requests), + std::ref(hashStream), std::ref(logStream), std::ref(statStream)); + std::signal(SIGUSR1, sigUsrHandler); + std::signal(SIGUSR2, sigUsrHandler); + stopRequest::instantiate(reqProv->isFollowMode() ? + stopRequest::handlerIdType::requestProvider : + stopRequest::handlerIdType::processLoop); + + if (reqProv->isFollowMode()) { + continueOnError = true; + } + qt.delayer = std::thread(delayRequest, + std::ref(qt.delayedRequests), std::ref(qt.requests)); + reqProv->prepareMappings(sources, destination); + if (verbose) { + reqProv->printMappings(logStream); + } + reqProv->processSources(sources); + if (! reqProv->isFollowMode()) { + copyRequest::base::waitForAllInstancesGone(); + } + copyRequest::fileInWork::waitForAllInstancesGone(); } - copyRequest::fileInWork::waitForAllInstancesGone(); stopRequest::ThrowUpReasonably(); } catch (const std::exception& e) { errMsg::emit(errMsg::level::crit, errMsg::location(), -- GitLab