Commit 9afe9dba authored by Eric Cano's avatar Eric Cano
Browse files

Added missing reporting of the file's blockId in migrations.

This implied adding file block ID tracking in TapeFile::WriteFile
parent 2cd03395
......@@ -59,8 +59,9 @@ MigrationReportPacker::~MigrationReportPacker(){
//reportCompletedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportCompletedJob(
const tapegateway::FileToMigrateStruct& migratedFile,unsigned long checksum) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum));
const tapegateway::FileToMigrateStruct& migratedFile,u_int32_t checksum,
u_int32_t blockId) {
std::auto_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum,blockId));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
......@@ -104,13 +105,15 @@ void MigrationReportPacker::reportStuckOn(FileStruct& file){
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& _this){
std::auto_ptr<tapegateway::FileMigratedNotificationStruct> successMigration(new tapegateway::FileMigratedNotificationStruct);
successMigration->setFseq(m_migratedFile.fseq());
successMigration->setFileTransactionId(m_migratedFile.fileTransactionId());
successMigration->setId(m_migratedFile.id());
successMigration->setNshost(m_migratedFile.nshost());
successMigration->setFileid(m_migratedFile.fileid());
successMigration->setBlockId0((m_blockId >> 24) & 0xFF);
successMigration->setBlockId1((m_blockId >> 16) & 0xFF);
successMigration->setBlockId2((m_blockId >> 8) & 0xFF);
successMigration->setBlockId3((m_blockId >> 0) & 0xFF);
//WARNING; Ad-hoc name of the ChecksumName !!");
successMigration->setChecksumName("adler32");
successMigration->setChecksum(m_checksum);
......
......@@ -51,9 +51,12 @@ public:
* of migratedFile
* @param migratedFile the file successfully migrated
* @param checksum the checksum we computed of the file we have just migrated
* @param blockId The tape logical object ID of the first block of the header
* of the file. This is 0 (instead of 1) for the first file on the tape (aka
* fseq = 1).
*/
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile,
unsigned long checksum);
u_int32_t checksum, u_int32_t blockId);
/**
* Create into the MigrationReportPacker a report for the failled migration
......@@ -102,9 +105,11 @@ private:
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
const unsigned long m_checksum;
const uint32_t m_blockId;
public:
ReportSuccessful(const FileStruct& file,unsigned long checksum):
m_migratedFile(file),m_checksum(checksum){}
ReportSuccessful(const FileStruct& file,unsigned long checksum,
u_int32_t blockId):
m_migratedFile(file),m_checksum(checksum),m_blockId(blockId){}
virtual void execute(MigrationReportPacker& _this);
};
class ReportFlush : public Report {
......
......@@ -45,8 +45,8 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerNominal) {
tapegateway::FileToMigrateStruct migratedFile;
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.waitThread();
......@@ -69,9 +69,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillure) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
......@@ -94,9 +94,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerFaillureGoodEnd) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportFailedJob(failed,error,-1);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
......@@ -121,9 +121,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerGoodBadEnd) {
tapegateway::FileToMigrateStruct migratedFile;
tapegateway::FileToMigrateStruct failed;
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportCompletedJob(migratedFile,0,0);
mrp.reportFlush(statsCompress);
mrp.reportEndOfSessionWithErrors(error,-1);
......@@ -168,9 +168,9 @@ TEST(castor_tape_tapeserver_daemon, MigrationReportPackerOneByteFile) {
tapegateway::FileToMigrateStruct migrateNullFile;
migratedBigFile.setFileSize(0);
mrp.reportCompletedJob(migratedBigFile,0);
mrp.reportCompletedJob(migratedFileSmall,0);
mrp.reportCompletedJob(migrateNullFile,0);
mrp.reportCompletedJob(migratedBigFile,0,0);
mrp.reportCompletedJob(migratedFileSmall,0,0);
mrp.reportCompletedJob(migrateNullFile,0,0);
tapeserver::drives::compressionStats stats;
stats.toTape=(100000+1)/3;
......
......@@ -72,7 +72,7 @@ namespace daemon {
unsigned long ckSum = Payload::zeroAdler32();
int blockId = 0;
uint32_t memBlockId = 0;
try {
//we first check here to not even try to move the tape if a previous task has failed
//because the tape- could the very reason why the previous one failed,
......@@ -93,7 +93,7 @@ namespace daemon {
AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager);
//will throw (thus exiting the loop) if something is wrong
checkErrors(mb,blockId,lc);
checkErrors(mb,memBlockId,lc);
ckSum = mb->m_payload.adler32(ckSum);
m_taskStats.checksumingTime += timer.secs(utils::Timer::resetCounter);
......@@ -102,7 +102,7 @@ namespace daemon {
m_taskStats.transferTime += timer.secs(utils::Timer::resetCounter);
m_taskStats.dataVolume += mb->m_payload.size();
++blockId;
++memBlockId;
}
//finish the writing of the file on tape
......@@ -111,7 +111,7 @@ namespace daemon {
m_taskStats.transferTime += timer.secs(utils::Timer::resetCounter);
m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile;
m_taskStats.filesCount ++;
reportPacker.reportCompletedJob(*m_fileToMigrate,ckSum);
reportPacker.reportCompletedJob(*m_fileToMigrate,ckSum,output->getBlockId());
m_taskStats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
// Log the successful transfer
logWithStats(LOG_INFO, "File successfully transmitted to drive",
......@@ -158,13 +158,13 @@ namespace daemon {
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------
void TapeWriteTask::checkErrors(MemBlock* mb,int blockId,castor::log::LogContext& lc){
void TapeWriteTask::checkErrors(MemBlock* mb,int memBlockId,castor::log::LogContext& lc){
using namespace castor::log;
if(m_fileToMigrate->fileid() != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->isFailed() ){
|| memBlockId != mb->m_fileBlock || mb->isFailed() ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", blockId)),
LogContext::ScopedParam(lc, Param("expected_NSFBLOCKId", memBlockId)),
LogContext::ScopedParam(lc, Param("received_NSFBLOCKId", mb->m_fileBlock)),
LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
};
......
......@@ -122,10 +122,10 @@ private:
* This function will check the consistency of the mem block and
* throw exception is something goes wrong
* @param mb The mem block to check
* @param blockId The block id the mem blopck should be at
* @param memBlockId The block id the mem blopck should be at
* @param lc FOr logging
*/
void checkErrors(MemBlock* mb,int blockId,castor::log::LogContext& lc);
void checkErrors(MemBlock* mb,int memBlockId,castor::log::LogContext& lc);
/**
* Function in charge of opening the WriteFile for m_fileToMigrate
......
......@@ -456,6 +456,12 @@ namespace castor {
hdr2.fill(m_currentBlockSize, m_session->m_compressionEnabled);
uhl1.fill(m_fileToMigrate.fseq(), m_currentBlockSize, m_session->getSiteName(),
m_session->getHostName(), m_session->m_drive.getDeviceInfo());
/* Before writing anything, we record the blockId of the file */
if (1 == ftm.fseq()) {
m_blockId = 0;
} else {
m_blockId = getPosition();
}
m_session->m_drive.writeBlock(&hdr1, sizeof(hdr1));
m_session->m_drive.writeBlock(&hdr2, sizeof(hdr2));
m_session->m_drive.writeBlock(&uhl1, sizeof(uhl1));
......@@ -467,6 +473,10 @@ namespace castor {
return m_session->m_drive.getPositionInfo().currentPosition;
}
uint32_t WriteFile::getBlockId() {
return m_blockId;
}
size_t WriteFile::getBlockSize() {
return m_currentBlockSize;
}
......
......@@ -479,6 +479,13 @@ namespace castor {
*/
uint32_t getPosition() ;
/**
* Retuns the block is of the first block of the header of the file.
* This is changed from 1 to 0 for the first file on the tape (fseq=1)
* @return blockId of the first tape block of the file's header.
*/
uint32_t getBlockId() ;
/**
* Get the block size (that was set at construction time)
* @return the block size in bytes.
......@@ -535,6 +542,12 @@ namespace castor {
* number of blocks written for the current file
*/
int m_numberOfBlocks;
/**
* BlockId of the file (tape block id of the first header block).
* This value is retried at open time.
*/
u_int32_t m_blockId;
};
}
} //end of namespace tape
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment