Commit c36a2119 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for cancellation of write tasks. This is used to flush tasks...

Added support for cancellation of write tasks. This is used to flush tasks when tape read session is lost after a failed positioning.
parent 2cc1c651
......@@ -51,7 +51,12 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
AutoReleaseBlock<RecallMemoryManager> releaser(mb,m_memManager);
if(mb->m_cancelled) {
// If the tape side got cancelled, we report nothing and count
// it as a success.
lc.log(LOG_DEBUG, "File transfer cancelled");
return true;
}
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->m_failed ){
LogContext::ScopedParam sp[]={
......@@ -72,6 +77,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
else
break;
} //end of while(1)
lc.log(LOG_DEBUG, "File successfully transfered.");
reporter.reportCompletedJob(*m_recallingFile,checksum);
return true;
} //end of try
......
......@@ -59,6 +59,18 @@ public:
m_fileBlock = -1;
m_tapeFileBlock = -1;
}
/**
* Mark the block as cancelled: this indicates the writer thread that
* the read was skipped due to previous, unrelated errors, and that this
* file will not be processed at all (and hence should not be reported about).
* This is mainly used for the tape read case, when positioning is confused
* (when positioning by fSeq, there's nothing we can do).
*/
void markAsCancelled(){
m_cancelled = true;
m_fileBlock = -1;
m_tapeFileBlock = -1;
}
/**
* Reset all the members.
* Numerical ones are set at -1.and m_failed to false.
......@@ -69,6 +81,7 @@ public:
m_fSeq = -1;
m_tapeFileBlock = -1;
m_failed=false;
m_cancelled=false;
m_payload.reset();
}
/** Unique memory block id */
......@@ -94,6 +107,10 @@ public:
/** Flag indicating to the receiver that the file read failed */
bool m_failed;
/** Flag indicating that the transfer was cancelled, usually due to a
previous failure. */
bool m_cancelled;
};
......
......@@ -176,6 +176,11 @@ void RecallTaskInjector::WorkerThread::run()
try{
while (1) {
Request req = m_parent.m_queue.pop();
if (req.end) {
m_parent.m_lc.log(LOG_INFO,"Received a end notification from tape thread: triggering the end of session.");
m_parent.signalEndDataMovement();
break;
}
m_parent.m_lc.log(LOG_DEBUG,"RecallJobInjector:run: about to call client interface");
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToRecallList> filesToRecallList(m_parent.m_client.getFilesToRecall(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
......@@ -209,7 +214,7 @@ void RecallTaskInjector::WorkerThread::run()
m_parent.signalEndDataMovement();
m_parent.deleteAllTasks();
}
}
//-------------
m_parent.m_lc.log(LOG_DEBUG, "Finishing RecallTaskInjector thread");
/* We want to finish at the first lastCall we encounter.
......
......@@ -290,7 +290,7 @@ private:
if(!task) {
break;
}
task->reportErrorToDiskTask();
task->reportCancellationToDiskTask();
delete task;
}
}
......
......@@ -162,35 +162,38 @@ public:
lc2.logBacktrace(LOG_ERR, ex.backtrace());
}
//if we end up there because openReadFile brought us here
//then mb is not valid, we need to get a block
//that will be done in reportErrorToDiskTask()
//or directly call reportErrorToDiskTask with the mem block
if(!mb) {
reportErrorToDiskTask();
}
else{
reportErrorToDiskTask(mb);
}
// mb might or might not be allocated at this point, but
// reportErrorToDiskTask will deal with the allocation if required.
reportErrorToDiskTask(mb);
} //end of catch
watchdog.fileFinished();
}
/**
* Get a valid block and ask to to do the report to the disk write task
*/
void reportErrorToDiskTask(){
/**
* Get a valid block and ask to cancel the disk write task
*/
void reportCancellationToDiskTask(){
MemBlock* mb =m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
reportErrorToDiskTask(mb);
//mark the block cancelled and push it (plus signal the end)
mb->markAsCancelled();
m_fifo.pushDataBlock(mb);
m_fifo.pushDataBlock(NULL);
}
private:
/**
* Do the actual report to the disk write task
* @param mb We assume that mb is a valid mem block
*/
void reportErrorToDiskTask(MemBlock* mb){
//mark the block failed and push it
void reportErrorToDiskTask(MemBlock* mb = NULL){
//If we are not provided with a block, allocate it and
// fill it up
if (!mb) {
mb=m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
}
//mark the block failed and push it (plus signal the end)
mb->markAsFailed();
m_fifo.pushDataBlock(mb);
m_fifo.pushDataBlock(NULL);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment