Commit e6186f56 authored by David COME's avatar David COME
Browse files

Added comments and fixed code indent of MigrationReportPacker, DiskReadTask, MigrationTaskInjector

and DiskReadThreadPool
parent c053df92
......@@ -36,6 +36,11 @@ namespace daemon {
class DiskReadTask :public DiskReadTaskInterface {
public:
/**
* @param destination The task that will consume data block we fill up
* @param file the file we are migrating. We acquire the ownership of the pointer
* @param numberOfBlock number of memory block we need read the whole file
*/
DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock);
......
......@@ -40,18 +40,34 @@ namespace daemon {
class MigrationTaskInjector;
class DiskReadThreadPool : public DiskThreadPoolInterface<DiskReadTaskInterface> {
public:
/**
* @param nbThread Number of thread for reading files
* @param maxFilesReq maximal number of files we might require
* within a single request to the task injectore
* @param maxBytesReq maximal number of bytes we might require
* within a single request a single request to the task injectore
* @param lc log context fpr logging purpose
*/
DiskReadThreadPool(int nbThread, unsigned int maxFilesReq,unsigned int maxBytesReq,
castor::log::LogContext lc);
~DiskReadThreadPool();
void startThreads();
void waitThreads();
virtual void push(DiskReadTaskInterface *t);
void finish();
void setTaskInjector(MigrationTaskInjector* injector){
m_injector = injector;
}
private:
/** Get the next task to execute and if there is not enough tasks in queue,
* it will ask the TaskInjector to get more job
* @return the next task to execute
*/
DiskReadTaskInterface* popAndRequestMore();
class DiskReadWorkerThread: private castor::tape::threading::Thread {
public:
DiskReadWorkerThread(DiskReadThreadPool & manager):
......
......@@ -51,6 +51,7 @@ public:
* Create into the MigrationReportPacker a report for the successful migration
* of migratedFile
* @param migratedFile the file successfully migrated
* @param checksum the checksum we computed of the file we have just migrated
*/
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile,
unsigned long checksum);
......@@ -59,6 +60,8 @@ public:
* Create into the MigrationReportPacker a report for the failled migration
* of migratedFile
* @param migratedFile the file which failled
* @param msg the error message to the failure
* @param error_code the error code related to the failure
*/
void reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,const std::string& msg,int error_code);
......@@ -145,7 +148,9 @@ private:
*/
bool m_errorHappened;
/* bool to keep the inner thread running. Is set at false
* when a end of session (with error) is called
*/
bool m_continue;
};
......
......@@ -64,21 +64,26 @@ namespace daemon {
for(std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
const u_signed64 fileSize = (*it)->fileSize();
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->fileid())),
LogContext::ScopedParam(m_lc, Param("NSFILESEQNUMBER", (*it)->fseq())),
LogContext::ScopedParam(m_lc, Param("NSFILENSHOST", (*it)->nshost())),
LogContext::ScopedParam(m_lc, Param("NSFILEPATH", (*it)->path()))
LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", (*it)->nshost())),
LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->fileid())),
LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->fseq())),
LogContext::ScopedParam(m_lc, Param("path", (*it)->path()))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(LOG_INFO, "Logged file to migrate");
const u_signed64 neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
std::auto_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager));
std::auto_ptr<DiskReadTask> drt(new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock));
std::auto_ptr<TapeWriteTask> twt(
new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager)
);
std::auto_ptr<DiskReadTask> drt(
new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock)
);
m_tapeWriter.push(twt.release());
m_diskReader.push(drt.release());
m_lc.log(LOG_INFO, "Logged file to migrate");
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment