From 32a5d881088167d4a36cea1b77c0a73a9adcd75e Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Wed, 12 Feb 2020 17:02:34 +0100 Subject: [PATCH] Corrected a race between the RecallTaskInjector and the TapeReadSingleThread If the RecallTaskInjector had to do a RAO query that took too much time, the TapeReadSingleThread could start its infinite loop with no Recall task in it; this would cause the RecallTaskInjector to stop. Thus, even if there are Retrieve jobs in the queue, as the RecallTaskInjector is stopped, the TapeReadSingleThread will not be fed with Read tasks and will terminate. The remaining jobs in the queue will trigger a new mount. --- .../tapeserver/daemon/RecallTaskInjector.cpp | 34 +++++++++++++++++-- .../tapeserver/daemon/RecallTaskInjector.hpp | 19 +++++++++++ .../daemon/TapeReadSingleThread.cpp | 5 +++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp index 1bb4914432..6652dee5f5 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp @@ -49,7 +49,8 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm, m_thread(*this),m_memManager(mm), m_tapeReader(tapeReader),m_diskWriter(diskWriter), m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold), - m_useRAO(false) {} + m_useRAO(false), + m_firstTasksInjectedFuture(m_firstTasksInjectedPromise.get_future()){} //------------------------------------------------------------------------------ //destructor //------------------------------------------------------------------------------ @@ -112,6 +113,29 @@ void RecallTaskInjector::setPromise() { throw cta::exception::Exception(std::string("In RecallTaskInjector::setPromise() got std::exception: ") + exc.what()); } } + +/** + * Idempotently tell the 
TapeReadSingleThread + * that the first TapeRead tasks have been injected. + * If we do not do that, as the RecallTaskInjector has to do some + * RAO query, the TapeReadSingleThread would start without any tasks + * and would tell the RecallTaskInjector to stop + */ +void RecallTaskInjector::setFirstTasksInjectedPromise() { + if(!m_promiseFirstTaskInjectedSet){ + m_firstTasksInjectedPromise.set_value(); + m_promiseFirstTaskInjectedSet = true; + } +} + +/** + * This method is used by the TapeReadSingleThread to wait for + * the first injection of TapeRead tasks + */ +void RecallTaskInjector::waitForFirstTasksInjectedPromise(){ + m_firstTasksInjectedFuture.wait(); +} + //------------------------------------------------------------------------------ //injectBulkRecalls //------------------------------------------------------------------------------ @@ -196,9 +220,14 @@ void RecallTaskInjector::injectBulkRecalls() { m_tapeReader.push(trt); m_lc.log(cta::log::INFO, "Created tasks for recalling a file"); } + if(njobs > 0){ + //At least one task has been created; we tell the TapeReadSingleThread that + //it can start its infinite loop + setFirstTasksInjectedPromise(); + } m_lc.log(cta::log::INFO, recallOrderLog.str()); m_jobs.clear(); - LogContext::ScopedParam sp03(m_lc, Param("nbFile", m_jobs.size())); + LogContext::ScopedParam sp03(m_lc, Param("nbFile", njobs)); m_lc.log(cta::log::INFO, "Finished processing batch of recall tasks from client"); } //------------------------------------------------------------------------------ @@ -273,6 +302,7 @@ void RecallTaskInjector::WorkerThread::run() if (m_parent.m_useRAO) { /* RecallTaskInjector is waiting to have access to the drive in order * to perform the RAO query; + * This waitForPromise() call means that the drive is mounted */ m_parent.waitForPromise(); try { diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp index be04742a1d..a71901f021 
100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp @@ -128,6 +128,20 @@ public: void waitForPromise(); void setPromise(); + + /** + * This method will tell the TapeReadSingleThread that the + * first batch of tasks has been injected + * by the RecallTaskInjector + */ + void setFirstTasksInjectedPromise(); + + /** + * This method will be called by the TapeReadSingleThread + * so that TapeReadSingleThread will wait for the first batch + * of tasks to be injected by the RecallTaskInjector + */ + void waitForFirstTasksInjectedPromise(); private: /** @@ -239,6 +253,11 @@ private: */ std::promise<void> m_raoPromise; std::future<void> m_raoFuture; + + std::promise<void> m_firstTasksInjectedPromise; + std::future<void> m_firstTasksInjectedFuture; + + bool m_promiseFirstTaskInjectedSet = false; }; } //end namespace daemon diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index 439a856f49..7ca950f561 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -323,6 +323,11 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); // Then we will loop on the tasks as they get from // the task injector + + // We wait for the task injector to finish inserting its first batch + // before launching the loop. + // We do it with a promise + m_taskInjector->waitForFirstTasksInjectedPromise(); std::unique_ptr<TapeReadTask> task; m_rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring); while(true) { -- GitLab