diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp index 1bb4914432bc45b6ae8750a4e997767a88ab7585..6652dee5f5945097a41c35126a0827d16c87d031 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp @@ -49,7 +49,8 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm, m_thread(*this),m_memManager(mm), m_tapeReader(tapeReader),m_diskWriter(diskWriter), m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold), - m_useRAO(false) {} + m_useRAO(false), + m_firstTasksInjectedFuture(m_firstTasksInjectedPromise.get_future()){} //------------------------------------------------------------------------------ //destructor //------------------------------------------------------------------------------ @@ -112,6 +113,29 @@ void RecallTaskInjector::setPromise() { throw cta::exception::Exception(std::string("In RecallTaskInjector::setPromise() got std::exception: ") + exc.what()); } } + +/** + * Idempotently tell the TapeReadSingleThread + * that the first TapeRead tasks have been injected. + * If we do not do that, as the RecallTaskInjector has to do some + * RAO query, the TapeReadSingleThread would start without any tasks + * and will tell the RecallTaskInjector to stop + */ +void RecallTaskInjector::setFirstTasksInjectedPromise() { + if(!m_promiseFirstTaskInjectedSet){ + m_firstTasksInjectedPromise.set_value(); + m_promiseFirstTaskInjectedSet = true; + } +} + +/** + * This method is used by the TapeReadSingleThread to wait + * the first injection of TapeRead tasks + */ +void RecallTaskInjector::waitForFirstTasksInjectedPromise(){ + m_firstTasksInjectedFuture.wait(); +} + //------------------------------------------------------------------------------ //injectBulkRecalls //------------------------------------------------------------------------------ @@ -196,9 +220,14 @@ void RecallTaskInjector::injectBulkRecalls() { m_tapeReader.push(trt); m_lc.log(cta::log::INFO, "Created tasks for recalling a file"); } + if(njobs > 0){ + //At least one task has been created, we tell the TapeReadSingleThread that + //it can start its infinite loop + setFirstTasksInjectedPromise(); + } m_lc.log(cta::log::INFO, recallOrderLog.str()); m_jobs.clear(); - LogContext::ScopedParam sp03(m_lc, Param("nbFile", m_jobs.size())); + LogContext::ScopedParam sp03(m_lc, Param("nbFile", njobs)); m_lc.log(cta::log::INFO, "Finished processing batch of recall tasks from client"); } //------------------------------------------------------------------------------ @@ -273,6 +302,7 @@ void RecallTaskInjector::WorkerThread::run() if (m_parent.m_useRAO) { /* RecallTaskInjector is waiting to have access to the drive in order * to perform the RAO query; + * This waitForPromise() call means that the drive is mounted */ m_parent.waitForPromise(); try { diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp index be04742a1d5c42e8465b201e01b990d3aa5a6caf..a71901f021516dded1dcbd6ede8cf61aedf4e5ee 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.hpp @@ -128,6 +128,20 @@ public: void waitForPromise(); void setPromise(); + + /** + * This method will tell the TapeReadSingleThread that the + * first batch of tasks has been injected + * by the RecallTaskInjector + */ + void setFirstTasksInjectedPromise(); + + /** + * This method will be called by the TapeReadSingleThread + * so that TapeReadSingleThread will wait the first batch + * of tasks to be injected by the RecallTaskInjector + */ + void waitForFirstTasksInjectedPromise(); private: /** @@ -239,6 +253,11 @@ private: */ std::promise<void> m_raoPromise; std::future<void> m_raoFuture; + + std::promise<void> m_firstTasksInjectedPromise; + std::future<void> m_firstTasksInjectedFuture; + + bool m_promiseFirstTaskInjectedSet = false; }; } //end namespace daemon diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index 439a856f496925acf29a7aba4471bc647173cf0e..7ca950f561b0c2c8c986e6c374cc00e5a4cd2990 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -323,6 +323,11 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); // Then we will loop on the tasks as they get from // the task injector + + // We wait the task injector to finish inserting its first batch + // before launching the loop. + // We do it with a promise + m_taskInjector->waitForFirstTasksInjectedPromise(); std::unique_ptr<TapeReadTask> task; m_rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring); while(true) {