Commit 8ae0f68f authored by David COME's avatar David COME
Browse files

Added a mechanism to pass into a deteriorated mode in case of error while migrating :

If a Diskread ou TapeWrite task fails, all others stop and clean memory blocks while the task injector discards all remaining requests
parent d131830b
......@@ -48,8 +48,10 @@ namespace tapeserver {
namespace daemon {
DiskReadTask::DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock):
m_nextTask(destination),m_migratedFile(file),m_numberOfBlock(numberOfBlock)
tape::tapegateway::FileToMigrateStruct* file,
size_t numberOfBlock,castor::tape::threading::AtomicFlag& errorFlag):
m_nextTask(destination),m_migratedFile(file),
m_numberOfBlock(numberOfBlock),m_errorFlag(errorFlag)
{}
void DiskReadTask::execute(log::LogContext& lc) {
......@@ -57,12 +59,20 @@ namespace daemon {
size_t migratingFileSize=m_migratedFile->fileSize();
try{
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
while(migratingFileSize>0){
//if a task has signaled an error, we stop our job
if(m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
MemBlock* const mb = m_nextTask.getFreeBlock();
AutoPushBlock push(mb,m_nextTask);
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
......@@ -76,24 +86,39 @@ namespace daemon {
}
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::exceptions::ErrorFlag&){
lc.log(LOG_INFO,"DiskReadTask: a previous file has failed for migration "
"Do nothing except circulating blocks");
circulateAllBlocks(blockId);
}
catch(const castor::tape::Exception& e){
//signal to all others task that this session is screwed
m_errorFlag.set();
//we have to pump the blocks anyway, mark them failed and then pass them back to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
lc.log(LOG_ERR,e.getMessageValue());
LogContext::ScopedParam sp0(lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
lc.log(LOG_ERR,"Exception while reading a file");
//deal here the number of mem block
while(blockId<m_numberOfBlock) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failed=true;
m_nextTask.pushDataBlock(mb);
++blockId;
} //end of while
circulateAllBlocks(blockId);
} //end of catch
}
void DiskReadTask::circulateAllBlocks(size_t fromBlockId){
size_t blockId = fromBlockId;
while(blockId<m_numberOfBlock) {
MemBlock * mb = m_nextTask.getFreeBlock();
mb->m_failed=true;
m_nextTask.pushDataBlock(mb);
++blockId;
} //end of while
}
}}}}
......@@ -28,7 +28,7 @@
#include "castor/tape/tapeserver/daemon/DataFifo.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
......@@ -42,10 +42,13 @@ public:
* @param numberOfBlock number of memory block we need read the whole file
*/
DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock);
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock,
castor::tape::threading::AtomicFlag& errorFlag);
virtual void execute(log::LogContext& lc);
private:
void circulateAllBlocks(size_t fromBlockId);
/**
* The task (a TapeWriteTask) that will handle the read blocks
*/
......@@ -60,6 +63,8 @@ private:
* The number of memory block we will need to read the whole file
*/
size_t m_numberOfBlock;
castor::tape::threading::AtomicFlag& m_errorFlag;
};
}}}}
......
......@@ -75,10 +75,10 @@ namespace daemon {
const u_signed64 neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
std::auto_ptr<TapeWriteTask> twt(
new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager)
new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager,m_errorFlag)
);
std::auto_ptr<DiskReadTask> drt(
new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock)
new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock,m_errorFlag)
);
m_tapeWriter.push(twt.release());
......@@ -108,27 +108,38 @@ namespace daemon {
//------------------------------------------------------------------------------
void MigrationTaskInjector::WorkerThread::run(){
_this.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector"));
_this.m_lc.log(LOG_DEBUG, "Starting MigrationTaskInjector thread");
while(1){
Request req = _this.m_queue.pop();
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList> filesToMigrateList(_this.m_client.getFilesToMigrate(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
if(NULL==filesToMigrateList.get()){
if (req.lastCall) {
_this.m_lc.log(LOG_INFO,"No more file to migrate: triggering the end of session.\n");
_this.m_tapeWriter.finish();
_this.m_diskReader.finish();
break;
m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector"));
m_parent.m_lc.log(LOG_DEBUG, "Starting MigrationTaskInjector thread");
try{
while(1){
if(m_parent.m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
Request req = m_parent.m_queue.pop();
client::ClientProxy::RequestReport reqReport;
std::auto_ptr<tapegateway::FilesToMigrateList> filesToMigrateList(m_parent.m_client.getFilesToMigrate(req.nbMaxFiles, req.byteSizeThreshold,reqReport));
if(NULL==filesToMigrateList.get()){
if (req.lastCall) {
m_parent.m_lc.log(LOG_INFO,"No more file to migrate: triggering the end of session.\n");
m_parent.m_tapeWriter.finish();
m_parent.m_diskReader.finish();
break;
} else {
m_parent.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call");
}
} else {
_this.m_lc.log(LOG_INFO,"In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call");
m_parent.injectBulkMigrations(filesToMigrateList->filesToMigrate());
}
} else {
_this.injectBulkMigrations(filesToMigrateList->filesToMigrate());
} //end of while(1)
}//end of try
catch(const castor::tape::exceptions::ErrorFlag&){
//discard all the tasks !!
while(m_parent.m_queue.size()>0){
Request req = m_parent.m_queue.pop();
}
}
_this.m_lc.log(LOG_DEBUG, "Finishing MigrationTaskInjector thread");
m_parent.m_lc.log(LOG_DEBUG, "Finishing MigrationTaskInjector thread");
}
......
......@@ -34,6 +34,7 @@
#include "castor/tape/tapeserver/client/ClientInterface.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/TaskInjector.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
namespace castor{
namespace tape{
namespace tapeserver{
......@@ -100,10 +101,10 @@ private:
class WorkerThread: public castor::tape::threading::Thread {
public:
WorkerThread(MigrationTaskInjector & rji): _this(rji) {}
WorkerThread(MigrationTaskInjector & rji): m_parent(rji) {}
virtual void run();
private:
MigrationTaskInjector & _this;
MigrationTaskInjector & m_parent;
} m_thread;
MemoryManager & m_memManager;
......@@ -120,6 +121,8 @@ private:
castor::tape::threading::Mutex m_producerProtection;
castor::tape::threading::BlockingQueue<Request> m_queue;
castor::tape::threading::AtomicFlag m_errorFlag;
};
} //end namespace daemon
......
......@@ -30,6 +30,8 @@
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/exception/Exception.hpp"
namespace {
unsigned long initAdler32Checksum() {
return adler32(0L,Z_NULL,0);
......@@ -41,8 +43,10 @@ namespace tapeserver {
namespace daemon {
TapeWriteTask::TapeWriteTask(int blockCount, tapegateway::FileToMigrateStruct* file,MemoryManager& mm):
m_fileToMigrate(file),m_memManager(mm), m_fifo(blockCount),m_blockCount(blockCount)
TapeWriteTask::TapeWriteTask(int blockCount, tapegateway::FileToMigrateStruct* file,
MemoryManager& mm,castor::tape::threading::AtomicFlag& errorFlag):
m_fileToMigrate(file),m_memManager(mm), m_fifo(blockCount),
m_blockCount(blockCount),m_errorFlag(errorFlag)
{
mm.addClient(&m_fifo);
}
......@@ -58,10 +62,12 @@ namespace daemon {
unsigned long ckSum = initAdler32Checksum();
int blockId = 0;
try {
std::auto_ptr<castor::tape::tapeFile::WriteFile> output(openWriteFile(session,lc));
while(!m_fifo.finished()) {
if(m_errorFlag){
throw castor::tape::exceptions::ErrorFlag();
}
MemBlock* const mb = m_fifo.popDataBlock();
AutoReleaseBlock<MemoryManager> releaser(mb,m_memManager);
......@@ -77,22 +83,33 @@ namespace daemon {
lc.log(LOG_ERR,"received a bad block for writing");
throw castor::tape::Exception("received a bad block for writing");
}
ckSum = mb->m_payload.adler32(ckSum);
mb->m_payload.write(*output);
++blockId;
}
output->close();
reportPacker.reportCompletedJob(*m_fileToMigrate,ckSum);
}
catch(const castor::tape::exceptions::ErrorFlag&){
lc.log(LOG_INFO,"TapeWriteTask: a previous file has failed for migration "
"Do nothing except circulating blocks");
circulateMemBlocks();
}
catch(const castor::tape::Exception& e){
m_errorFlag.set();
//we can end up there because
//we failed to open the WriteFile
//we received a bad block or a block written failed
//close failed
LogContext::ScopedParam sp(lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
lc.log(LOG_ERR,"Circulating blocks into TapeWriteTask::execute");
while(!m_fifo.finished()) {
m_memManager.releaseBlock(m_fifo.popDataBlock());
}
circulateMemBlocks();
reportPacker.reportFailedJob(*m_fileToMigrate,e.getMessageValue(),e.code());
}
}
}
MemBlock * TapeWriteTask::getFreeBlock() {
......@@ -112,17 +129,22 @@ namespace daemon {
std::auto_ptr<tapeFile::WriteFile> TapeWriteTask::openWriteFile(
tape::tapeFile::WriteSession & session, log::LogContext& lc){
std::auto_ptr<tape::tapeFile::WriteFile> output;
try{
output.reset(new tape::tapeFile::WriteFile(&session, *m_fileToMigrate,m_memManager.blockCapacity()));
lc.log(LOG_DEBUG, "Successfully opened the tape file for writing");
}
catch(const castor::exception::Exception & ex){
lc.log(LOG_ERR, "Failed to open tape file for writing");
throw;
}
return output;
}
std::auto_ptr<tape::tapeFile::WriteFile> output;
try{
output.reset(new tape::tapeFile::WriteFile(&session, *m_fileToMigrate,m_memManager.blockCapacity()));
lc.log(LOG_DEBUG, "Successfully opened the tape file for writing");
}
catch(const castor::exception::Exception & ex){
lc.log(LOG_ERR, "Failed to open tape file for writing");
throw;
}
return output;
}
void TapeWriteTask::circulateMemBlocks(){
while(!m_fifo.finished()) {
m_memManager.releaseBlock(m_fifo.popDataBlock());
}
}
}}}}
......@@ -30,7 +30,7 @@
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
......@@ -50,7 +50,8 @@ public:
* @param blockCount: number of memory blocks (TODO:?)
* @param mm: reference to the memory manager in use
*/
TapeWriteTask(int blockCount, tape::tapegateway::FileToMigrateStruct* file,MemoryManager& mm);
TapeWriteTask(int blockCount, tape::tapegateway::FileToMigrateStruct* file,
MemoryManager& mm,castor::tape::threading::AtomicFlag& errorFlag);
/**
......@@ -83,6 +84,8 @@ public:
virtual ~TapeWriteTask();
private:
void circulateMemBlocks();
/**
* Function in charge of opening the WriteFile for m_fileToMigrate
* Throw an exception it it fails
......@@ -117,6 +120,8 @@ private:
* The number of memory blocks to be used
*/
int m_blockCount;
castor::tape::threading::AtomicFlag& m_errorFlag;
};
}}}}
......
......@@ -53,6 +53,12 @@ namespace tape {
EndOfFile(const std::string & w): castor::exception::Exception(w) {}
virtual ~EndOfFile() throw() {}
};
class ErrorFlag : public castor::exception::Exception {
public:
ErrorFlag(): castor::exception::Exception("Internal exception, should not be seen") {}
virtual ~ErrorFlag() throw() {}
};
}
} //namespace tape
} //namespace castor
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment