Commit 511174b4 authored by Eric Cano's avatar Eric Cano
Browse files

Added finish() method to MigrationTaskInjector like in RecallTaskInjector.

It will allow flushing of potential leftover requests at the end of as session and a controlled thread end.
parent f831a93a
......@@ -105,6 +105,27 @@ namespace daemon {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request(maxFiles, byteSizeThreshold, lastCall));
}
bool MigrationTaskInjector::synchronousInjection(uint64_t maxFiles,
uint64_t byteSizeThreshold) {
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList>
filesToMigrateList(m_client.getFilesToMigrate(maxFiles,
byteSizeThreshold,reqReport));
if(NULL == filesToMigrateList.get()) {
m_lc.log(LOG_ERR, "No files to migrate: empty mount");
return false;
} else {
std::vector<tapegateway::FileToMigrateStruct*>& jobs=filesToMigrateList->filesToMigrate();
injectBulkMigrations(jobs);
return true;
}
}
void MigrationTaskInjector::finish(){
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
......@@ -138,8 +159,21 @@ namespace daemon {
while(m_parent.m_queue.size()>0){
Request req = m_parent.m_queue.pop();
}
}
} // end of while(1)
//-------------
m_parent.m_lc.log(LOG_DEBUG, "Finishing MigrationTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
* m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue
*/
bool stillReading =true;
while(stillReading) {
Request req = m_parent.m_queue.pop();
if (req.end) stillReading = false;
LogContext::ScopedParam sp(m_parent.m_lc, Param("lastCall", req.lastCall));
m_parent.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): popping extra request");
}
}
......
......@@ -67,8 +67,33 @@ public:
* Start the inner thread
*/
void startThreads();
/**
* Function for a feed-back loop purpose between MigrationTaskInjector and
* DiskReadThreadPool. When DiskReadThreadPool::popAndRequestMoreJobs detects
* it has not enough jobs to do to, it is class to push a request
* in order to (try) fill up the queue.
* @param maxFiles files count requested.
* @param maxBlocks total bytes count at least requested
* @param lastCall true if we want the new request to be a last call.
* See Request::lastCall
*/
void requestInjection(int maxFiles, int byteSizeThreshold, bool lastCall);
/**
* Contact the client to make sure there are really something to do
* Something = migration at most maxFiles or at least maxBytes
*
* @param maxFiles files count requested.
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection(uint64_t maxFiles, uint64_t byteSizeThreshold);
/**
* Send an end token in the request queue. There should be no subsequent
* calls to requestInjection.
*/
void finish();
private:
/*Compute how many blocks are needed for a file of fileSize bytes*/
......@@ -86,7 +111,10 @@ private:
class Request {
public:
Request(int mf, int mb, bool lc):
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc) {}
nbMaxFiles(mf), byteSizeThreshold(mb), lastCall(lc), end(false) {}
Request():
nbMaxFiles(-1), byteSizeThreshold(-1), lastCall(true),end(true) {}
const int nbMaxFiles;
const int byteSizeThreshold;
......@@ -97,6 +125,11 @@ private:
* and can send into all the different threads a signal .
*/
const bool lastCall;
/**
* True indicates the task injector will not receive any more request.
*/
const bool end;
};
class WorkerThread: public castor::tape::threading::Thread {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment