Commit 2b157e07 authored by David COME's avatar David COME
Browse files

Handle now exception in {Migration/Recall}TaskInjector if we cant get information from the client.

No answer = end of session
parent f69df258
......@@ -145,8 +145,25 @@ namespace daemon {
castor::tape::threading::MutexLocker ml(&m_producerProtection);
m_queue.push(Request());
}
//------------------------------------------------------------------------------
//signalEndDataMovement
//------------------------------------------------------------------------------
void MigrationTaskInjector::signalEndDataMovement(){
//first send the end signal to the threads
m_tapeWriter.finish();
m_diskReader.finish();
m_memManager.finish();
}
//------------------------------------------------------------------------------
//deleteAllTasks
//------------------------------------------------------------------------------
void MigrationTaskInjector::deleteAllTasks(){
//discard all the tasks !!
while(m_queue.size()>0){
m_queue.pop();
}
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
......@@ -159,14 +176,14 @@ namespace daemon {
}
Request req = m_parent.m_queue.pop();
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList> filesToMigrateList(m_parent.m_client.getFilesToMigrate(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
std::auto_ptr<tapegateway::FilesToMigrateList> filesToMigrateList(
m_parent.m_client.getFilesToMigrate(req.nbMaxFiles, req.byteSizeThreshold,reqReport)
);
if(NULL==filesToMigrateList.get()){
if (req.lastCall) {
m_parent.m_lc.log(LOG_INFO,"No more file to migrate: triggering the end of session.\n");
m_parent.m_tapeWriter.finish();
m_parent.m_diskReader.finish();
m_parent.m_memManager.finish();
m_parent.signalEndDataMovement();
break;
} else {
m_parent.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call");
......@@ -181,15 +198,22 @@ namespace daemon {
m_parent.m_lc.log(LOG_ERR,"In MigrationTaskInjector::WorkerThread::run(): a task screw up, "
"finishing and discarding all tasks ");
//first send the end signal to the threads
m_parent.m_tapeWriter.finish();
m_parent.m_diskReader.finish();
m_parent.signalEndDataMovement();
m_parent.deleteAllTasks();
}
catch(const castor::exception::Exception& ex){
//we end up there because we could not talk to the client
//discard all the tasks !!
while(m_parent.m_queue.size()>0){
m_parent.m_queue.pop();
log::ScopedParamContainer container( m_parent.m_lc);
container.add("exception code",ex.code())
.add("exception message",ex.getMessageValue());
m_parent.m_lc.logBacktrace(LOG_ERR,ex.backtrace());
m_parent.m_lc.log(LOG_ERR,"In MigrationTaskInjector::WorkerThread::run(): "
"could not retrieve a list of file to migrate. End of session");
m_parent.signalEndDataMovement();
m_parent.deleteAllTasks();
}
} // end of while(1)
//-------------
m_parent.m_lc.log(LOG_INFO, "Finishing MigrationTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
......
......@@ -114,6 +114,16 @@ private:
return fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1);
}
/**
* It will signal to the disk read thread pool, tape write single thread
* and to the mem manager they have to stop their threads(s)
*/
void signalEndDataMovement();
/**
* It will delete all remaining tasks
*/
void deleteAllTasks();
/**
* A request of files to migrate. We request EITHER
* - a maximum of nbMaxFiles files
......
......@@ -138,55 +138,86 @@ bool RecallTaskInjector::synchronousInjection()
return true;
}
}
//------------------------------------------------------------------------------
//signalEndDataMovement
//------------------------------------------------------------------------------
void RecallTaskInjector::signalEndDataMovement(){
//first send the end signal to the threads
m_tapeReader.finish();
m_diskWriter.finish();
}
//------------------------------------------------------------------------------
//deleteAllTasks
//------------------------------------------------------------------------------
void RecallTaskInjector::deleteAllTasks(){
//discard all the tasks !!
while(m_queue.size()>0){
m_queue.pop();
}
}
//------------------------------------------------------------------------------
//WorkerThread::run
//------------------------------------------------------------------------------
void RecallTaskInjector::WorkerThread::run()
{
using castor::log::LogContext;
_this.m_lc.pushOrReplace(Param("thread", "recallTaskInjector"));
_this.m_lc.log(LOG_DEBUG, "Starting RecallTaskInjector thread");
m_parent.m_lc.pushOrReplace(Param("thread", "recallTaskInjector"));
m_parent.m_lc.log(LOG_DEBUG, "Starting RecallTaskInjector thread");
try{
while (1) {
Request req = _this.m_queue.pop();
_this.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
Request req = m_parent.m_queue.pop();
m_parent.m_lc.log(LOG_INFO,"RecallJobInjector:run: about to call client interface\n");
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(_this.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(m_parent.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
LogContext::ScopedParam sp01(_this.m_lc, Param("transactionId", reqReport.transactionId));
LogContext::ScopedParam sp02(_this.m_lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp03(_this.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
LogContext::ScopedParam sp01(m_parent.m_lc, Param("transactionId", reqReport.transactionId));
LogContext::ScopedParam sp02(m_parent.m_lc, Param("connectDuration", reqReport.connectDuration));
LogContext::ScopedParam sp03(m_parent.m_lc, Param("sendRecvDuration", reqReport.sendRecvDuration));
if (NULL == filesToRecallList.get()) {
if (req.lastCall) {
_this.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
_this.m_tapeReader.finish();
_this.m_diskWriter.finish();
m_parent.m_lc.log(LOG_INFO,"No more file to recall: triggering the end of session.\n");
m_parent.signalEndDataMovement();
break;
} else {
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
m_parent.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.\n");
}
} else {
std::vector<tapegateway::FileToRecallStruct*>& jobs= filesToRecallList->filesToRecall();
_this.injectBulkRecalls(jobs);
m_parent.injectBulkRecalls(jobs);
}
} // end of while(1)
} //end of try
catch(const castor::exception::Exception& ex){
//we end up there because we could not talk to the client
log::ScopedParamContainer container( m_parent.m_lc);
container.add("exception code",ex.code())
.add("exception message",ex.getMessageValue());
m_parent.m_lc.logBacktrace(LOG_ERR,ex.backtrace());
m_parent.m_lc.log(LOG_ERR,"In RecallJobInjector::WorkerThread::run(): "
"could not retrieve a list of file to recall. End of session");
m_parent.signalEndDataMovement();
m_parent.deleteAllTasks();
}
//-------------
_this.m_lc.log(LOG_DEBUG, "Finishing RecallTaskInjector thread");
m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
* But even after sending finish() to m_diskWriter and to m_tapeReader,
* m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue
*/
if(_this.m_queue.size()>0) {
if(m_parent.m_queue.size()>0) {
bool stillReading =true;
while(stillReading) {
Request req = _this.m_queue.pop();
Request req = m_parent.m_queue.pop();
if (req.end){
stillReading = false;
}
LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
LogContext::ScopedParam sp(m_parent.m_lc, Param("lastCall", req.lastCall));
m_parent.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
}
}
}
......
......@@ -110,6 +110,16 @@ public:
*/
void startThreads();
private:
/**
* It will signal to the disk read thread pool, tape write single thread
* and to the mem manager they have to stop their threads(s)
*/
void signalEndDataMovement();
/**
* It will delete all remaining tasks
*/
void deleteAllTasks();
/**
* Create all the tape-read and write-disk tasks for set of files to retrieve
......@@ -149,10 +159,10 @@ private:
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(RecallTaskInjector & rji): _this(rji) {}
WorkerThread(RecallTaskInjector & rji): m_parent(rji) {}
virtual void run();
private:
RecallTaskInjector & _this;
RecallTaskInjector & m_parent;
} m_thread;
///The memory manager for accessing memory blocks.
RecallMemoryManager & m_memManager;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment