From fbdf4e1a837205d1402555f0aad769d66951d904 Mon Sep 17 00:00:00 2001 From: Volodymyr Yurchenko <volodymyr.yurchenko@cern.ch> Date: Tue, 11 Oct 2022 13:32:19 +0200 Subject: [PATCH] Fix job fetching logic in RecallTaskInjector (#150) --- .../tapeserver/daemon/RecallTaskInjector.cpp | 2 +- .../daemon/TapeReadSingleThread.cpp | 33 ++++++++++++++----- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp index 9ce3f4fe30..111718807a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp @@ -314,7 +314,7 @@ bool RecallTaskInjector::synchronousFetch(bool & noFilesToRecall) } reqFiles -= m_files; - uint64_t reqSize = (m_raoManager.useRAO() && m_raoManager.hasUDS()) ? 1024L * 1024 * 1024 * 1024 * 1024 : m_maxBatchBytes; + uint64_t reqSize = 1024L * 1024 * 1024 * 1024 * 1024; if (reqSize <= m_bytes) { return true; //No need to pop from the queue, injector already holds enough bytes, but we return there is still work to be done } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index 9e0762aab0..e0bb7d9d28 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -196,17 +196,32 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean //------------------------------------------------------------------------------ castor::tape::tapeserver::daemon::TapeReadTask * castor::tape::tapeserver::daemon::TapeReadSingleThread::popAndRequestMoreJobs() { - cta::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair - vrp = m_tasks.popGetSize(); - // If we just passed (down) the half full limit, ask for more - // (the remaining value is after pop) - if (0 == vrp.remaining) { - // This is a last call: if the task injector comes up empty on this - // one, he'll call it the end. + // Take the next task for the tape thread to execute and check how many left. + // m_tasks queue gets more tasks when requestInjection() is called. + // The queue may contain many small files that will be processed quickly + // or a few big files that take time. We define several thresholds to make injection in time + + cta::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair vrp = m_tasks.popGetSize(); + if (vrp.remaining == 0) { + // This is a last call: the task injector will make the last attempt to fetch more jobs. + // In any case, the injector thread will terminate m_taskInjector->requestInjection(true); } - else if (vrp.remaining + 1 == m_maxFilesRequest / 2) { - // This is not a last call + else if (vrp.remaining == m_maxFilesRequest / 2 - 1) { + // This is not a last call: we just passed half of the maximum file limit. + // Probably there are many small files queued, that will be processed quickly, + // so we need to request injection of new batch of tasks early + m_taskInjector->requestInjection(false); + } + else if (vrp.remaining == 10) { + // This is not a last call: we are close to the end of current tasks queue. + // 10 is a magic number that will allow us to get new tasks + // before we are done with current batch + m_taskInjector->requestInjection(false); + } + else if (vrp.remaining == 1) { + // This is not a last call: given there is only one big file in the queue, + // it's time to request the next batch m_taskInjector->requestInjection(false); } return vrp.value; -- GitLab