/*
 * @project      The CERN Tape Archive (CTA)
 * @copyright    Copyright(C) 2003-2021 CERN
 * @license      This program is free software: you can redistribute it and/or modify
 *               it under the terms of the GNU General Public License as published by
 *               the Free Software Foundation, either version 3 of the License, or
 *               (at your option) any later version.
 *
 *               This program is distributed in the hope that it will be useful,
 *               but WITHOUT ANY WARRANTY; without even the implied warranty of
 *               MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *               GNU General Public License for more details.
 *
 *               You should have received a copy of the GNU General Public License
 *               along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "common/Timer.hpp"

namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskReadTask::DiskReadTask(DataConsumer & destination, cta::ArchiveJob *archiveJob,
        size_t numberOfBlock, cta::threading::AtomicFlag& errorFlag):
  m_nextTask(destination), m_archiveJob(archiveJob),
  m_numberOfBlock(numberOfBlock), m_errorFlag(errorFlag)
{
  m_archiveJobCachedInfo.remotePath = m_archiveJob->srcURL;
  m_archiveJobCachedInfo.fileId = m_archiveJob->archiveFile.archiveFileID;
}

//------------------------------------------------------------------------------
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(cta::log::LogContext& lc, cta::disk::DiskFileFactory & fileFactory,
    MigrationWatchDog & watchdog, const int threadID) {
  using cta::log::LogContext;
  using cta::log::Param;

  cta::utils::Timer localTime;
  cta::utils::Timer totalTime(localTime);
  size_t blockId = 0;
  size_t migratingFileSize = m_archiveJob->archiveFile.fileSize;
  MemBlock* mb = NULL;
  // This out-of-try-catch variable allows us to record the stage of the
  // process we are in, and to count the error if one occurs.
  // We do not record errors for an empty string, which prevents counting
  // errors that happened upstream.
  std::string currentErrorToCount = "";
  try {
    // Check first, before even trying to open the disk file: if a previous
    // task has failed, the disk could be the very reason it failed, so do not
    // make the same mistake twice!
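    // Note: checkMigrationFailing() is declared in DiskReadTask.hpp. A minimal
    // sketch of its expected behaviour (an assumption inferred from how it is
    // used here, not necessarily the exact implementation):
    //   if (m_errorFlag) { throw castor::tape::tapeserver::daemon::ErrorFlag(); }
    // i.e. it throws ErrorFlag as soon as another task has flagged the
    // migration as failing; that exception is caught below and this task then
    // only circulates its memory blocks.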
    checkMigrationFailing();

    currentErrorToCount = "Error_diskOpenForRead";
    std::unique_ptr<cta::disk::ReadFile> sourceFile(
      fileFactory.createReadFile(m_archiveJob->srcURL));
    cta::log::ScopedParamContainer URLcontext(lc);
    URLcontext.add("path", m_archiveJob->srcURL)
              .add("actualURL", sourceFile->URL());
    currentErrorToCount = "Error_diskFileToReadSizeMismatch";
    if(migratingFileSize != sourceFile->size()) {
      throw cta::exception::Exception("Mismatch between size given by the client "
          "and the real one");
    }
    currentErrorToCount = "";

    m_stats.openingTime += localTime.secs(cta::utils::Timer::resetCounter);

    LogContext::ScopedParam sp(lc, Param("fileId", m_archiveJob->archiveFile.archiveFileID));
    lc.log(cta::log::INFO, "Opened disk file for read");

    watchdog.addParameter(cta::log::Param("stillOpenFileForThread" +
      std::to_string((long long)threadID), sourceFile->URL()));

    while(migratingFileSize > 0) {

      checkMigrationFailing();

      mb = m_nextTask.getFreeBlock();
      m_stats.waitFreeMemoryTime += localTime.secs(cta::utils::Timer::resetCounter);

      // Set the metadata and read the data
      mb->m_fileid = m_archiveJob->archiveFile.archiveFileID;
      mb->m_fileBlock = blockId++;

      currentErrorToCount = "Error_diskRead";
      migratingFileSize -= mb->m_payload.read(*sourceFile);
      m_stats.readWriteTime += localTime.secs(cta::utils::Timer::resetCounter);

      m_stats.dataVolume += mb->m_payload.size();

      // We either read at full capacity (i.e. size == capacity, filling up the
      // block) or, if the sizes differ, we should have reached the end of the
      // file => migratingFileSize should be 0. If it is not, it is an error.
      currentErrorToCount = "Error_diskUnexpectedSizeWhenReading";
      if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize > 0) {
        const std::string errorMsg = "Error while reading a file: memory block not filled up, "
            "but the file is not fully read yet";
        // Log the error
        cta::log::ScopedParamContainer params(lc);
        params.add("bytesInBlock", mb->m_payload.size())
              .add("BlockCapacity", mb->m_payload.totalCapacity())
              .add("BytesNotYetRead", migratingFileSize);
        lc.log(cta::log::ERR, errorMsg);
        // Mark the block as failed
        mb->markAsFailed(errorMsg, 666);
        // Transmit to the tape write task, which will finish the session
        m_nextTask.pushDataBlock(mb);
        // Fail the disk side.
        throw cta::exception::Exception(errorMsg);
      }
      currentErrorToCount = "";
      m_stats.checkingErrorTime += localTime.secs(cta::utils::Timer::resetCounter);

      // We are done with the block, push it to the write task
      m_nextTask.pushDataBlock(mb);
      mb = NULL;

    } //end of while(migratingFileSize>0)
    m_stats.filesCount++;
    m_stats.totalTime = totalTime.secs();
    // We do not have delayed open like in disk writes, so the time spent
    // transferring equals the total time.
    m_stats.transferTime = m_stats.totalTime;
    logWithStat(cta::log::INFO, "File successfully read from disk", lc);
  }
  catch(const castor::tape::tapeserver::daemon::ErrorFlag&) {
    lc.log(cta::log::DEBUG, "DiskReadTask: a previous file has failed for migration. "
        "Do nothing except circulating blocks");
    circulateAllBlocks(blockId, mb);
  }
  catch(const cta::exception::Exception& e) {
    // Send the error for counting to the watchdog
    if (currentErrorToCount.size()) {
      watchdog.addToErrorCount(currentErrorToCount);
    }
    // We have to pump the blocks anyway, mark them as failed and pass them back
    // to the TapeWriteTask; otherwise they would be stuck in the TapeWriteTask
    // free block fifo.
    // If we got here we had some job to do, so there shall be at least one
    // block either at hand or available.
    // The tape write task, upon reception of the failed block, will mark the
    // session as failed, hence signalling to the remaining disk read tasks to
    // cancel as nothing more will be written to tape.
    if (!mb) {
      mb = m_nextTask.getFreeBlock();
      ++blockId;
    }
    mb->markAsFailed(e.getMessageValue(), 666); // TODO - Drop error code
    m_nextTask.pushDataBlock(mb);
    mb = NULL;
    cta::log::ScopedParamContainer spc(lc);
    spc.add("blockID", blockId)
       .add("exceptionMessage", e.getMessageValue())
       .add("fileSize", m_archiveJob->archiveFile.fileSize);
    m_archiveJob->archiveFile.checksumBlob.addFirstChecksumToLog(spc);
    lc.log(cta::log::ERR, "Exception while reading a file");
    // Circulate the remaining memory blocks back to the write task
    circulateAllBlocks(blockId, mb);
  } //end of catch
  watchdog.deleteParameter("stillOpenFileForThread" +
    std::to_string((long long)threadID));
}

//------------------------------------------------------------------------------
// DiskReadTask::circulateAllBlocks
//------------------------------------------------------------------------------
void DiskReadTask::circulateAllBlocks(size_t fromBlockId, MemBlock * mb) {
  size_t blockId = fromBlockId;
  while(blockId < m_numberOfBlock) {
    if (!mb) {
      mb = m_nextTask.getFreeBlock();
      ++blockId;
    }
    mb->m_fileid = m_archiveJob->archiveFile.archiveFileID;
    mb->markAsCancelled();
    m_nextTask.pushDataBlock(mb);
    mb = NULL;
  } //end of while
}

//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------
void DiskReadTask::logWithStat(int level, const std::string& msg, cta::log::LogContext& lc) {
  cta::log::ScopedParamContainer params(lc);
  params.add("readWriteTime", m_stats.readWriteTime)
        .add("checksumingTime", m_stats.checksumingTime)
        .add("waitFreeMemoryTime", m_stats.waitFreeMemoryTime)
        .add("waitDataTime", m_stats.waitDataTime)
        .add("waitReportingTime", m_stats.waitReportingTime)
        .add("checkingErrorTime", m_stats.checkingErrorTime)
        .add("openingTime", m_stats.openingTime)
        .add("transferTime", m_stats.transferTime)
        .add("totalTime", m_stats.totalTime)
        .add("dataVolume", m_stats.dataVolume)
        .add("globalPayloadTransferSpeedMBps",
           m_stats.totalTime ? 1.0*m_stats.dataVolume/1000/1000/m_stats.totalTime : 0)
        .add("diskPerformanceMBps",
           m_stats.transferTime ? 1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime : 0)
        .add("openRWCloseToTransferTimeRatio",
           m_stats.transferTime ?
             (m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime : 0.0)
        .add("fileId", m_archiveJobCachedInfo.fileId)
        .add("path", m_archiveJobCachedInfo.remotePath);
  lc.log(level, msg);
}

const DiskStats DiskReadTask::getTaskStats() const {
  return m_stats;
}

}}}}