Commit b5fc3302 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Corrected a race between the RecallTaskInjector and the TapeReadSingleThread

If the RecallTaskInjector had to do RAO query that takes too much time, the TapeReadSingleThread could start its infinite loop with no Recall task in it,
this would cause the stopping of the RecallTaskInjector. Thus, even if there are Retrieve jobs in the queue, as the RecallTaskInjector is stopped, the
TapeReadSingleThread will not be feed with Read tasks and will terminate. The remaining jobs in the queue will trigger a new mount
parent 6548f022
......@@ -49,7 +49,8 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
m_thread(*this),m_memManager(mm),
m_tapeReader(tapeReader),m_diskWriter(diskWriter),
m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold),
m_useRAO(false) {}
m_useRAO(false),
m_firstTasksInjectedFuture(m_firstTasksInjectedPromise.get_future()){}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
......@@ -112,6 +113,29 @@ void RecallTaskInjector::setPromise() {
throw cta::exception::Exception(std::string("In RecallTaskInjector::setPromise() got std::exception: ") + exc.what());
}
}
/**
* Idempotently tell the TapeReadSingleThread
* that the first TapeRead tasks have been injected.
* If we do not do that, as the RecallTaskInjector has to do some
* RAO query, the TapeReadSingleThread would start without any tasks
* and will tell the RecallTaskInjector to stop
*/
void RecallTaskInjector::setFirstTasksInjectedPromise() {
if(!m_promiseFirstTaskInjectedSet){
m_firstTasksInjectedPromise.set_value();
m_promiseFirstTaskInjectedSet = true;
}
}
/**
* This method is used by the TapeReadSingleThread to wait
* the first injection of TapeRead tasks
*/
void RecallTaskInjector::waitForFirstTasksInjectedPromise(){
m_firstTasksInjectedFuture.wait();
}
//------------------------------------------------------------------------------
//injectBulkRecalls
//------------------------------------------------------------------------------
......@@ -196,9 +220,14 @@ void RecallTaskInjector::injectBulkRecalls() {
m_tapeReader.push(trt);
m_lc.log(cta::log::INFO, "Created tasks for recalling a file");
}
if(njobs > 0){
//At least one task has been created, we tell the TapeReadSingleThread that
//it can start its infinite loop
setFirstTasksInjectedPromise();
}
m_lc.log(cta::log::INFO, recallOrderLog.str());
m_jobs.clear();
LogContext::ScopedParam sp03(m_lc, Param("nbFile", m_jobs.size()));
LogContext::ScopedParam sp03(m_lc, Param("nbFile", njobs));
m_lc.log(cta::log::INFO, "Finished processing batch of recall tasks from client");
}
//------------------------------------------------------------------------------
......@@ -273,6 +302,7 @@ void RecallTaskInjector::WorkerThread::run()
if (m_parent.m_useRAO) {
/* RecallTaskInjector is waiting to have access to the drive in order
* to perform the RAO query;
* This waitForPromise() call means that the drive is mounted
*/
m_parent.waitForPromise();
try {
......
......@@ -128,6 +128,20 @@ public:
void waitForPromise();
void setPromise();
/**
* This method will tell the TapeReadSingleThread that the
* first batch of tasks has been injected
* by the RecallTaskInjector
*/
void setFirstTasksInjectedPromise();
/**
* This method will be called by the TapeReadSingleThread
* so that TapeReadSingleThread will wait the first batch
* of tasks to be injected by the RecallTaskInjector
*/
void waitForFirstTasksInjectedPromise();
private:
/**
......@@ -239,6 +253,11 @@ private:
*/
std::promise<void> m_raoPromise;
std::future<void> m_raoFuture;
std::promise<void> m_firstTasksInjectedPromise;
std::future<void> m_firstTasksInjectedFuture;
bool m_promiseFirstTaskInjectedSet = false;
};
} //end namespace daemon
......
......@@ -323,6 +323,11 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
m_stats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter);
// Then we will loop on the tasks as they get from
// the task injector
// We wait the task injector to finish inserting its first batch
// before launching the loop.
// We do it with a promise
m_taskInjector->waitForFirstTasksInjectedPromise();
std::unique_ptr<TapeReadTask> task;
m_rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Transferring);
while(true) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment