diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 6d87b229e2a63b430410f97b69a12e2b1fab1b5e..a55e83e38f966b829a67dffaa081715ec7e139a4 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -123,7 +123,7 @@ std::string cta::ArchiveJob::reportURL() noexcept { try { return exceptionThrowingReportURL(); } catch (exception::Exception& ex) { - return ex.what(); + return ex.getMessageValue(); } catch (...) { return "In ArchiveJob::reportURL(): unknown exception"; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataConsumer.hpp b/tapeserver/castor/tape/tapeserver/daemon/DataConsumer.hpp index 288924cbfd96335c1320fba85abdcf299f4fe62f..5434a854ad29e5cd62f5316b5aadce1b0584b9b7 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataConsumer.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataConsumer.hpp @@ -27,7 +27,7 @@ namespace tape { namespace tapeserver { namespace daemon { -// Antcipated declaration to hasten compilation +// Anticipated declaration to hasten compilation class MemBlock; class DataConsumer { @@ -48,7 +48,7 @@ public: /** * Destructor */ - virtual ~DataConsumer() {} + virtual ~DataConsumer() = default; }; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataPipeline.hpp b/tapeserver/castor/tape/tapeserver/daemon/DataPipeline.hpp index 2b1f78b16e2d8da7126758729b9067b9d96410e1..ac639ad777800b8a7871807b4590ebd06535433e 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataPipeline.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataPipeline.hpp @@ -48,10 +48,10 @@ public: * Constructor * @param bn :how many memory block we want in the fifo (its size) */ - DataPipeline(int bn) : m_blocksNeeded(bn), m_freeBlocksProvided(0), - m_dataBlocksPushed(0), m_dataBlocksPopped(0) {}; - - ~DataPipeline() throw() { + explicit DataPipeline(uint64_t bn) : + m_blocksNeeded(bn), m_freeBlocksProvided(0), m_dataBlocksPushed(0), m_dataBlocksPopped(0){}; + + ~DataPipeline() noexcept { cta::threading::MutexLocker 
ml(m_freeBlockProviderProtection); } @@ -129,7 +129,7 @@ public: */ bool finished() { // No need to lock because only one int variable is read. - //TODO : are we sure the operation is atomic ? It is plateform dependant + //TODO : are we sure the operation is atomic ? It is platform dependant cta::threading::MutexLocker ml(m_countersMutex); return m_dataBlocksPopped >= m_blocksNeeded; } @@ -139,16 +139,16 @@ private: cta::threading::Mutex m_freeBlockProviderProtection; ///the number of memory blocks we want to be provided to the object (its size). - const int m_blocksNeeded; + const uint64_t m_blocksNeeded; ///how many blocks have been currently provided - volatile int m_freeBlocksProvided; + volatile uint64_t m_freeBlocksProvided; ///how many data blocks have been currently pushed - volatile int m_dataBlocksPushed; + volatile uint64_t m_dataBlocksPushed; ///how many data blocks have been currently taken - volatile int m_dataBlocksPopped; + volatile uint64_t m_dataBlocksPopped; ///thread sage storage of all free blocks cta::threading::BlockingQueue<MemBlock *> m_freeBlocks; diff --git a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp index 017cd5a4dab5caec882d18a45d9101c4fd5fd22a..a36ac417fb5fe8145a2d9bc8425c4541e908a353 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DataTransferSessionTest.cpp @@ -1109,8 +1109,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) { uint8_t data[1000]; size_t archiveFileSize = sizeof(data); castor::tape::SCSI::Structures::zeroStruct(&data); - int fseq; - for (fseq = 1; fseq <= MAX_RECALLS; fseq++) { + + for (int fseq = 1; fseq <= MAX_RECALLS; fseq++) { expectedRAOFseqOrder[fseq / MAX_BULK_RECALLS].push_back(std::to_string(fseq)); // Create a path to a remote destination file std::ostringstream remoteFilePath; @@ -1298,10 +1298,10 @@ 
TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallLinearAlgorithm) { uint8_t data[1000]; size_t archiveFileSize = sizeof(data); castor::tape::SCSI::Structures::zeroStruct(&data); - int fseq; + // For the RAO orders we will have two rao calls : first with 30 files, // the second with 20 files - for (fseq = 1; fseq <= MAX_RECALLS; fseq++) { + for (int fseq = 1; fseq <= MAX_RECALLS; fseq++) { expectedRAOOrder[fseq / MAX_BULK_RECALLS].push_back(std::to_string(fseq)); // Create a path to a remote destination file std::ostringstream remoteFilePath; @@ -1485,10 +1485,10 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallRAOAlgoDoesNotExistS uint8_t data[1000]; size_t archiveFileSize = sizeof(data); castor::tape::SCSI::Structures::zeroStruct(&data); - int fseq; + // For the RAO orders we will have two rao calls : first with 30 files, // the second with 20 files - for (fseq = 1; fseq <= MAX_RECALLS; fseq++) { + for (int fseq = 1; fseq <= MAX_RECALLS; fseq++) { expectedRAOOrder[fseq / MAX_BULK_RECALLS].push_back(std::to_string(fseq)); // Create a path to a remote destination file std::ostringstream remoteFilePath; @@ -1676,10 +1676,10 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecallSLTFRAOAlgorithm) { uint8_t data[1000]; size_t archiveFileSize = sizeof(data); castor::tape::SCSI::Structures::zeroStruct(&data); - int fseq; + // For the RAO orders we will have two rao calls : first with 30 files, // the second with 20 files - for (fseq = 1; fseq <= MAX_RECALLS; fseq++) { + for (int fseq = 1; fseq <= MAX_RECALLS; fseq++) { expectedRAOOrder[fseq / MAX_BULK_RECALLS].push_back(std::to_string(fseq)); // Create a path to a remote destination file std::ostringstream remoteFilePath; @@ -2333,30 +2333,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFileSizeMigration) { mockSys.fake.m_pathToDrive["/dev/nst0"]->rewind(); // Create the files and schedule the archivals - - //First a file with wrong checksum - { - int fseq = 1; - 
sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); - sourceFiles.back()->randomFill(1000); - remoteFilePaths.push_back(sourceFiles.back()->path()); - // Schedule the archival of the file - cta::common::dataStructures::ArchiveRequest ar; - ar.checksumBlob.insert(cta::checksum::ADLER32, sourceFiles.back()->adler32()); - ar.storageClass=s_storageClassName; - ar.srcURL=std::string("file://") + sourceFiles.back()->path(); - ar.requester.name = requester.username; - ar.requester.group = "group"; - ar.fileSize = 900; // Wrong file size - ar.diskFileID = std::to_string(fseq); - ar.diskFileInfo.path = "y"; - ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; - ar.diskFileInfo.gid = DISK_FILE_GID; - const auto archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, ar.storageClass, ar.requester, logContext); - archiveFileIds.push_back(archiveFileId); - scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext); - } - for(int fseq=2; fseq <= 10 ; fseq ++) { + const int problematicFseq = 1; + for (int fseq=1; fseq <= 10 ; fseq ++) { // Create a source file. sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); sourceFiles.back()->randomFill(1000); @@ -2368,7 +2346,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFileSizeMigration) { ar.srcURL=std::string("file://") + sourceFiles.back()->path(); ar.requester.name = requester.username; ar.requester.group = "group"; - ar.fileSize = 1000; + ar.fileSize = (fseq != problematicFseq) ? 
1000 : 900; // 900 is wrong reported size ar.diskFileID = std::to_string(fseq); ar.diskFileInfo.path = "y"; ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; @@ -2506,6 +2484,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongChecksumMigration) { // Tempfiles are in this scope so they are kept alive std::list<std::unique_ptr<unitTests::TempFile>> sourceFiles; std::list<uint64_t> archiveFileIds; + const uint64_t problematicFseq = 5; { // Label the tape castor::tape::tapeFile::LabelSession::label(mockSys.fake.m_pathToDrive["/dev/nst0"], s_vid, false); @@ -2515,35 +2494,17 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongChecksumMigration) { // Create the files and schedule the archivals //First a file with wrong checksum - { - int fseq = 1; - sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); - sourceFiles.back()->randomFill(1000); - remoteFilePaths.push_back(sourceFiles.back()->path()); - // Schedule the archival of the file - cta::common::dataStructures::ArchiveRequest ar; - ar.checksumBlob.insert(cta::checksum::ADLER32, sourceFiles.back()->adler32() + 1); // Wrong reported checksum - ar.storageClass=s_storageClassName; - ar.srcURL=std::string("file://") + sourceFiles.back()->path(); - ar.requester.name = requester.username; - ar.requester.group = "group"; - ar.fileSize = 1000; - ar.diskFileID = std::to_string(fseq); - ar.diskFileInfo.path = "y"; - ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; - ar.diskFileInfo.gid = DISK_FILE_GID; - const auto archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, ar.storageClass, ar.requester, logContext); - archiveFileIds.push_back(archiveFileId); - scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext); - } - for(int fseq=2; fseq <= 10 ; fseq ++) { + for (uint64_t fseq=1; fseq <= 10 ; fseq ++) { // Create a source file. 
sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); sourceFiles.back()->randomFill(1000); remoteFilePaths.push_back(sourceFiles.back()->path()); // Schedule the archival of the file cta::common::dataStructures::ArchiveRequest ar; - ar.checksumBlob.insert(cta::checksum::ADLER32, sourceFiles.back()->adler32()); + ar.checksumBlob.insert(cta::checksum::ADLER32, (fseq != problematicFseq) ? + sourceFiles.back()->adler32() : // Correct reported checksum + sourceFiles.back()->adler32() + 1); // Wrong reported checksum + ar.storageClass=s_storageClassName; ar.srcURL=std::string("file://") + sourceFiles.back()->path(); ar.requester.name = requester.username; @@ -2593,12 +2554,17 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongChecksumMigration) { std::string logToCheck = logger.getLog(); logToCheck += ""; ASSERT_EQ(s_vid, sess.getVid()); - - auto afiiter = archiveFileIds.begin(); - // None of the files made it to the catalogue - for(__attribute__ ((unused)) auto & sf: sourceFiles) { - auto afi = *(afiiter++); - ASSERT_THROW(catalogue.ArchiveFile()->getArchiveFileById(afi), cta::exception::Exception); + + std::cout << logToCheck << std::endl; + + for (const auto & fileNumber : archiveFileIds) { + if (fileNumber < problematicFseq) { + // Files queued without the wrong checksum made it to the catalogue + auto afs = catalogue.ArchiveFile()->getArchiveFileById(fileNumber); + } else { + // Remaining files were re-queued + ASSERT_THROW(catalogue.ArchiveFile()->getArchiveFileById(fileNumber), cta::exception::Exception); + } } // Check logs for drive statistics @@ -2677,6 +2643,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFilesizeInMiddleOfBatchM // Tempfiles are in this scope so they are kept alive std::list<std::unique_ptr<unitTests::TempFile>> sourceFiles; std::list<uint64_t> archiveFileIds; + const uint64_t problematicFseq = 5; { // Label the tape castor::tape::tapeFile::LabelSession::label(mockSys.fake.m_pathToDrive["/dev/nst0"], s_vid, 
false); @@ -2684,51 +2651,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFilesizeInMiddleOfBatchM mockSys.fake.m_pathToDrive["/dev/nst0"]->rewind(); // Create the files and schedule the archivals - for(int fseq=1; fseq <= 4 ; fseq ++) { - // Create a source file. - sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); - sourceFiles.back()->randomFill(1000); - remoteFilePaths.push_back(sourceFiles.back()->path()); - // Schedule the archival of the file - cta::common::dataStructures::ArchiveRequest ar; - ar.checksumBlob.insert(cta::checksum::ADLER32, sourceFiles.back()->adler32()); - ar.storageClass=s_storageClassName; - ar.srcURL=std::string("file://") + sourceFiles.back()->path(); - ar.requester.name = requester.username; - ar.requester.group = "group"; - ar.fileSize = 1000; - ar.diskFileID = std::to_string(fseq); - ar.diskFileInfo.path = "y"; - ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; - ar.diskFileInfo.gid = DISK_FILE_GID; - const auto archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, ar.storageClass, ar.requester, logContext); - archiveFileIds.push_back(archiveFileId); - scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext); - } - { - //Create a file with wrong file size in the middle of the jobs - int fseq = 5; - sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); - sourceFiles.back()->randomFill(1000); - remoteFilePaths.push_back(sourceFiles.back()->path()); - // Schedule the archival of the file - cta::common::dataStructures::ArchiveRequest ar; - ar.checksumBlob.insert(cta::checksum::ADLER32, sourceFiles.back()->adler32()); - ar.storageClass=s_storageClassName; - ar.srcURL=std::string("file://") + sourceFiles.back()->path(); - ar.requester.name = requester.username; - ar.requester.group = "group"; - ar.fileSize = 900; // Wrong reported size - ar.diskFileID = std::to_string(fseq); - ar.diskFileInfo.path = "y"; - ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; - 
ar.diskFileInfo.gid = DISK_FILE_GID; - const auto archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, ar.storageClass, ar.requester, logContext); - archiveFileIds.push_back(archiveFileId); - scheduler.queueArchiveWithGivenId(archiveFileId,s_diskInstance,ar,logContext); - } - // Create the rest of the files and schedule the archivals - for(int fseq=6; fseq <= 10 ; fseq ++) { + for (uint64_t fseq=1; fseq <= 10 ; fseq ++) { // Create a source file. sourceFiles.emplace_back(std::make_unique<unitTests::TempFile>()); sourceFiles.back()->randomFill(1000); @@ -2740,7 +2663,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFilesizeInMiddleOfBatchM ar.srcURL=std::string("file://") + sourceFiles.back()->path(); ar.requester.name = requester.username; ar.requester.group = "group"; - ar.fileSize = 1000; + ar.fileSize = (fseq != problematicFseq) ? 1000 : 900; // 900 is wrong reported size ar.diskFileID = std::to_string(fseq); ar.diskFileInfo.path = "y"; ar.diskFileInfo.owner_uid = DISK_FILE_OWNER_UID; @@ -2786,10 +2709,10 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongFilesizeInMiddleOfBatchM logToCheck += ""; ASSERT_EQ(s_vid, sess.getVid()); auto afiiter = archiveFileIds.begin(); - int fseq = 1; + uint64_t fseq = 1; for(auto & sf: sourceFiles) { auto afi = *(afiiter++); - if (fseq != 5) { + if (fseq != problematicFseq) { // Files queued without the wrong file size made it to the catalogue auto afs = catalogue.ArchiveFile()->getArchiveFileById(afi); ASSERT_EQ(1, afs.tapeFiles.size()); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp index 722f9ccae47b5351c7529b496d52b5715e5b950d..62d8128a9703c3465db36350528d44b07ea21674 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.cpp @@ -213,7 +213,7 @@ void DiskWriteTask::releaseAllBlock(){ 
//------------------------------------------------------------------------------ // checkErrors //------------------------------------------------------------------------------ - void DiskWriteTask::checkErrors(MemBlock* mb,int blockId,cta::log::LogContext& lc){ + void DiskWriteTask::checkErrors(MemBlock* mb, uint64_t blockId, cta::log::LogContext& lc){ using namespace cta::log; if(m_retrieveJob->retrieveRequest.archiveFileID != mb->m_fileid || blockId != mb->m_fileBlock || mb->isFailed() ){ diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.hpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.hpp index 7dc4ad4bc377b79267e19cc23f6d558c424fbc52..14674169e554a58a1a36e99fb70003745be28b0a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTask.hpp @@ -91,7 +91,7 @@ private: * @param blockId The block id the mem blopck should be at * @param lc FOr logging */ - void checkErrors(MemBlock* mb,int blockId,cta::log::LogContext& lc); + void checkErrors(MemBlock* mb, uint64_t blockId, cta::log::LogContext& lc); /** * In case of error, it will spin on the blocks until we reach the end diff --git a/tapeserver/castor/tape/tapeserver/daemon/MemBlock.hpp b/tapeserver/castor/tape/tapeserver/daemon/MemBlock.hpp index cf0e035869e59a4359e601f303556fd4c7270234..2a1177c0d7f2c25dbe325a2ecd5b69041efeafa9 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MemBlock.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MemBlock.hpp @@ -77,18 +77,15 @@ public: * @param id the block ID for its whole life * @param capacity the capacity (in byte) of the embed payload */ - MemBlock(const int id, const size_t capacity) : - m_memoryBlockId(id),m_payload(capacity){ - reset(); - } - + MemBlock(const uint32_t id, const uint32_t capacity) : m_memoryBlockId(id), m_payload(capacity) { reset(); } + /** * Get the error message from the context, * Throw an exception if there is no context * @return */ 
std::string errorMsg() const { - if(m_context.get()) { + if (m_context) { return m_context->m_errorMsg; } @@ -125,7 +122,7 @@ public: * m_failed is true, m_fileBlock and m_tapeFileBlock are set at -1 * Other members do not change */ - void markAsFailed(const std::string msg){ + void markAsFailed(const std::string& msg){ m_context.reset(new AlterationContext(msg,AlterationContext::Failed)); m_fileBlock = -1; m_tapeFileBlock = -1; @@ -153,7 +150,7 @@ public: * Reset all the members. * Numerical ones are set at -1.and m_failed to false. */ - void reset() throw() { + void reset() noexcept { m_fileid = -1; m_fileBlock = -1; m_fSeq = -1; @@ -165,7 +162,7 @@ public: m_context.reset(); } /** Unique memory block id */ - const int m_memoryBlockId; + const uint32_t m_memoryBlockId; /** handle to the raw data to be migrated/recalled */ Payload m_payload; @@ -174,16 +171,16 @@ public: uint64_t m_fileid; /** number of the memory-chunk of the current file we are manipulating */ - int m_fileBlock; + uint64_t m_fileBlock; /** order of file on the tape */ - uint32_t m_fSeq; + uint64_t m_fSeq; /** Sequence number of the first tape block file in this memory block */ - int m_tapeFileBlock; + size_t m_tapeFileBlock; /** Size of the tape blocks, allowing sanity checks on the disk write side in recalls */ - int m_tapeBlockSize; + size_t m_tapeBlockSize; }; diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.cpp index 8ef0a61de47e539708104338eebb7ff850e7297d..68f44606e4b697062acc56af01ad5f00ffd812ea 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.cpp @@ -27,14 +27,12 @@ namespace daemon { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ 
-MigrationMemoryManager::MigrationMemoryManager(const size_t numberOfBlocks, - const size_t blockSize, cta::log::LogContext lc) -: - m_blockCapacity(blockSize), m_totalNumberOfBlocks(0), - m_totalMemoryAllocated(0), m_blocksProvided(0), - m_blocksReturned(0), m_lc(lc) -{ - for (size_t i = 0; i < numberOfBlocks; i++) { +MigrationMemoryManager::MigrationMemoryManager(const uint32_t numberOfBlocks, const uint32_t blockSize, + const cta::log::LogContext& lc) : +m_blockCapacity(blockSize), +m_totalNumberOfBlocks(0), m_totalMemoryAllocated(0), m_blocksProvided(0), m_blocksReturned(0), m_lc(lc) { + + for (uint32_t i = 0; i < numberOfBlocks; i++) { m_freeBlocks.push(new MemBlock(i, blockSize)); m_totalNumberOfBlocks++; m_totalMemoryAllocated += blockSize; @@ -45,7 +43,7 @@ MigrationMemoryManager::MigrationMemoryManager(const size_t numberOfBlocks, //------------------------------------------------------------------------------ // MigrationMemoryManager::~MigrationMemoryManager //------------------------------------------------------------------------------ -MigrationMemoryManager::~MigrationMemoryManager() throw() { +MigrationMemoryManager::~MigrationMemoryManager() noexcept { // Make sure the thread is finished: this should be done by the caller, // who should have called waitThreads. 
// castor::server::Thread::wait(); @@ -86,8 +84,7 @@ void MigrationMemoryManager::addClient(DataPipeline* c) //------------------------------------------------------------------------------ // MigrationMemoryManager::areBlocksAllBack //------------------------------------------------------------------------------ -bool MigrationMemoryManager::areBlocksAllBack() -throw(){ +bool MigrationMemoryManager::areBlocksAllBack() noexcept{ return m_totalNumberOfBlocks == m_freeBlocks.size(); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp index bf34aa0a0ba728cbf30335338315d827db435242..a4ce919756e39a99f694e20e05adb74eb9ae3177 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp @@ -46,9 +46,7 @@ public: * @param numberOfBlocks: number of blocks to allocate * @param blockSize: size of each block */ - MigrationMemoryManager(const size_t numberOfBlocks, const size_t blockSize, - cta::log::LogContext lc) - ; + MigrationMemoryManager(const uint32_t numberOfBlocks, const uint32_t blockSize, const cta::log::LogContext& lc); /** * @@ -60,7 +58,7 @@ public: * Are all sheep back to the farm? * @return */ - bool areBlocksAllBack() throw(); + bool areBlocksAllBack() noexcept; /** * Start serving clients (in the dedicated thread) @@ -93,10 +91,9 @@ public: /** * Destructor */ - ~MigrationMemoryManager() throw(); + ~MigrationMemoryManager() noexcept override; + private: - - const size_t m_blockCapacity; /** @@ -144,7 +141,7 @@ private: /** * Thread routine: pops a client and provides him blocks until he is happy! 
*/ - void run() ; + void run() override; }; diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index 4e42c9c476a3bacc5fc9d18d513b8d497c9c901c..7ddd7283b9f4945e9c893a71616b70938c8c1415 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -16,221 +16,227 @@ */ #include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp" + #include "castor/tape/tapeserver/daemon/ErrorFlag.hpp" +#include <memory> + using cta::log::LogContext; using cta::log::Param; -namespace castor{ -namespace tape{ -namespace tapeserver{ +namespace castor { +namespace tape { +namespace tapeserver { namespace daemon { //------------------------------------------------------------------------------ //Constructor //------------------------------------------------------------------------------ - MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm, - DiskReadThreadPool & diskReader, - TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,cta::ArchiveMount &archiveMount, - uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc): - m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter), - m_diskReader(diskReader),m_archiveMount(archiveMount),m_lc(lc), - m_maxFiles(maxFiles), m_maxBytes(byteSizeThreshold) - { - - } - - +MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager& mm, DiskReadThreadPool& diskReader, + TapeSingleThreadInterface<TapeWriteTask>& tapeWriter, + cta::ArchiveMount& archiveMount, uint64_t maxFiles, + uint64_t byteSizeThreshold, const cta::log::LogContext& lc) : +m_thread(*this), +m_memManager(mm), m_tapeWriter(tapeWriter), m_diskReader(diskReader), m_archiveMount(archiveMount), m_lc(lc), +m_maxFiles(maxFiles), m_maxBytes(byteSizeThreshold) {} + //------------------------------------------------------------------------------ 
//injectBulkMigrations //------------------------------------------------------------------------------ - void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs){ - - const uint64_t blockCapacity = m_memManager.blockCapacity(); - for(auto it= jobs.begin();it!=jobs.end();++it){ - const uint64_t fileSize = (*it)->archiveFile.fileSize; - LogContext::ScopedParam sp[]={ - LogContext::ScopedParam(m_lc, Param("fileId", (*it)->archiveFile.archiveFileID)), - LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->tapeFile.fSeq)), - LogContext::ScopedParam(m_lc, Param("path", (*it)->srcURL)) - }; - tape::utils::suppresUnusedVariable(sp); - - const uint64_t neededBlock = howManyBlocksNeeded(fileSize,blockCapacity); - - // We give owner ship on the archive job to the tape write task (as last user). - // disk read task gets a bare pointer. - // TODO: could be changed as a shared_ptr. - auto archiveJobPtr=it->get(); - std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, it->release(), m_memManager, m_errorFlag)); - std::unique_ptr<DiskReadTask> drt; - // We will skip the disk read task creation for zero-length files. Tape write task will handle the request and mark it as an - // error. 
- if (fileSize) drt.reset(new DiskReadTask(*twt, archiveJobPtr, neededBlock, m_errorFlag)); - - m_tapeWriter.push(twt.release()); - if (fileSize) m_diskReader.push(drt.release()); - m_lc.log(cta::log::INFO, "Created tasks for migrating a file"); - if (!fileSize) - m_lc.log(cta::log::WARNING, "Skipped disk read task creation for zero-length file."); +void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs) { + + const size_t blockCapacity = m_memManager.blockCapacity(); + for (auto& job : jobs) { + const uint64_t fileSize = job->archiveFile.fileSize; + LogContext::ScopedParam sp[] = {LogContext::ScopedParam(m_lc, Param("fileId", job->archiveFile.archiveFileID)), + LogContext::ScopedParam(m_lc, Param("fSeq", job->tapeFile.fSeq)), + LogContext::ScopedParam(m_lc, Param("path", job->srcURL))}; + tape::utils::suppresUnusedVariable(sp); + + const uint64_t neededBlock = howManyBlocksNeeded(fileSize, blockCapacity); + + // We give owner ship on the archive job to the tape write task (as last user). + // disk read task gets a bare pointer. + // TODO: could be changed as a shared_ptr. + auto archiveJobPtr = job.get(); + std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, job.release(), m_memManager, m_errorFlag)); + std::unique_ptr<DiskReadTask> drt; + // We will skip the disk read task creation for zero-length files. Tape write task will handle the request and mark it as an + // error. 
+ if (fileSize) { + drt = std::make_unique<DiskReadTask>(*twt, archiveJobPtr, neededBlock, m_errorFlag); + } + + m_tapeWriter.push(twt.release()); + m_lc.log(cta::log::INFO, "Created tasks for migrating a file"); + if (fileSize) { + m_diskReader.push(drt.release()); + } + else { + m_lc.log(cta::log::WARNING, "Skipped disk read task creation for zero-length file."); } - LogContext::ScopedParam(m_lc, Param("numbnerOfFiles", jobs.size())); - m_lc.log(cta::log::INFO, "Finished creating tasks for migrating"); } - + LogContext::ScopedParam sp(m_lc, Param("numberOfFiles", jobs.size())); + m_lc.log(cta::log::INFO, "Finished creating tasks for migrating"); +} + //------------------------------------------------------------------------------ //injectBulkMigrations //------------------------------------------------------------------------------ - void MigrationTaskInjector::waitThreads(){ - m_thread.wait(); - } - +void MigrationTaskInjector::waitThreads() { + m_thread.wait(); +} + //------------------------------------------------------------------------------ //injectBulkMigrations //------------------------------------------------------------------------------ - void MigrationTaskInjector::startThreads(){ - m_thread.start(); - } - +void MigrationTaskInjector::startThreads() { + m_thread.start(); +} + //------------------------------------------------------------------------------ //requestInjection //------------------------------------------------------------------------------ - void MigrationTaskInjector::requestInjection( bool lastCall) { - cta::threading::MutexLocker ml(m_producerProtection); - m_queue.push(Request(m_maxFiles, m_maxBytes, lastCall)); +void MigrationTaskInjector::requestInjection(bool lastCall) { + cta::threading::MutexLocker ml(m_producerProtection); + m_queue.push(Request(m_maxFiles, m_maxBytes, lastCall)); +} - } //------------------------------------------------------------------------------ //synchronousInjection 
-//------------------------------------------------------------------------------ - bool MigrationTaskInjector::synchronousInjection(bool & noFilesToMigrate) { - std::list<std::unique_ptr<cta::ArchiveJob> > jobs; - noFilesToMigrate = false; - try { - //First popping of files, we multiply the number of popped files / bytes by 2 to avoid multiple mounts on Repack - //(it is applied to ArchiveForUser and ArchiveForRepack batches) - jobs = m_archiveMount.getNextJobBatch(2 * m_maxFiles, 2 * m_maxBytes,m_lc); - } catch (cta::exception::Exception & ex) { - cta::log::ScopedParamContainer scoped(m_lc); - scoped.add("transactionId", m_archiveMount.getMountTransactionId()) - .add("byteSizeThreshold",m_maxBytes) - .add("maxFiles", m_maxFiles) - .add("message", ex.getMessageValue()); - m_lc.log(cta::log::ERR, "Failed to getFilesToMigrate"); - return false; - } - cta::log::ScopedParamContainer scoped(m_lc); - scoped.add("byteSizeThreshold",m_maxBytes) - .add("maxFiles", m_maxFiles); - if(jobs.empty()) { - noFilesToMigrate = true; - m_lc.log(cta::log::WARNING, "No files to migrate: empty mount"); - return false; - } else { - m_firstFseqToWrite = jobs.front()->tapeFile.fSeq; - injectBulkMigrations(jobs); - return true; - } +//------------------------------------------------------------------------------ +bool MigrationTaskInjector::synchronousInjection(bool& noFilesToMigrate) { + std::list<std::unique_ptr<cta::ArchiveJob>> jobs; + noFilesToMigrate = false; + try { + //First popping of files, we multiply the number of popped files / bytes by 2 to avoid multiple mounts on Repack + //(it is applied to ArchiveForUser and ArchiveForRepack batches) + jobs = m_archiveMount.getNextJobBatch(2 * m_maxFiles, 2 * m_maxBytes, m_lc); + } catch (cta::exception::Exception& ex) { + cta::log::ScopedParamContainer scoped(m_lc); + scoped.add("transactionId", m_archiveMount.getMountTransactionId()) + .add("byteSizeThreshold", m_maxBytes) + .add("maxFiles", m_maxFiles) + .add("message", 
ex.getMessageValue()); + m_lc.log(cta::log::ERR, "Failed to getFilesToMigrate"); + return false; } + cta::log::ScopedParamContainer scoped(m_lc); + scoped.add("byteSizeThreshold", m_maxBytes).add("maxFiles", m_maxFiles); + if (jobs.empty()) { + noFilesToMigrate = true; + m_lc.log(cta::log::WARNING, "No files to migrate: empty mount"); + return false; + } + else { + m_firstFseqToWrite = jobs.front()->tapeFile.fSeq; + injectBulkMigrations(jobs); + return true; + } +} + //------------------------------------------------------------------------------ //finish -//------------------------------------------------------------------------------ - void MigrationTaskInjector::finish(){ - cta::threading::MutexLocker ml(m_producerProtection); - m_queue.push(Request()); - } +//------------------------------------------------------------------------------ +void MigrationTaskInjector::finish() { + cta::threading::MutexLocker ml(m_producerProtection); + m_queue.push(Request()); +} + //------------------------------------------------------------------------------ //signalEndDataMovement -//------------------------------------------------------------------------------ - void MigrationTaskInjector::signalEndDataMovement(){ - //first send the end signal to the threads - m_tapeWriter.finish(); - m_diskReader.finish(); - m_memManager.finish(); - } +//------------------------------------------------------------------------------ +void MigrationTaskInjector::signalEndDataMovement() { + //first send the end signal to the threads + m_tapeWriter.finish(); + m_diskReader.finish(); + m_memManager.finish(); +} //------------------------------------------------------------------------------ //WorkerThread::run //------------------------------------------------------------------------------ - void MigrationTaskInjector::WorkerThread::run(){ - m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector")); - m_parent.m_lc.log(cta::log::INFO, "Starting MigrationTaskInjector thread"); - try{ - 
while(1){ - if(m_parent.m_errorFlag){ - throw castor::tape::tapeserver::daemon::ErrorFlag(); +void MigrationTaskInjector::WorkerThread::run() { + m_parent.m_lc.pushOrReplace(Param("thread", "MigrationTaskInjector")); + m_parent.m_lc.log(cta::log::INFO, "Starting MigrationTaskInjector thread"); + try { + while (true) { + if (m_parent.m_errorFlag) { + throw castor::tape::tapeserver::daemon::ErrorFlag(); + } + Request req = m_parent.m_queue.pop(); + auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.bytesRequested, m_parent.m_lc); + uint64_t files = jobs.size(); + uint64_t bytes = 0; + for (auto& j : jobs) bytes += j->archiveFile.fileSize; + if (jobs.empty()) { + if (req.lastCall) { + m_parent.m_lc.log(cta::log::INFO, "No more file to migrate: triggering the end of session."); + m_parent.signalEndDataMovement(); + break; } - Request req = m_parent.m_queue.pop(); - auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.bytesRequested, m_parent.m_lc); - uint64_t files=jobs.size(); - uint64_t bytes=0; - for (auto & j:jobs) bytes+=j->archiveFile.fileSize; - if(jobs.empty()){ - if (req.lastCall) { - m_parent.m_lc.log(cta::log::INFO,"No more file to migrate: triggering the end of session."); - m_parent.signalEndDataMovement(); - break; - } else { - m_parent.m_lc.log(cta::log::INFO,"In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call"); - } - } else { - - // Inject the tasks - m_parent.injectBulkMigrations(jobs); - // Decide on continuation - if(files < req.filesRequested / 2 && bytes < req.bytesRequested) { - // The client starts to dribble files at a low rate. Better finish - // the session now, so we get a clean batch on a later mount. 
- cta::log::ScopedParamContainer params(m_parent.m_lc); - params.add("filesRequested", req.filesRequested) - .add("bytesRequested", req.bytesRequested) - .add("filesReceived", files) - .add("bytesReceived", bytes); - m_parent.m_lc.log(cta::log::INFO, "Got less than half the requested work to do: triggering the end of session."); - m_parent.signalEndDataMovement(); - break; - } + else { + m_parent.m_lc.log(cta::log::INFO, + "In MigrationTaskInjector::WorkerThread::run(): got empty list, but not last call"); } - } //end of while(1) - }//end of try - catch(const castor::tape::tapeserver::daemon::ErrorFlag&){ - //we end up there because a task screw up somewhere - m_parent.m_lc.log(cta::log::INFO,"In MigrationTaskInjector::WorkerThread::run(): a task failed, " - "indicating finish of run"); - - m_parent.signalEndDataMovement(); - } - catch(const cta::exception::Exception& ex){ - //we end up there because we could not talk to the client - - cta::log::ScopedParamContainer container( m_parent.m_lc); - container.add("exception message",ex.getMessageValue()); - m_parent.m_lc.logBacktrace(cta::log::INFO, ex.backtrace()); - m_parent.m_lc.log(cta::log::ERR,"In MigrationTaskInjector::WorkerThread::run(): " - "could not retrieve a list of file to migrate, indicating finish of run"); - - m_parent.signalEndDataMovement(); - } - //------------- - m_parent.m_lc.log(cta::log::INFO, "Finishing MigrationTaskInjector thread"); - /* We want to finish at the first lastCall we encounter. + } + else { + + // Inject the tasks + m_parent.injectBulkMigrations(jobs); + // Decide on continuation + if (files < req.filesRequested / 2 && bytes < req.bytesRequested) { + // The client starts to dribble files at a low rate. Better finish + // the session now, so we get a clean batch on a later mount. 
+ cta::log::ScopedParamContainer params(m_parent.m_lc); + params.add("filesRequested", req.filesRequested) + .add("bytesRequested", req.bytesRequested) + .add("filesReceived", files) + .add("bytesReceived", bytes); + m_parent.m_lc.log(cta::log::INFO, + "Got less than half the requested work to do: triggering the end of session."); + m_parent.signalEndDataMovement(); + break; + } + } + } //end of while(1) + } catch (const castor::tape::tapeserver::daemon::ErrorFlag&) { + //we end up there because a task screw up somewhere + m_parent.m_lc.log(cta::log::INFO, "In MigrationTaskInjector::WorkerThread::run(): a task failed, " + "indicating finish of run"); + + m_parent.signalEndDataMovement(); + } catch (const cta::exception::Exception& ex) { + //we end up there because we could not talk to the client + + cta::log::ScopedParamContainer container(m_parent.m_lc); + container.add("exception message", ex.getMessageValue()); + m_parent.m_lc.logBacktrace(cta::log::INFO, ex.backtrace()); + m_parent.m_lc.log(cta::log::ERR, "In MigrationTaskInjector::WorkerThread::run(): " + "could not retrieve a list of file to migrate, indicating finish of run"); + + m_parent.signalEndDataMovement(); + } + //------------- + m_parent.m_lc.log(cta::log::INFO, "Finishing MigrationTaskInjector thread"); + /* We want to finish at the first lastCall we encounter. 
* But even after sending finish() to m_diskWriter and to m_tapeReader, * m_diskWriter might still want some more task (the threshold could be crossed), * so we discard everything that might still be in the queue */ - bool stillReading =true; - while(stillReading) { - Request req = m_parent.m_queue.pop(); - if (req.end) stillReading = false; - LogContext::ScopedParam sp(m_parent.m_lc, Param("lastCall", req.lastCall)); - m_parent.m_lc.log(cta::log::INFO,"In MigrationTaskInjector::WorkerThread::run(): popping extra request"); - } + bool stillReading = true; + while (stillReading) { + Request req = m_parent.m_queue.pop(); + if (req.end) stillReading = false; + LogContext::ScopedParam sp(m_parent.m_lc, Param("lastCall", req.lastCall)); + m_parent.m_lc.log(cta::log::INFO, "In MigrationTaskInjector::WorkerThread::run(): popping extra request"); } +} - uint64_t MigrationTaskInjector::firstFseqToWrite() const { - return m_firstFseqToWrite; - } -} //end namespace daemon -} //end namespace tapeserver -} //end namespace tape -} //end namespace castor +uint64_t MigrationTaskInjector::firstFseqToWrite() const { + return m_firstFseqToWrite; +} +} //end namespace daemon +} //end namespace tapeserver +} //end namespace tape +} //end namespace castor diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp index 15100e19f60d74cdee8b2a7058c3c0e9f985ec5a..f18457ad6008be76e6de24919961c48d19b52cae 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp @@ -55,7 +55,7 @@ public: MigrationTaskInjector(MigrationMemoryManager & mm, DiskReadThreadPool & diskReader, TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,cta::ArchiveMount &archiveMount, - uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc); + uint64_t maxFiles, uint64_t byteSizeThreshold,const cta::log::LogContext& lc); /** 
* Wait for the inner thread to finish @@ -122,8 +122,9 @@ private: void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs); /*Compute how many blocks are needed for a file of fileSize bytes*/ - size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){ - return fileSize/blockCapacity + ((fileSize%blockCapacity==0) ? 0 : 1); + uint64_t howManyBlocksNeeded(uint64_t fileSize, size_t blockCapacity){ + const auto extraBlock = ((fileSize%blockCapacity) == 0) ? 0 : 1; + return fileSize/blockCapacity + extraBlock; } /** diff --git a/tapeserver/castor/tape/tapeserver/daemon/Payload.hpp b/tapeserver/castor/tape/tapeserver/daemon/Payload.hpp index 59bf65acfe5f3133bfe58b9c59b4fcc70a4f4702..016eec3a4600253cf59f0c9f8f76f8e5d3a57d5b 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/Payload.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/Payload.hpp @@ -41,13 +41,12 @@ class Payload Payload(const Payload&); Payload& operator=(const Payload&); public: - Payload(size_t capacity): - m_data(new (std::nothrow) unsigned char[capacity]),m_totalCapacity(capacity),m_size(0) { - if(nullptr == m_data) { + Payload(uint32_t capacity) : m_data(new(std::nothrow) unsigned char[capacity]), m_totalCapacity(capacity), m_size(0) { + if (nullptr == m_data) { throw cta::exception::MemException("Failed to allocate memory for a new MemBlock!"); } } - + ~Payload(){ delete[] m_data; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp index de5022ee851e65d5793f4e8ba29d293317ab2a16..41d4ffe34405fc66531c510f02006af44f7f6b32 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadTask.hpp @@ -85,8 +85,8 @@ public: bool stillReading = true; //for counting how many mem blocks have used and how many tape blocks //(because one mem block can hold several tape blocks - int fileBlock = 0; - int tapeBlock = 0; + uint64_t fileBlock = 0; 
+ size_t tapeBlock = 0; // This out-of-try-catch variables allows us to record the stage of the // process we're in, and to count the error if it occurs. // We will not record errors for an empty string. This will allow us to diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp index 432dc4d79508bfe3f779d3ac60b7c34a38029b6e..74c6aa510b2487ac0d15d83091b5e199b36fc539 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.cpp @@ -15,8 +15,7 @@ * submit itself to any jurisdiction. */ -#include <memory> -#include <string> +#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp" #include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp" #include "castor/tape/tapeserver/daemon/DataConsumer.hpp" @@ -26,11 +25,13 @@ #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp" #include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp" #include "castor/tape/tapeserver/daemon/TapeSessionStats.hpp" -#include "castor/tape/tapeserver/daemon/TapeWriteTask.hpp" #include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp" #include "common/exception/Errnum.hpp" #include "common/exception/Exception.hpp" +#include <memory> +#include <string> + namespace castor { namespace tape { namespace tapeserver { @@ -39,104 +40,75 @@ namespace daemon { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ - TapeWriteTask::TapeWriteTask(int blockCount, cta::ArchiveJob *archiveJob, - MigrationMemoryManager& mm,cta::threading::AtomicFlag& errorFlag): - m_archiveJob(archiveJob),m_memManager(mm), m_fifo(blockCount), - m_blockCount(blockCount),m_errorFlag(errorFlag), - m_archiveFile(m_archiveJob->archiveFile), m_tapeFile(m_archiveJob->tapeFile), - m_srcURL(m_archiveJob->srcURL) - { - //register its 
fifo to the memory manager as a client in order to get mem block - // This should not be done in the case of a zero length file. - if (archiveJob->archiveFile.fileSize) mm.addClient(&m_fifo); +TapeWriteTask::TapeWriteTask(uint64_t blockCount, cta::ArchiveJob* archiveJob, MigrationMemoryManager& mm, + cta::threading::AtomicFlag& errorFlag) : +m_archiveJob(archiveJob), +m_memManager(mm), m_fifo(blockCount), m_blockCount(blockCount), m_errorFlag(errorFlag), +m_archiveFile(m_archiveJob->archiveFile), m_tapeFile(m_archiveJob->tapeFile), m_srcURL(m_archiveJob->srcURL) { + // Register its fifo to the memory manager as a client in order to get mem block + // This should not be done in the case of a zero length file. + if (archiveJob->archiveFile.fileSize) { + mm.addClient(&m_fifo); } +} + //------------------------------------------------------------------------------ // fileSize //------------------------------------------------------------------------------ - uint64_t TapeWriteTask::fileSize() { - return m_archiveFile.fileSize; - } +uint64_t TapeWriteTask::fileSize() { + return m_archiveFile.fileSize; +} + //------------------------------------------------------------------------------ // execute -//------------------------------------------------------------------------------ - void TapeWriteTask::execute(const std::unique_ptr<castor::tape::tapeFile::WriteSession> &session, - MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog, - cta::log::LogContext& lc, cta::utils::Timer & timer) { - using cta::log::LogContext; - using cta::log::Param; - using cta::log::ScopedParamContainer; - // Add to our logs the informations on the file - ScopedParamContainer params(lc); - params.add("fileId",m_archiveJob->archiveFile.archiveFileID) - .add("fileSize",m_archiveJob->archiveFile.fileSize) - .add("fSeq",m_archiveJob->tapeFile.fSeq) - .add("diskURL",m_archiveJob->srcURL); - - // We will clock the stats for the file itself, and eventually add those - // stats to the 
session's. - cta::utils::Timer localTime; - unsigned long ckSum = Payload::zeroAdler32(); - uint32_t memBlockId = 0; - - // This out-of-try-catch variables allows us to record the stage of the - // process we're in, and to count the error if it occurs. - // We will not record errors for an empty string. This will allow us to - // prevent counting where error happened upstream. - std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite"; - session->validateNextFSeq(m_archiveJob->tapeFile.fSeq); - try { - //try to open the session - currentErrorToCount = "Error_tapeWriteHeader"; - watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.archiveFileID, m_archiveJob->tapeFile.fSeq); - std::unique_ptr<castor::tape::tapeFile::FileWriter> output(openFileWriter(session,lc)); - m_LBPMode = output->getLBPMode(); - m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile; - // We are not error sources here until we actually write. - currentErrorToCount = ""; - bool firstBlock = true; - while(!m_fifo.finished()) { - MemBlock* const mb = m_fifo.popDataBlock(); - m_taskStats.waitDataTime += timer.secs(cta::utils::Timer::resetCounter); - AutoReleaseBlock<MigrationMemoryManager> releaser(mb,m_memManager); - - // Special treatment for 1st block. If disk failed to provide anything, we can skip the file - // by leaving a placeholder on the tape (at minimal tape space cost), so we can continue - // the tape session (and save a tape mount!). 
- if (firstBlock && mb->isFailed()) { - currentErrorToCount = "Error_tapeWriteData"; - const char blank[]="This file intentionally left blank: leaving placeholder after failing to read from disk."; - output->write(blank, sizeof(blank)); - m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); - watchdog.notify(sizeof(blank)); - currentErrorToCount = "Error_tapeWriteTrailer"; - output->close(); - currentErrorToCount = ""; - // Possibly failing writes are finished. We can continue this in catch for skip. outside of the loop. - throw Skip(mb->errorMsg()); - } - firstBlock = false; - - //will throw (thus exiting the loop) if something is wrong - checkErrors(mb,memBlockId,lc); - - ckSum = mb->m_payload.adler32(ckSum); - m_taskStats.checksumingTime += timer.secs(cta::utils::Timer::resetCounter); - currentErrorToCount = "Error_tapeWriteData"; - mb->m_payload.write(*output); - currentErrorToCount = ""; - - m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.dataVolume += mb->m_payload.size(); - watchdog.notify(mb->m_payload.size()); - ++memBlockId; - } - - // If, after the FIFO is finished, we are still in the first block, we are in the presence of a 0-length file. - // This also requires a placeholder. 
- if (firstBlock) { +//------------------------------------------------------------------------------ +void TapeWriteTask::execute(const std::unique_ptr<castor::tape::tapeFile::WriteSession>& session, + MigrationReportPacker& reportPacker, MigrationWatchDog& watchdog, cta::log::LogContext& lc, + cta::utils::Timer& timer) { + using cta::log::LogContext; + using cta::log::Param; + using cta::log::ScopedParamContainer; + // Add to our logs the informations on the file + ScopedParamContainer params(lc); + params.add("fileId", m_archiveJob->archiveFile.archiveFileID) + .add("fileSize", m_archiveJob->archiveFile.fileSize) + .add("fSeq", m_archiveJob->tapeFile.fSeq) + .add("diskURL", m_archiveJob->srcURL); + + // We will clock the stats for the file itself, and eventually add those + // stats to the session's. + cta::utils::Timer localTime; + unsigned long ckSum = Payload::zeroAdler32(); + uint64_t memBlockId = 0; + + // This out-of-try-catch variables allows us to record the stage of the + // process we're in, and to count the error if it occurs. + // We will not record errors for an empty string. This will allow us to + // prevent counting where error happened upstream. + std::string currentErrorToCount = "Error_tapeFSeqOutOfSequenceForWrite"; + session->validateNextFSeq(m_archiveJob->tapeFile.fSeq); + try { + // Try to open the session + currentErrorToCount = "Error_tapeWriteHeader"; + watchdog.notifyBeginNewJob(m_archiveJob->archiveFile.archiveFileID, m_archiveJob->tapeFile.fSeq); + std::unique_ptr<castor::tape::tapeFile::FileWriter> output(openFileWriter(session, lc)); + m_LBPMode = output->getLBPMode(); + m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); + m_taskStats.headerVolume += TapeSessionStats::headerVolumePerFile; + // We are not error sources here until we actually write. 
+ currentErrorToCount = ""; + bool firstBlock = true; + while (!m_fifo.finished()) { + MemBlock* const mb = m_fifo.popDataBlock(); + m_taskStats.waitDataTime += timer.secs(cta::utils::Timer::resetCounter); + AutoReleaseBlock<MigrationMemoryManager> releaser(mb, m_memManager); + + // Special treatment for 1st block. If disk failed to provide anything, we can skip the file + // by leaving a placeholder on the tape (at minimal tape space cost), so we can continue + // the tape session (and save a tape mount!). + if (firstBlock && mb->isFailed()) { currentErrorToCount = "Error_tapeWriteData"; - const char blank[]="This file intentionally left blank: zero-length file cannot be recorded to tape."; + const char blank[] = "This file intentionally left blank: leaving placeholder after failing to read from disk."; output->write(blank, sizeof(blank)); m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); watchdog.notify(sizeof(blank)); @@ -144,242 +116,277 @@ namespace daemon { output->close(); currentErrorToCount = ""; // Possibly failing writes are finished. We can continue this in catch for skip. outside of the loop. 
- throw Skip("In TapeWriteTask::execute(): inserted a placeholder for zero length file."); + throw Skip(mb->errorMsg()); } - - //finish the writing of the file on tape - //put the trailer - currentErrorToCount = "Error_tapeWriteTrailer"; - output->close(); + firstBlock = false; + + // Will throw (thus exiting the loop) if something is wrong + checkErrors(mb, memBlockId, lc); + + ckSum = mb->m_payload.adler32(ckSum); + m_taskStats.checksumingTime += timer.secs(cta::utils::Timer::resetCounter); + currentErrorToCount = "Error_tapeWriteData"; + mb->m_payload.write(*output); currentErrorToCount = ""; + m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile; - m_taskStats.filesCount ++; - // Record the fSeq in the tape session - session->reportWrittenFSeq(m_archiveJob->tapeFile.fSeq); - m_archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, ckSum); - m_archiveJob->tapeFile.fileSize = m_taskStats.dataVolume; - m_archiveJob->tapeFile.blockId = output->getBlockId(); - reportPacker.reportCompletedJob(std::move(m_archiveJob), lc); - m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.totalTime = localTime.secs(); - // Log the successful transfer - logWithStats(cta::log::INFO, "File successfully transmitted to drive",lc); - } - catch(const castor::tape::tapeserver::daemon::ErrorFlag&){ - // We end up there because another task has failed - // so we just log, circulate blocks and don't even send a report - lc.log(cta::log::DEBUG,"TapeWriteTask: a previous file has failed for migration " - "Do nothing except circulating blocks"); - circulateMemBlocks(); - - // We throw again because we want TWST to stop all tasks from execution - // and go into a degraded mode operation. 
- throw; + m_taskStats.dataVolume += mb->m_payload.size(); + watchdog.notify(mb->m_payload.size()); + ++memBlockId; } - catch(const Skip& s) { - // We failed to read anything from the file. We can get rid of any block from the queue to - // recycle them, and pass the report to the report packer. After than, we can carry on with - // the write session-> - circulateMemBlocks(); - watchdog.addToErrorCount("Info_fileSkipped"); + + // If, after the FIFO is finished, we are still in the first block, we are in the presence of a 0-length file. + // This also requires a placeholder. + if (firstBlock) { + currentErrorToCount = "Error_tapeWriteData"; + const char blank[] = "This file intentionally left blank: zero-length file cannot be recorded to tape."; + output->write(blank, sizeof(blank)); m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile; - m_taskStats.filesCount ++; - // Record the fSeq in the tape session - session->reportWrittenFSeq(m_archiveJob->tapeFile.fSeq); - reportPacker.reportSkippedJob(std::move(m_archiveJob), s, lc); - m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); - m_taskStats.totalTime = localTime.secs(); - // Log the successful transfer - logWithStats(cta::log::INFO, "Left placeholder on tape after skipping unreadable file.", lc); - } catch(const RecoverableMigrationErrorException &e) { - //The disk reading failed due to a size missmatch or wrong checksum - //just want to report a failed job and proceed with the mount - if(currentErrorToCount.size()) { + watchdog.notify(sizeof(blank)); + currentErrorToCount = "Error_tapeWriteTrailer"; + output->close(); + currentErrorToCount = ""; + // Possibly failing writes are finished. We can continue this in catch for skip. outside of the loop. 
+ throw Skip("In TapeWriteTask::execute(): inserted a placeholder for zero length file."); + } + + // Finish the writing of the file on tape + // Put the trailer + currentErrorToCount = "Error_tapeWriteTrailer"; + output->close(); + currentErrorToCount = ""; + m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); + m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile; + m_taskStats.filesCount++; + // Record the fSeq in the tape session + session->reportWrittenFSeq(m_archiveJob->tapeFile.fSeq); + m_archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, ckSum); + m_archiveJob->tapeFile.fileSize = m_taskStats.dataVolume; + m_archiveJob->tapeFile.blockId = output->getBlockId(); + reportPacker.reportCompletedJob(std::move(m_archiveJob), lc); + m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); + m_taskStats.totalTime = localTime.secs(); + // Log the successful transfer + logWithStats(cta::log::INFO, "File successfully transmitted to drive", lc); + } catch (const castor::tape::tapeserver::daemon::ErrorFlag&) { + // We end up there because another task has failed + // so we just log, circulate blocks and don't even send a report + lc.log(cta::log::DEBUG, "TapeWriteTask: a previous file has failed for migration " + "Do nothing except circulating blocks"); + circulateMemBlocks(); + + // We throw again because we want TWST to stop all tasks from execution + // and go into a degraded mode operation. + throw; + } catch (const Skip& s) { + // We failed to read anything from the file. We can get rid of any block from the queue to + // recycle them, and pass the report to the report packer.
After that, we can carry on with + // the write session. + circulateMemBlocks(); + watchdog.addToErrorCount("Info_fileSkipped"); + m_taskStats.readWriteTime += timer.secs(cta::utils::Timer::resetCounter); + m_taskStats.headerVolume += TapeSessionStats::trailerVolumePerFile; + m_taskStats.filesCount++; + // Record the fSeq in the tape session + session->reportWrittenFSeq(m_archiveJob->tapeFile.fSeq); + reportPacker.reportSkippedJob(std::move(m_archiveJob), s, lc); + m_taskStats.waitReportingTime += timer.secs(cta::utils::Timer::resetCounter); + m_taskStats.totalTime = localTime.secs(); + // Log that a placeholder was left on tape for the skipped file + logWithStats(cta::log::INFO, "Left placeholder on tape after skipping unreadable file.", lc); + } catch (const RecoverableMigrationErrorException& e) { + // The disk reading failed due to a size mismatch or wrong checksum + // just want to report a failed job and proceed with the mount + if (!currentErrorToCount.empty()) { + watchdog.addToErrorCount(currentErrorToCount); + } + // Log and circulate blocks + LogContext::ScopedParam sp(lc, Param("exceptionCode", cta::log::ERR)); + LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue())); + lc.log(cta::log::ERR, "An error occurred for this file, but migration will proceed as error is recoverable"); + circulateMemBlocks(); + // Record the fSeq in the tape session + session->reportWrittenFSeq(m_archiveJob->tapeFile.fSeq); + reportPacker.reportFailedJob(std::move(m_archiveJob), e, lc); + lc.log(cta::log::INFO, "Left placeholder on tape after skipping unreadable file."); + return; + + } catch (const cta::exception::Exception& e) { + // We can end up there because + // We failed to open the FileWriter + // We received a bad block or a block written failed + // close failed + + // First set the error flag: we can't proceed any further with writes.
+ m_errorFlag.set(); + + // If we reached the end of tape, this is not an error (ENOSPC) + try { + // If it's not the error we're looking for, we will go about our business + // in the catch section. dynamic cast will throw, and we'll do ourselves + // if the error code is not the one we want. + const auto& en = dynamic_cast<const cta::exception::Errnum&>(e); + if (en.errorNumber() != ENOSPC) { + throw; + } + // This is indeed the end of the tape. Not an error. + watchdog.setErrorCount("Info_tapeFilledUp", 1); + reportPacker.reportTapeFull(lc); + } catch (...) { + // The error is not an ENOSPC, so it is, indeed, an error. + // If we got here with a new error, currentErrorToCount will be non-empty, + // and we will pass the error name to the watchdog. + if (!currentErrorToCount.empty()) { watchdog.addToErrorCount(currentErrorToCount); } - //log and circulate blocks - LogContext::ScopedParam sp(lc, Param("exceptionCode", cta::log::ERR)); - LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue())); - lc.log( cta::log::ERR,"An error occurred for this file, but migration will proceed as error is recoverable"); - circulateMemBlocks(); - reportPacker.reportFailedJob(std::move(m_archiveJob),e, lc); - return; - - } catch(const cta::exception::Exception& e){ - //we can end up there because - //we failed to open the FileWriter - //we received a bad block or a block written failed - //close failed - - //first set the error flag: we can't proceed any further with writes. - m_errorFlag.set(); - - // If we reached the end of tape, this is not an error (ENOSPC) - try { - // If it's not the error we're looking for, we will go about our business - // in the catch section. dynamic cast will throw, and we'll do ourselves - // if the error code is not the one we want. - const cta::exception::Errnum & en = - dynamic_cast<const cta::exception::Errnum &>(e); - if(en.errorNumber()!= ENOSPC) { - throw 0; - } - // This is indeed the end of the tape. Not an error. 
- watchdog.setErrorCount("Info_tapeFilledUp",1); - reportPacker.reportTapeFull(lc); - } catch (...) { - // The error is not an ENOSPC, so it is, indeed, an error. - // If we got here with a new error, currentErrorToCount will be non-empty, - // and we will pass the error name to the watchdog. - if(currentErrorToCount.size()) { - watchdog.addToErrorCount(currentErrorToCount); - } + } + + // Log and circulate blocks + // We want to report internal error most of the time to avoid wrong + // interpretation down the chain. Nevertheless, if the exception + // if of type Errnum AND the errorCode is ENOSPC, we will propagate it. + // This is how we communicate the fact that a tape is full to the client. + // We also change the log level to INFO for the case of end of tape. + int errorLevel = cta::log::ERR; + bool doReportJobError = true; + try { + const auto& errnum = dynamic_cast<const cta::exception::Errnum&>(e); + if (ENOSPC == errnum.errorNumber()) { + errorLevel = cta::log::INFO; + doReportJobError = false; } + } catch (...) {} + LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue())); + lc.log(errorLevel, "An error occurred for this file. End of migrations."); + circulateMemBlocks(); + if (doReportJobError) reportPacker.reportFailedJob(std::move(m_archiveJob), e, lc); + + // We throw again because we want TWST to stop all tasks from execution + // and go into a degraded mode operation. + throw; + } + watchdog.fileFinished(); +} - //log and circulate blocks - // We want to report internal error most of the time to avoid wrong - // interpretation down the chain. Nevertheless, if the exception - // if of type Errnum AND the errorCode is ENOSPC, we will propagate it. - // This is how we communicate the fact that a tape is full to the client. - // We also change the log level to INFO for the case of end of tape. 
- int errorLevel = cta::log::ERR; - bool doReportJobError = true; - try { - const auto & errnum = dynamic_cast<const cta::exception::Errnum &> (e); - if (ENOSPC == errnum.errorNumber()) { - errorLevel = cta::log::INFO; - doReportJobError = false; - } - } catch (...) {} - LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue())); - lc.log(errorLevel,"An error occurred for this file. End of migrations."); - circulateMemBlocks(); - if (doReportJobError) reportPacker.reportFailedJob(std::move(m_archiveJob),e, lc); - - //we throw again because we want TWST to stop all tasks from execution - //and go into a degraded mode operation. - throw; - } - watchdog.fileFinished(); - } //------------------------------------------------------------------------------ // getFreeBlock -//------------------------------------------------------------------------------ - MemBlock * TapeWriteTask::getFreeBlock() { - return m_fifo.getFreeBlock(); - } +//------------------------------------------------------------------------------ +MemBlock* TapeWriteTask::getFreeBlock() { + return m_fifo.getFreeBlock(); +} + //------------------------------------------------------------------------------ // checkErrors -//------------------------------------------------------------------------------ - void TapeWriteTask::checkErrors(MemBlock* mb,int memBlockId,cta::log::LogContext& lc){ - using namespace cta::log; - if(m_archiveJob->archiveFile.archiveFileID != mb->m_fileid - || memBlockId != mb->m_fileBlock - || mb->isFailed() - || mb->isCanceled()) { - LogContext::ScopedParam sp[]={ - LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)), - LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", memBlockId)), - LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)), - LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed())) - }; - tape::utils::suppresUnusedVariable(sp); - std::string errorMsg; - if(mb->isFailed()){ - //blocks are 
marked as failed by the DiskReadTask due to a size mismatch - //or wrong checksums - //both errors should just result in skipping the migration of the file - //so we use a different exception to distinguish this case - errorMsg=mb->errorMsg(); - m_errorFlag.set(); - LogContext::ScopedParam sp1(lc, Param("errorMessage", errorMsg)); - lc.log(cta::log::ERR, "Error while reading a file"); - throw RecoverableMigrationErrorException(errorMsg); - } else if (mb->isCanceled()) { - errorMsg="Received a block marked as cancelled"; - } else{ - errorMsg="Mismatch between expected and received file id or blockid"; - } - // Set the error flag for the session (in case of mismatch) +//------------------------------------------------------------------------------ +void TapeWriteTask::checkErrors(MemBlock* mb, uint64_t memBlockId, cta::log::LogContext& lc) { + using namespace cta::log; + if (m_archiveJob->archiveFile.archiveFileID != mb->m_fileid || memBlockId != mb->m_fileBlock || mb->isFailed() + || mb->isCanceled()) { + LogContext::ScopedParam sp[] = {LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)), + LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", memBlockId)), + LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)), + LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))}; + tape::utils::suppresUnusedVariable(sp); + std::string errorMsg; + if (mb->isFailed()) { + //blocks are marked as failed by the DiskReadTask due to a size mismatch + //or wrong checksums + //both errors should just result in skipping the migration of the file + //so we use a different exception to distinguish this case + errorMsg = mb->errorMsg(); m_errorFlag.set(); - lc.log(cta::log::ERR,errorMsg); - throw cta::exception::Exception(errorMsg); + LogContext::ScopedParam sp1(lc, Param("errorMessage", errorMsg)); + lc.log(cta::log::ERR, "Error while reading a file"); + throw RecoverableMigrationErrorException(errorMsg); + } + else if 
(mb->isCanceled()) { + errorMsg = "Received a block marked as cancelled"; + } + else { + errorMsg = "Mismatch between expected and received file id or blockid"; } + // Set the error flag for the session (in case of mismatch) + m_errorFlag.set(); + lc.log(cta::log::ERR, errorMsg); + throw cta::exception::Exception(errorMsg); } +} + //------------------------------------------------------------------------------ // pushDataBlock -//------------------------------------------------------------------------------ - void TapeWriteTask::pushDataBlock(MemBlock *mb) { - cta::threading::MutexLocker ml(m_producerProtection); - m_fifo.pushDataBlock(mb); - } - +//------------------------------------------------------------------------------ +void TapeWriteTask::pushDataBlock(MemBlock* mb) { + cta::threading::MutexLocker ml(m_producerProtection); + m_fifo.pushDataBlock(mb); +} + //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ - TapeWriteTask::~TapeWriteTask() { - cta::threading::MutexLocker ml(m_producerProtection); - } +TapeWriteTask::~TapeWriteTask() { + cta::threading::MutexLocker ml(m_producerProtection); +} + //------------------------------------------------------------------------------ // openFileWriter //------------------------------------------------------------------------------ - std::unique_ptr<tapeFile::FileWriter> TapeWriteTask::openFileWriter( - const std::unique_ptr<tape::tapeFile::WriteSession>& session, cta::log::LogContext& lc) { - std::unique_ptr<tape::tapeFile::FileWriter> output; - try { - const uint64_t tapeBlockSize = 256*1024; - output.reset(new tape::tapeFile::FileWriter(session, *m_archiveJob, tapeBlockSize)); - lc.log(cta::log::DEBUG, "Successfully opened the tape file for writing"); - } - catch (const cta::exception::Exception & ex) { - cta::log::LogContext::ScopedParam sp(lc, cta::log::Param("exceptionMessage", 
ex.getMessageValue())); - lc.log(cta::log::ERR, "Failed to open tape file for writing"); - throw; - } - return output; +std::unique_ptr<tapeFile::FileWriter> + TapeWriteTask::openFileWriter(const std::unique_ptr<tape::tapeFile::WriteSession>& session, + cta::log::LogContext& lc) { + std::unique_ptr<tape::tapeFile::FileWriter> output; + try { + const uint64_t tapeBlockSize = 256 * 1024; + output = std::make_unique<tape::tapeFile::FileWriter>(session, *m_archiveJob, tapeBlockSize); + lc.log(cta::log::DEBUG, "Successfully opened the tape file for writing"); + } catch (const cta::exception::Exception& ex) { + cta::log::LogContext::ScopedParam sp(lc, cta::log::Param("exceptionMessage", ex.getMessageValue())); + lc.log(cta::log::ERR, "Failed to open tape file for writing"); + throw; } + return output; +} + //------------------------------------------------------------------------------ // circulateMemBlocks //------------------------------------------------------------------------------ - void TapeWriteTask::circulateMemBlocks() { - while (!m_fifo.finished()) { - m_memManager.releaseBlock(m_fifo.popDataBlock()); - // watchdog.notify(); - } +void TapeWriteTask::circulateMemBlocks() { + while (!m_fifo.finished()) { + m_memManager.releaseBlock(m_fifo.popDataBlock()); + // watchdog.notify(); } +} - void TapeWriteTask::logWithStats(int level, const std::string& msg, - cta::log::LogContext& lc) const { - cta::log::ScopedParamContainer params(lc); - params.add("readWriteTime", m_taskStats.readWriteTime) - .add("checksumingTime", m_taskStats.checksumingTime) - .add("waitDataTime", m_taskStats.waitDataTime) - .add("waitReportingTime", m_taskStats.waitReportingTime) - .add("transferTime", m_taskStats.transferTime()) - .add("totalTime", m_taskStats.totalTime) - .add("dataVolume", m_taskStats.dataVolume) - .add("headerVolume", m_taskStats.headerVolume) - .add("driveTransferSpeedMBps", m_taskStats.totalTime? 
- 1.0*(m_taskStats.dataVolume+m_taskStats.headerVolume) - /1000/1000/m_taskStats.totalTime:0.0) - .add("payloadTransferSpeedMBps", m_taskStats.totalTime? - 1.0*m_taskStats.dataVolume/1000/1000/m_taskStats.totalTime:0.0) - .add("fileSize", m_archiveFile.fileSize) - .add("fileId", m_archiveFile.archiveFileID) - .add("fSeq", m_tapeFile.fSeq) - .add("reconciliationTime", m_archiveFile.reconciliationTime) - .add("LBPMode", m_LBPMode); +void TapeWriteTask::logWithStats(int level, const std::string& msg, cta::log::LogContext& lc) const { + cta::log::ScopedParamContainer params(lc); + params.add("readWriteTime", m_taskStats.readWriteTime) + .add("checksumingTime", m_taskStats.checksumingTime) + .add("waitDataTime", m_taskStats.waitDataTime) + .add("waitReportingTime", m_taskStats.waitReportingTime) + .add("transferTime", m_taskStats.transferTime()) + .add("totalTime", m_taskStats.totalTime) + .add("dataVolume", m_taskStats.dataVolume) + .add("headerVolume", m_taskStats.headerVolume) + .add("driveTransferSpeedMBps", m_taskStats.totalTime ? 1.0 * (m_taskStats.dataVolume + m_taskStats.headerVolume) + / 1000 / 1000 / m_taskStats.totalTime : + 0.0) + .add("payloadTransferSpeedMBps", + m_taskStats.totalTime ? 
1.0 * m_taskStats.dataVolume / 1000 / 1000 / m_taskStats.totalTime : 0.0) + .add("fileSize", m_archiveFile.fileSize) + .add("fileId", m_archiveFile.archiveFileID) + .add("fSeq", m_tapeFile.fSeq) + .add("reconciliationTime", m_archiveFile.reconciliationTime) + .add("LBPMode", m_LBPMode); + + lc.log(level, msg); +} - lc.log(level, msg); - } //------------------------------------------------------------------------------ // getTaskStats //------------------------------------------------------------------------------ const TapeSessionStats TapeWriteTask::getTaskStats() const { return m_taskStats; } -}}}} - - +} // namespace daemon +} // namespace tapeserver +} // namespace tape +} // namespace castor diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp index 1e4b45d94fba23582dd50ae7a8037b1cfe88ff08..07a19a018ed1caaf4139eac35ef4f335172345a9 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteTask.hpp @@ -17,7 +17,6 @@ #pragma once - #include "castor/tape/tapeserver/daemon/DataConsumer.hpp" #include "castor/tape/tapeserver/daemon/DataPipeline.hpp" #include "castor/tape/tapeserver/daemon/MigrationMemoryManager.hpp" @@ -61,15 +60,14 @@ public: * @param blockCount: number of memory blocks (TODO:?) 
* @param mm: reference to the memory manager in use */ - TapeWriteTask(int blockCount, cta::ArchiveJob *archiveJob, - MigrationMemoryManager& mm,cta::threading::AtomicFlag& errorFlag); - - + TapeWriteTask(uint64_t blockCount, cta::ArchiveJob* archiveJob, MigrationMemoryManager& mm, + cta::threading::AtomicFlag& errorFlag); + /** * @return the size of the file in byte */ virtual uint64_t fileSize(); - + /** * Main execution routine * @param session @@ -77,64 +75,64 @@ public: * @param lc For logging * @param timer */ - virtual void execute(const std::unique_ptr<castor::tape::tapeFile::WriteSession> &session, - MigrationReportPacker & reportPacker, MigrationWatchDog & watchdog, - cta::log::LogContext& lc, cta::utils::Timer & timer); - + virtual void execute(const std::unique_ptr<castor::tape::tapeFile::WriteSession>& session, + MigrationReportPacker& reportPacker, MigrationWatchDog& watchdog, cta::log::LogContext& lc, + cta::utils::Timer& timer); + private: /** Utility class used in execute()'s implementation*/ - class Skip: public std::string { + class Skip : public std::string { public: - template<typename T> Skip(const T&t): std::string(t) {} + template<typename T> + explicit Skip(const T& t) : std::string(t) {} }; - + public: /** * Used to reclaim used memory blocks * @return the recyclable memory block */ - virtual MemBlock * getFreeBlock(); - + MemBlock* getFreeBlock() override; + /** * This is to enqueue one memory block full of data to be written on tape * @param mb: the memory block in question */ - virtual void pushDataBlock(MemBlock *mb) ; - + void pushDataBlock(MemBlock* mb) override; + /** * Destructor */ - virtual ~TapeWriteTask(); + ~TapeWriteTask() override; /** * Should only be called in case of error !! * Just pop data block and put in back into the memory manager */ void circulateMemBlocks(); - + /** * Return the task stats. 
Should only be called after execute * @return */ - const TapeSessionStats getTaskStats() const ; + const TapeSessionStats getTaskStats() const; private: /** * Log all localStats' stats + m_fileToMigrate's parameters * into lc with msg at the given level */ - void logWithStats(int level, const std::string& msg, - cta::log::LogContext& lc) const; - + void logWithStats(int level, const std::string& msg, cta::log::LogContext& lc) const; + /** * This function will check the consistency of the mem block and * throw exception is something goes wrong * @param mb The mem block to check - * @param memBlockId The block id the mem blopck should be at + * @param memBlockId The block id the mem block should be at * @param lc FOr logging */ - void checkErrors(MemBlock* mb,int memBlockId,cta::log::LogContext& lc); - + void checkErrors(MemBlock* mb, uint64_t memBlockId, cta::log::LogContext& lc); + /** * Function in charge of opening the FileWriter for m_fileToMigrate * Throw an exception it it fails @@ -142,40 +140,40 @@ private: * @param lc for logging purpose * @return the FileWriter if everything went well */ - std::unique_ptr<castor::tape::tapeFile::FileWriter> openFileWriter( - const std::unique_ptr<castor::tape::tapeFile::WriteSession>& session, cta::log::LogContext& lc); + std::unique_ptr<castor::tape::tapeFile::FileWriter> + openFileWriter(const std::unique_ptr<castor::tape::tapeFile::WriteSession>& session, cta::log::LogContext& lc); /** * All we need to know about the file we are migrating */ std::unique_ptr<cta::ArchiveJob> m_archiveJob; - + /** * reference to the memory manager in use */ - MigrationMemoryManager & m_memManager; - + MigrationMemoryManager& m_memManager; + /** * The fifo containing the memory blocks holding data to be written to tape */ DataPipeline m_fifo; - + /** * Mutex forcing serial access to the fifo */ cta::threading::Mutex m_producerProtection; - + /** * The number of memory blocks to be used */ - int m_blockCount; - + uint64_t m_blockCount; + 
/** * A shared flag among the the tasks and the task injector, set as true as soon * as task failed to do its job */ cta::threading::AtomicFlag& m_errorFlag; - + /** * Stats */ @@ -185,22 +183,24 @@ private: * LBP mode tracking */ std::string m_LBPMode; - + /** * The NS archive file information */ cta::common::dataStructures::ArchiveFile m_archiveFile; - + /** * The file archive result for the NS */ cta::common::dataStructures::TapeFile m_tapeFile; - + /** * The remote file information */ - std::string m_srcURL; + std::string m_srcURL; }; -}}}} - +} // namespace daemon +} // namespace tapeserver +} // namespace tape +} // namespace castor diff --git a/tapeserver/castor/tape/tapeserver/file/WriteSession.hpp b/tapeserver/castor/tape/tapeserver/file/WriteSession.hpp index 110017ffb85af3ab507e8403f695bfa12f82eac5..14fa851118ca3c52f46c5af58ee58d5cf5db7591 100644 --- a/tapeserver/castor/tape/tapeserver/file/WriteSession.hpp +++ b/tapeserver/castor/tape/tapeserver/file/WriteSession.hpp @@ -78,23 +78,23 @@ public: */ const bool m_useLbp; - inline std::string getSiteName() throw() { + inline std::string getSiteName() noexcept { return m_siteName; } - inline std::string getHostName() throw() { + inline std::string getHostName() noexcept { return m_hostName; } - inline void setCorrupted() throw() { + inline void setCorrupted() noexcept { m_corrupted = true; } - inline bool isCorrupted() throw() { + inline bool isCorrupted() noexcept { return m_corrupted; } - inline bool isTapeWithLbp() throw() { + inline bool isTapeWithLbp() noexcept { return m_detectedLbp; } @@ -126,12 +126,10 @@ public: * This is to be used by the tapeWriteTask right before writing the file * @param nextFSeq The fSeq we are about to write. 
*/ - inline void validateNextFSeq(int nextFSeq) const { + inline void validateNextFSeq(uint64_t nextFSeq) const { if (nextFSeq != m_lastWrittenFSeq + 1) { - cta::exception::Exception e; - e.getMessage() << "In WriteSession::validateNextFSeq: wrong fSeq sequence: lastWrittenFSeq=" - << m_lastWrittenFSeq << " nextFSeq=" << nextFSeq; - throw e; + throw cta::exception::Exception("In WriteSession::validateNextFSeq: wrong fSeq sequence: lastWrittenFSeq=" + + std::to_string(m_lastWrittenFSeq) + " nextFSeq=" + std::to_string(nextFSeq)); } } @@ -141,12 +139,11 @@ public: * file. * @param writtenFSeq the fSeq of the file */ - inline void reportWrittenFSeq(int writtenFSeq) { + inline void reportWrittenFSeq(uint64_t writtenFSeq) { if (writtenFSeq != m_lastWrittenFSeq + 1) { - cta::exception::Exception e; - e.getMessage() << "In WriteSession::reportWrittenFSeq: wrong fSeq reported: lastWrittenFSeq=" - << m_lastWrittenFSeq << " writtenFSeq=" << writtenFSeq; - throw e; + throw cta::exception::Exception("In WriteSession::reportWrittenFSeq: wrong fSeq reported: lastWrittenFSeq=" + + std::to_string(m_lastWrittenFSeq) + + " writtenFSeq=" + std::to_string(writtenFSeq)); } m_lastWrittenFSeq = writtenFSeq; } @@ -180,7 +177,7 @@ private: /** * keep track of the fSeq we are writing to tape */ - int m_lastWrittenFSeq; + uint64_t m_lastWrittenFSeq; /** * set to true in case the write operations do (or try to do) something illegal