Commit fa8c0f67 authored by David COME
Browse files

Bugfix : tapeserver was crashing when a tape was already in the drive

parent 2603eb76
...@@ -92,7 +92,7 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() { ...@@ -92,7 +92,7 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread"); m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
std::auto_ptr<DiskWriteTask> task; std::auto_ptr<DiskWriteTask> task;
while(1) { while(1) {
task.reset(m_parentThreadPool. m_tasks.pop()); task.reset(m_parentThreadPool.m_tasks.pop());
if (NULL!=task.get()) { if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) { if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount; ++m_parentThreadPool.m_failedWriteCount;
......
...@@ -178,12 +178,16 @@ void RecallTaskInjector::WorkerThread::run() ...@@ -178,12 +178,16 @@ void RecallTaskInjector::WorkerThread::run()
* m_diskWriter might still want some more task (the threshold could be crossed), * m_diskWriter might still want some more task (the threshold could be crossed),
* so we discard everything that might still be in the queue * so we discard everything that might still be in the queue
*/ */
bool stillReading =true; if(_this.m_queue.size()>0) {
while(stillReading) { bool stillReading =true;
Request req = _this.m_queue.pop(); while(stillReading) {
if (req.end) stillReading = false; Request req = _this.m_queue.pop();
LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall)); if (req.end){
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request"); stillReading = false;
}
LogContext::ScopedParam sp(_this.m_lc, Param("lastCall", req.lastCall));
_this.m_lc.log(LOG_INFO,"In RecallJobInjector::WorkerThread::run(): popping extra request");
}
} }
} }
......
...@@ -78,7 +78,38 @@ public: ...@@ -78,7 +78,38 @@ public:
} }
private: private:
class TapeCleaning{
TapeReadSingleThread& m_this;
public:
TapeCleaning(TapeReadSingleThread& parent):m_this(parent){}
~TapeCleaning(){
try{
// Do the final cleanup
m_this.m_drive.unloadTape();
m_this.m_logContext.log(LOG_INFO, "TapeUnloaded");
// And return the tape to the library
m_this.m_rmc.unmountTape(m_this.m_volInfo.vid, m_this.m_drive.librarySlot);
m_this.m_logContext.log(LOG_INFO, "unmountTape");
m_this.m_gsr.tapeUnmounted();
// We now acknowledge to the task injector that read reached the end. There
// will hence be no more requests for more.
m_this.m_taskInjector->finish();
//then we log/notify
m_this.m_logContext.log(LOG_INFO, "Finishing Tape Read Thread. Just signalled task injector of the end");
//then we terminate the global status reporter
m_this.m_gsr.finish();
}
catch(const castor::exception::Exception& ex){
castor::log::ScopedParamContainer scoped(m_this.m_logContext);
scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code());
m_this.m_logContext.log(LOG_ERR, "Ex in TapeCleaming");
}
}
};
/** /**
* Pop a task from its tasks and if there is not enough tasks left, it will * Pop a task from its tasks and if there is not enough tasks left, it will
* ask the task injector for more * ask the task injector for more
...@@ -100,69 +131,110 @@ private: ...@@ -100,69 +131,110 @@ private:
return vrp.value; return vrp.value;
} }
void mountTape(){
/** castor::log::ScopedParamContainer scoped(m_logContext);
* This function is from Thread, it is the function that will do all the job scoped.add("vid",m_volInfo.vid)
*/ .add("drive_Slot",m_drive.librarySlot);
virtual void run() {
m_logContext.pushOrReplace(log::Param("thread", "tapeRead"));
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs;
try { try {
// Before anything, the tape should be mounted m_rmc.mountTape(m_volInfo.vid, m_drive.librarySlot,
m_rmc.mountTape(m_volInfo.vid, m_drive.librarySlot, legacymsg::RmcProxy::MOUNT_MODE_READONLY); legacymsg::RmcProxy::MOUNT_MODE_READONLY);
m_logContext.log(LOG_INFO, "Tape Mounted");
//wait for drive to be ready }
m_drive.waitUntilReady(600); catch (castor::exception::Exception & ex) {
scoped.add("exception_message", ex.getMessageValue())
// Then we have to initialise the tape read session .add("exception_code",ex.code());
rs.reset(new castor::tape::tapeFile::ReadSession(m_drive,m_volInfo)); m_logContext.log(LOG_ERR, "Failed to mount the tape for reading");
throw;
//and then report }
m_logContext.log(LOG_INFO, "Tape read session session successfully started"); }
m_gsr.tapeMountedForRead(); void waitForDrive(){
} catch (castor::exception::Exception & ex) { try {
//wait for drive to be ready
m_drive.waitUntilReady(600);
}
catch (castor::exception::Exception & ex) {
castor::log::ScopedParamContainer scoped(m_logContext); castor::log::ScopedParamContainer scoped(m_logContext);
scoped.add("exception_message", ex.getMessageValue()) scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code()); .add("exception_code",ex.code());
m_logContext.log(LOG_ERR, "Failed to start tape read session"); m_logContext.log(LOG_ERR, "Drive not ready after a 600s timeout");
// TODO: log and unroll the session throw;
// TODO: add an unroll mode to the tape read task. (Similar to exec, but pushing blocks marked in error)
} }
// Then we will loop on the tasks as they get from }
// the task injector
while(1) { /**
// NULL indicated the end of work * Try to open a tapeFile::ReadSession; if it fails, the exception is logged and rethrown.
TapeReadTask * task = popAndRequestMoreJobs(); * Returning a std::auto_ptr ensures the caller takes ownership
m_logContext.log(LOG_DEBUG, "TapeReadThread: just got one more job"); * of the object through auto_ptr's copy constructor
if (task) { * @return
task->execute(*rs, m_logContext); */
delete task; std::auto_ptr<castor::tape::tapeFile::ReadSession> openReadSession(){
m_filesProcessed++; try{
} else { std::auto_ptr<castor::tape::tapeFile::ReadSession> rs(
break; new castor::tape::tapeFile::ReadSession(m_drive,m_volInfo));
m_logContext.log(LOG_DEBUG, "Created tapeFile::ReadSession with success");
return rs;
}catch(castor::exception::Exception & ex){
castor::log::ScopedParamContainer scoped(m_logContext);
scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code());
m_logContext.log(LOG_ERR, "Failed to tapeFile::ReadSession");
throw;
} }
} }
/**
* This function is from Thread, it is the function that will do all the job
*/
virtual void run() {
m_logContext.pushOrReplace(log::Param("thread", "tapeRead"));
// We now acknowledge to the task injector that read reached the end. There try{
// will hence be no more requests for more. (last thread turns off the light) //after this point, the tape ins loaded, so
m_taskInjector->finish(); //it has to be unloaded, unmounted at all cost -> RAII
//will also take care of the GlobalStatusReporter and of RecallTaskInjector
TapeCleaning tapeCleaner(*this);
// Before anything, the tape should be mounted
mountTape();
waitForDrive();
// Then we have to initialise the tape read session
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs(openReadSession());
//and then report
m_logContext.log(LOG_INFO, "Tape read session session successfully started");
m_gsr.tapeMountedForRead();
// Then we will loop on the tasks as they get from
// the task injector
while(1) {
// NULL indicated the end of work
TapeReadTask * task = popAndRequestMoreJobs();
m_logContext.log(LOG_DEBUG, "TapeReadThread: just got one more job");
if (task) {
task->execute(*rs, m_logContext);
delete task;
m_filesProcessed++;
} else {
break;
}
}
}catch(const castor::exception::Exception& e){
//We can only end up here because
//mountTape, waitForDrive or creating the ReadSession failed.
//That means we can't do anything because the environment is wrong,
//so we have to delete all tasks and return.
//does this need to be done in all cases ? If yes, RAII IT !!! m_logContext.log(LOG_ERR, "Tape read session failed to start");
//TODOOOOOOO while(1){
// Do the final cleanup TapeReadTask* task=m_tasks.pop();
m_drive.unloadTape(); if(!task) {
// And return the tape to the library break;
m_rmc.unmountTape(m_volInfo.vid, m_drive.librarySlot); }
task->reportErrorToDiskTask();
//then we log/notigy delete task;
m_logContext.log(LOG_DEBUG, "Finishing Tape Read Thread. Just signalled task injector of the end"); }
m_gsr.tapeUnmounted(); }
//then we terminate the global status reporter
m_gsr.finish();
} }
/** /**
......
...@@ -132,7 +132,7 @@ public: ...@@ -132,7 +132,7 @@ public:
mb->m_fSeq = m_fileToRecall->fseq(); mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid(); mb->m_fileid = m_fileToRecall->fileid();
} }
//mark the block failed and push it //mark the block failed and push it
mb->markAsFailed(); mb->markAsFailed();
m_fifo.pushDataBlock(mb); m_fifo.pushDataBlock(mb);
...@@ -144,6 +144,18 @@ public: ...@@ -144,6 +144,18 @@ public:
m_fifo.pushDataBlock(NULL); m_fifo.pushDataBlock(NULL);
lc.log(LOG_DEBUG, "File read completed"); lc.log(LOG_DEBUG, "File read completed");
} }
void reportErrorToDiskTask(){
MemBlock* mb =m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
//mark the block failed and push it
mb->markAsFailed();
m_fifo.pushDataBlock(mb);
m_fifo.pushDataBlock(NULL);
}
private: private:
/** /**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment