Skip to content
Snippets Groups Projects
Commit fbdf4e1a authored by Volodymyr Yurchenko's avatar Volodymyr Yurchenko
Browse files

Fix job fetching logic in RecallTaskInjector (#150)

parent 4e824f4f
No related branches found
No related tags found
No related merge requests found
...@@ -314,7 +314,7 @@ bool RecallTaskInjector::synchronousFetch(bool & noFilesToRecall) ...@@ -314,7 +314,7 @@ bool RecallTaskInjector::synchronousFetch(bool & noFilesToRecall)
} }
reqFiles -= m_files; reqFiles -= m_files;
uint64_t reqSize = (m_raoManager.useRAO() && m_raoManager.hasUDS()) ? 1024L * 1024 * 1024 * 1024 * 1024 : m_maxBatchBytes; uint64_t reqSize = 1024L * 1024 * 1024 * 1024 * 1024;
if (reqSize <= m_bytes) { if (reqSize <= m_bytes) {
return true; //No need to pop from the queue, injector already holds enough bytes, but we return there is still work to be done return true; //No need to pop from the queue, injector already holds enough bytes, but we return there is still work to be done
} }
......
...@@ -196,17 +196,32 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean ...@@ -196,17 +196,32 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeReadTask * castor::tape::tapeserver::daemon::TapeReadTask *
castor::tape::tapeserver::daemon::TapeReadSingleThread::popAndRequestMoreJobs() { castor::tape::tapeserver::daemon::TapeReadSingleThread::popAndRequestMoreJobs() {
cta::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair // Take the next task for the tape thread to execute and check how many left.
vrp = m_tasks.popGetSize(); // m_tasks queue gets more tasks when requestInjection() is called.
// If we just passed (down) the half full limit, ask for more // The queue may contain many small files that will be processed quickly
// (the remaining value is after pop) // or a few big files that take time. We define several thresholds to make injection in time
if (0 == vrp.remaining) {
// This is a last call: if the task injector comes up empty on this cta::threading::BlockingQueue<TapeReadTask *>::valueRemainingPair vrp = m_tasks.popGetSize();
// one, he'll call it the end. if (vrp.remaining == 0) {
// This is a last call: the task injector will make the last attempt to fetch more jobs.
// In any case, the injector thread will terminate
m_taskInjector->requestInjection(true); m_taskInjector->requestInjection(true);
} }
else if (vrp.remaining + 1 == m_maxFilesRequest / 2) { else if (vrp.remaining == m_maxFilesRequest / 2 - 1) {
// This is not a last call // This is not a last call: we just passed half of the maximum file limit.
// Probably there are many small files queued, that will be processed quickly,
// so we need to request injection of new batch of tasks early
m_taskInjector->requestInjection(false);
}
else if (vrp.remaining == 10) {
// This is not a last call: we are close to the end of current tasks queue.
// 10 is a magic number that will allow us to get new tasks
// before we are done with current batch
m_taskInjector->requestInjection(false);
}
else if (vrp.remaining == 1) {
// This is not a last call: given there is only one big file in the queue,
// it's time to request the next batch
m_taskInjector->requestInjection(false); m_taskInjector->requestInjection(false);
} }
return vrp.value; return vrp.value;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment