/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2003-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/daemon/DiskWriteTask.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "common/Timer.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
/**
 * Builds a disk write task for a single retrieve job.
 *
 * @param retrieveJob The retrieve job this task will write to disk; stored in
 *                    m_retrieveJob.
 *                    NOTE(review): execute() hands m_retrieveJob to the
 *                    reporter through std::move(), so the task presumably
 *                    takes ownership of this pointer — confirm in the header.
 * @param mm          Memory manager stored in m_memManager; it is passed to
 *                    AutoReleaseBlock whenever a memory block is released.
 */
DiskWriteTask::DiskWriteTask(cta::RetrieveJob *retrieveJob, RecallMemoryManager& mm)
  : m_retrieveJob(retrieveJob),
    m_memManager(mm) {
}
//------------------------------------------------------------------------------
// DiskWriteTask::execute
//------------------------------------------------------------------------------
/**
 * Consumes the memory blocks queued for this retrieve job, writes their
 * payload to the destination disk file and reports the outcome.
 *
 * Blocks are popped from m_fifo until pop() returns a null pointer, which is
 * treated as the end of the file. The disk file is only opened once a first
 * block has passed checkErrors(). An Adler32 checksum is accumulated over all
 * payloads and stored, with the transferred size, in the retrieve job before
 * it is reported as completed.
 *
 * @param reporter    Receives reportCompletedJob() on success and
 *                    reportFailedJob() when an exception is caught.
 * @param lc          Log context used for all messages of this transfer.
 * @param fileFactory Factory used to create the write file for dstURL.
 * @param watchdog    Receives the "stillOpenFileForThread<threadID>" parameter
 *                    while a file is open, and the error counters.
 * @param threadID    Identifier appended to the watchdog parameter name.
 * @return true when the file was written (or verified), or when the transfer
 *         was canceled; false when a cta::exception::Exception was caught.
 */
bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext& lc,
    cta::disk::DiskFileFactory & fileFactory, RecallWatchDog & watchdog,
    const int threadID) {
  using cta::log::LogContext;
  using cta::log::Param;
  // localTime is read with resetCounter after each measured step;
  // totalTime and transferTime are copies used for the overall figures.
  cta::utils::Timer localTime;
  cta::utils::Timer totalTime(localTime);
  cta::utils::Timer transferTime(localTime);
  cta::log::ScopedParamContainer URLcontext(lc);
  URLcontext.add("fileId",m_retrieveJob->retrieveRequest.archiveFileID)
            .add("dstURL", m_retrieveJob->retrieveRequest.dstURL)
            .add("fSeq",m_retrieveJob->selectedTapeFile().fSeq);
  m_stats.dstURL = m_retrieveJob->retrieveRequest.dstURL;
  m_stats.fileId = m_retrieveJob->retrieveRequest.archiveFileID;
  // Name of the watchdog parameter advertising the file opened by this thread.
  const std::string openFileParamName =
    "stillOpenFileForThread" + std::to_string(static_cast<long long>(threadID));
  // This out-of-try-catch variable allows us to record the stage of the
  // process we're in, and to count the error if it occurs.
  // We will not record errors for an empty string. This will allow us to
  // prevent counting where the error happened upstream.
  std::string currentErrorToCount = "";
  bool isVerifyOnly(false);
  try{
    currentErrorToCount = "";
    // Placeholder for the disk file. We will open it only
    // after getting a first correct memory block.
    std::unique_ptr<cta::disk::WriteFile> writeFile;
    int blockId = 0;
    unsigned long checksum = Payload::zeroAdler32();
    while(true) {
      if(MemBlock* const mb = m_fifo.pop()) {
        m_stats.waitDataTime+=localTime.secs(cta::utils::Timer::resetCounter);
        // Hands the block back to the memory manager when leaving this scope.
        AutoReleaseBlock releaser(mb,m_memManager);
        if(mb->isVerifyOnly()) {
          // For verifyOnly, there is no disk file to write. Ignore the memory block and continue.
          isVerifyOnly = true;
          continue;
        } else if(mb->isCanceled()) {
          // If the tape side got canceled, we report nothing and count
          // it as a success.
          lc.log(cta::log::DEBUG, "File transfer canceled");
          return true;
        }

        //will throw (thus exiting the loop) if something is wrong
        checkErrors(mb,blockId,lc);
        m_stats.checkingErrorTime += localTime.secs(cta::utils::Timer::resetCounter);
        // If we got that far on the first pass, it's now good enough to open
        // the disk file for writing...
        if (!writeFile.get()) {
          lc.log(cta::log::DEBUG, "About to open disk file for writing");
          // Synchronise the counter with the open time counter.
          currentErrorToCount = "Error_diskOpenForWrite";
          transferTime = localTime;
          writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->retrieveRequest.dstURL));
          URLcontext.add("actualURL", writeFile->URL());
          lc.log(cta::log::INFO, "Opened disk file for writing");
          m_stats.openingTime+=localTime.secs(cta::utils::Timer::resetCounter);
          watchdog.addParameter(cta::log::Param(openFileParamName, writeFile->URL()));
        }

        // Write the data.
        currentErrorToCount = "Error_diskWrite";
        m_stats.dataVolume+=mb->m_payload.size();
        if (mb->m_payload.size())
          mb->m_payload.write(*writeFile);
        m_stats.readWriteTime+=localTime.secs(cta::utils::Timer::resetCounter);

        checksum = mb->m_payload.adler32(checksum);
        m_stats.checksumingTime+=localTime.secs(cta::utils::Timer::resetCounter);

        currentErrorToCount = "";
        blockId++;
      //end if block non NULL
      } else if(isVerifyOnly) {
        // No file to close, we are done
        break;
      } else {
        //close has to be explicit, because it may throw.
        //A close is done in WriteFile's destructor, but it may lead to some
        //silent data loss
        // NOTE(review): writeFile is still null here if the very first pop()
        // returned a null pointer — presumably the producer always pushes at
        // least one block before the end marker; confirm against the caller.
        currentErrorToCount = "Error_diskCloseAfterWrite";
        // Set the checksum on the server (actually needed only for Rados striper
        // noop in other cases).
        writeFile->setChecksum(checksum);
        writeFile->close();
        m_stats.closingTime +=localTime.secs(cta::utils::Timer::resetCounter);
        m_stats.filesCount++;
        // The close succeeded: clear the stage marker before leaving the loop
        // so that a later failure is not counted as a close error.
        currentErrorToCount = "";
        break;
      }
    } //end of while(true)

    m_retrieveJob->transferredSize = m_stats.dataVolume;
    m_retrieveJob->transferredChecksumType = "ADLER32";
    {
      // Checksum is reported as 8 lower-case, zero-padded hexadecimal digits.
      std::stringstream cs;
      cs << std::hex << std::nouppercase << std::setfill('0') << std::setw(8) << static_cast<uint32_t>(checksum);
      m_retrieveJob->transferredChecksumValue = cs.str();
    }
    reporter.reportCompletedJob(std::move(m_retrieveJob));
    m_stats.waitReportingTime+=localTime.secs(cta::utils::Timer::resetCounter);
    m_stats.transferTime = transferTime.secs();
    m_stats.totalTime = totalTime.secs();
    logWithStat(cta::log::INFO, isVerifyOnly ? "File successfully verified" : "File successfully transfered to disk", lc);
    watchdog.deleteParameter(openFileParamName);
    //everything went well, return true
    return true;
  } //end of try
  catch(const cta::exception::Exception& e){
    /*
     *We might end up there because ;
     * -- WriteFile failed
     * -- A desynchronization between tape read and disk write
     * -- An error in tape read
     * -- An error while writing the file
     */

    //there might still be some blocks into m_fifo
    // We need to empty it
    releaseAllBlock();

    // Propagate the error to the watchdog
    if (currentErrorToCount.size()) {
      watchdog.addToErrorCount(currentErrorToCount);
    }
    m_stats.waitReportingTime+=localTime.secs(cta::utils::Timer::resetCounter);
    cta::log::ScopedParamContainer params(lc);
    params.add("errorMessage", e.getMessageValue());
    logWithStat(cta::log::ERR, isVerifyOnly ? "File verification failed" : "File writing to disk failed", lc);
    lc.logBacktrace(cta::log::ERR, e.backtrace());
    reporter.reportFailedJob(std::move(m_retrieveJob), e);

    watchdog.deleteParameter(openFileParamName);

    //got an exception, return false
    return false;
  }
}
//------------------------------------------------------------------------------
// DiskWriteTask::getFreeBlock
//------------------------------------------------------------------------------
/**
 * Not supported by this task: calling it always fails.
 *
 * @return Never returns normally.
 * @throws cta::exception::Exception unconditionally.
 */
MemBlock *DiskWriteTask::getFreeBlock() {
  throw cta::exception::Exception("DiskWriteTask::getFreeBlock should not be called");
}
//------------------------------------------------------------------------------
// DiskWriteTask::pushDataBlock
//------------------------------------------------------------------------------
/**
 * Appends a memory block to this task's queue.
 * The push is performed while holding m_producerProtection.
 *
 * @param mb The block pushed onto m_fifo. execute() treats a null pointer
 *           popped from the queue as the end of the file.
 */
void DiskWriteTask::pushDataBlock(MemBlock *mb) {
  cta::threading::MutexLocker producerLock(m_producerProtection);
  m_fifo.push(mb);
}
//------------------------------------------------------------------------------
// DiskWriteTask::~DiskWriteTask
//------------------------------------------------------------------------------
/**
 * Destructor.
 *
 * Acquires m_producerProtection and releases it again when the locker goes
 * out of scope. pushDataBlock() holds the same mutex while pushing, so the
 * destructor cannot get past this point while such a call is in progress.
 */
DiskWriteTask::~DiskWriteTask() {
  // The locker's constructor and destructor do the work; the object itself is
  // intentionally not used otherwise.
  cta::threading::MutexLocker ml(m_producerProtection);
}
//------------------------------------------------------------------------------
// DiskWriteTask::releaseAllBlock
//------------------------------------------------------------------------------
void DiskWriteTask::releaseAllBlock(){
while(1){
if(MemBlock* mb=m_fifo.pop())
AutoReleaseBlock release(mb,m_memManager);
else
break;
}
}
//------------------------------------------------------------------------------
// checkErrors
//------------------------------------------------------------------------------
/**
 * Validates a memory block against what this task expects to receive next.
 *
 * A block is rejected when its file id differs from the retrieve request's
 * archiveFileID, when its block number differs from blockId, or when it is
 * flagged as failed. On rejection the details are attached to the log
 * context, the error is logged and an exception is thrown.
 *
 * @param mb      The block to check.
 * @param blockId The block number expected for this position in the file.
 * @param lc      Log context used to report the problem.
 * @throws cta::exception::Exception carrying the block's own error message
 *         when it is flagged as failed, or a mismatch message otherwise.
 */
void DiskWriteTask::checkErrors(MemBlock* mb,int blockId,cta::log::LogContext& lc){
  using cta::log::LogContext;
  using cta::log::Param;

  const bool blockIsBad =
       m_retrieveJob->retrieveRequest.archiveFileID != mb->m_fileid
    || blockId != mb->m_fileBlock
    || mb->isFailed();
  if (!blockIsBad) {
    return;
  }

  // Attach the expected/received values to the log context for the error line.
  LogContext::ScopedParam scopedParams[]={
    LogContext::ScopedParam(lc, Param("received_archiveFileID", mb->m_fileid)),
    LogContext::ScopedParam(lc, Param("expected_NSBLOCKId", blockId)),
    LogContext::ScopedParam(lc, Param("received_NSBLOCKId", mb->m_fileBlock)),
    LogContext::ScopedParam(lc, Param("failed_Status", mb->isFailed()))
  };
  tape::utils::suppresUnusedVariable(scopedParams);

  // A failed block carries its own message; anything else is a mismatch.
  const std::string errorMsg = mb->isFailed()
    ? std::string(mb->errorMsg())
    : std::string("Mismatch between expected and received fileid or blockid");

  lc.log(cta::log::ERR,errorMsg);
  throw cta::exception::Exception(errorMsg);
}
//------------------------------------------------------------------------------
// DiskWriteTask::getTaskStats
//------------------------------------------------------------------------------
/**
 * Gives access to the statistics gathered by this task.
 *
 * @return A copy of m_stats as it stands at the time of the call.
 */
const DiskStats DiskWriteTask::getTaskStats() const {
  return m_stats;
}
//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------
void DiskWriteTask::logWithStat(int level,const std::string& msg,cta::log::LogContext& lc){
cta::log::ScopedParamContainer params(lc);
params.add("readWriteTime", m_stats.readWriteTime)
.add("checksumingTime",m_stats.checksumingTime)
.add("waitDataTime",m_stats.waitDataTime)
.add("waitReportingTime",m_stats.waitReportingTime)
.add("checkingErrorTime",m_stats.checkingErrorTime)
.add("openingTime",m_stats.openingTime)
.add("closingTime",m_stats.closingTime)
.add("transferTime", m_stats.transferTime)
.add("totalTime", m_stats.totalTime)
.add("dataVolume", m_stats.dataVolume)
.add("globalPayloadTransferSpeedMBps",
m_stats.totalTime?1.0*m_stats.dataVolume/1000/1000/m_stats.totalTime:0)
.add("diskPerformanceMBps",
m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0)
.add("openRWCloseToTransferTimeRatio",
m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0)
.add("fileId",m_stats.fileId)
.add("dstURL",m_stats.dstURL);
lc.log(level,msg);
}
}}}}