Commit de8f825d authored by Eric Cano's avatar Eric Cano
Browse files

Instrumented the tape threads (read/write) for timing reporting.

parent f40a6d13
/******************************************************************************
* SessionStats.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2014 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
namespace castor {
namespace tape {
namespace tapeserver {
/**
* Structure holding the timers and stats for the tape session. All time
* measurements are doubles, in seconds; data volumes are uint64_t, in bytes.
*/
struct SessionStats {
  // ---- Timers ----------------------------------------------------------

  /** Time taken to mount the tape, in seconds */
  double mountTime;

  /** Total time spent positioning, in seconds. */
  double positionTime;

  /** Total time spent on checksum computation */
  double checksumingTime;

  /** Total time spent exchanging data with the drive (payload as well as headers). */
  double transferTime;

  /** Total time spent in flushes */
  double flushTime;

  /** Time taken to unload, in seconds. */
  double unloadTime;

  /** Time taken to unmount, in seconds. */
  double unmountTime;

  /** Total time spent waiting for data blocks/free memory. */
  double waitDataTime;

  /** Total time the tape thread spent waiting to be handed a task. */
  double waitInstructionsTime;

  /** Total time spent on reporting */
  double waitReportingTime;

  // ---- Counters --------------------------------------------------------

  /** Total amount of actual payload data, in bytes. */
  uint64_t dataVolume;

  /** Total space taken by file headers. */
  uint64_t headerVolume;

  /** Number of files actually transferred during the session. */
  uint64_t filesCount;

  /** Default constructor: every timer and every counter starts at zero */
  SessionStats():
    mountTime(0.0),
    positionTime(0.0),
    checksumingTime(0.0),
    transferTime(0.0),
    flushTime(0.0),
    unloadTime(0.0),
    unmountTime(0.0),
    waitDataTime(0.0),
    waitInstructionsTime(0.0),
    waitReportingTime(0.0),
    dataVolume(0),
    headerVolume(0),
    filesCount(0) {}

  /**
   * Add, field by field, the content of another stats block to this one.
   * @param other the stats block to accumulate (left untouched)
   */
  void add(const SessionStats& other) {
    // Counters first...
    filesCount   += other.filesCount;
    headerVolume += other.headerVolume;
    dataVolume   += other.dataVolume;
    // ...then the waiting timers...
    waitReportingTime    += other.waitReportingTime;
    waitInstructionsTime += other.waitInstructionsTime;
    waitDataTime         += other.waitDataTime;
    // ...and finally the drive/library related timers.
    unmountTime     += other.unmountTime;
    unloadTime      += other.unloadTime;
    flushTime       += other.flushTime;
    transferTime    += other.transferTime;
    checksumingTime += other.checksumingTime;
    positionTime    += other.positionTime;
    mountTime       += other.mountTime;
  }
};
}
}
}
\ No newline at end of file
......@@ -81,8 +81,11 @@ private:
//RAII class for cleaning tape stuff
class TapeCleaning{
TapeReadSingleThread& m_this;
// As we are living in the single thread of tape, we can borrow the timer
utils::Timer & m_timer;
public:
TapeCleaning(TapeReadSingleThread& parent):m_this(parent){}
TapeCleaning(TapeReadSingleThread& parent, utils::Timer & timer):
m_this(parent), m_timer(timer){}
~TapeCleaning(){
// Tell everyone to wrap up the session
// We now acknowledge to the task injector that read reached the end. There
......@@ -91,8 +94,8 @@ private:
//then we log/notify
m_this.m_logContext.log(LOG_INFO, "Finishing Tape Read Thread."
" Just signaled task injector of the end");
m_this.m_stats.waitReportingTime += m_timer.secs(utils::Timer::resetCounter);
try {
tape::utils::Timer timer;
// Do the final cleanup
// in the special case of a "manual" mode tape, we should skip the unload too.
if (m_this.m_drive.librarySlot != "manual") {
......@@ -101,15 +104,16 @@ private:
} else {
m_this.m_logContext.log(LOG_INFO, "TapeReadSingleThread: Tape NOT unloaded (manual mode)");
}
m_this.m_stats.unloadTime += m_timer.secs(utils::Timer::resetCounter);
// And return the tape to the library
// In case of manual mode, this will be filtered by the rmc daemon
// (which will do nothing)
m_this.m_rmc.unmountTape(m_this.m_volInfo.vid, m_this.m_drive.librarySlot);
log::LogContext::ScopedParam sp0( m_this.m_logContext, log::Param("timeTaken", timer.usecs()));
m_this.m_stats.unmountTime += m_timer.secs(utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, m_this.m_drive.librarySlot != "manual"?
"TapeReadSingleThread : tape unmounted":"TapeReadSingleThread : tape NOT unmounted (manual mode)");
m_this.m_tsr.tapeUnmounted();
m_this.m_stats.waitReportingTime += m_timer.secs(utils::Timer::resetCounter);
} catch(const castor::exception::Exception& ex){
//set it to -1 to notify something failed during the cleaning
m_this.m_hardarwareStatus = -1;
......@@ -174,47 +178,70 @@ private:
*/
virtual void run() {
m_logContext.pushOrReplace(log::Param("thread", "tapeRead"));
castor::tape::utils::Timer timer, totalTimer;
try{
// Set capabilities allowing rawio (and hence arbitrary SCSI commands)
// through the st driver file descriptor.
setCapabilities();
// the tape ins loaded
//it has to be unloaded, unmounted at all cost -> RAII
//will also take care of the TapeServerReporter and of RecallTaskInjector
TapeCleaning tapeCleaner(*this);
// Before anything, the tape should be mounted
mountTape(legacymsg::RmcProxy::MOUNT_MODE_READONLY);
waitForDrive();
// Then we have to initialise the tape read session
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs(openReadSession());
//and then report
m_logContext.log(LOG_INFO, "Tape read session session successfully started");
m_tsr.tapeMountedForRead();
tape::utils::Timer timer;
//start the threading and ask to initiate the protocol with the tapeserverd
m_watchdog.startThread();
// Then we will loop on the tasks as they get from
// the task injector
while(1) {
// NULL indicated the end of work
TapeReadTask * task = popAndRequestMoreJobs();
m_logContext.log(LOG_DEBUG, "TapeReadThread: just got one more job");
if (task) {
task->execute(*rs, m_logContext,m_watchdog);
delete task;
} else {
log::LogContext::ScopedParam sp0(m_logContext, log::Param("time taken", timer.secs()));
m_logContext.log(LOG_INFO, "reading data from the tape has finished");
break;
{
// The tape will be loaded
// it has to be unloaded, unmounted at all cost -> RAII
// will also take care of the TapeServerReporter and of RecallTaskInjector
TapeCleaning tapeCleaner(*this, timer);
// Before anything, the tape should be mounted
mountTape(legacymsg::RmcProxy::MOUNT_MODE_READONLY);
waitForDrive();
m_stats.mountTime += timer.secs(utils::Timer::resetCounter);
// Then we have to initialise the tape read session
std::auto_ptr<castor::tape::tapeFile::ReadSession> rs(openReadSession());
m_stats.positionTime += timer.secs(utils::Timer::resetCounter);
//and then report
m_logContext.log(LOG_INFO, "Tape read session session successfully started");
m_tsr.tapeMountedForRead();
m_stats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
//start the threading and ask to initiate the protocol with the tapeserverd
// TODO: move up so this thread is the same as others (if we still need it)
m_watchdog.startThread();
// Then we will loop on the tasks as they get from
// the task injector
std::auto_ptr<TapeReadTask> task;
while(1) {
//get a task
task.reset(popAndRequestMoreJobs());
m_stats.waitInstructionsTime += timer.secs(utils::Timer::resetCounter);
// If we reached the end
if (NULL==task.get()) {
m_logContext.log(LOG_INFO, "No more files to read from tape");
break;
}
task->execute(*rs, m_logContext, m_watchdog,m_stats,timer);
}
m_watchdog.stopThread();
}
m_watchdog.stopThread();
// The session completed successfully, and the cleaner (unmount) executed
// at the end of the previous block. Log the results.
double sessionTime = totalTimer.secs();
log::ScopedParamContainer params(m_logContext);
params.add("mountTime", m_stats.mountTime)
.add("positionTime", m_stats.positionTime)
.add("waitInstructionsTime", m_stats.waitInstructionsTime)
.add("checksumingTime", m_stats.checksumingTime)
.add("transferTime", m_stats.transferTime)
.add("waitDataTime", m_stats.waitDataTime)
.add("waitReportingTime", m_stats.waitReportingTime)
.add("flushTime", m_stats.flushTime)
.add("unloadTime", m_stats.unloadTime)
.add("unmountTime", m_stats.unmountTime)
.add("dataVolume", m_stats.dataVolume)
.add("headerVolume", m_stats.headerVolume)
.add("filesCount", m_stats.filesCount)
.add("dataBandwidthMiB/s", 1.0*m_stats.dataVolume
/1024/1024/sessionTime)
.add("driveBandwidthMiB/s", 1.0*(m_stats.dataVolume+m_stats.headerVolume)
/1024/1024/sessionTime)
.add("sessionTime", sessionTime);
m_logContext.log(LOG_INFO, "Completed recall session's tape thread successfully");
} catch(const castor::exception::Exception& e){
// we can only end there because
// moundTape, waitForDrive or crating the ReadSession failed
......
......@@ -31,6 +31,8 @@
#include "castor/tape/tapeserver/exception/Exception.hpp"
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/daemon/SessionStats.hpp"
#include "castor/tape/utils/Timer.hpp"
namespace castor {
namespace tape {
......@@ -59,8 +61,9 @@ public:
* The main loop is :
* Acquire a free memory block from the memory manager , fill it, push it
*/
void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc,TaskWatchDog<detail::Recall>& watchdog) {
void execute(castor::tape::tapeFile::ReadSession & rs,
castor::log::LogContext & lc,TaskWatchDog<detail::Recall>& watchdog,
SessionStats & stats, utils::Timer & timer) {
using castor::log::Param;
typedef castor::log::LogContext::ScopedParam ScopedParam;
......@@ -72,8 +75,11 @@ public:
ScopedParam sp3(lc, Param("fSeq", m_fileToRecall->fseq()));
ScopedParam sp4(lc, Param("fileTransactionId", m_fileToRecall->fileTransactionId()));
// We will clock the stats for the file itself, and eventually add those
// stats to the session's.
SessionStats localStats;
utils::Timer localTime;
// Read the file and transmit it
bool stillReading = true;
//for counting how many mem blocks have used and how many tape blocks
......@@ -84,10 +90,13 @@ public:
MemBlock* mb=NULL;
try {
std::auto_ptr<castor::tape::tapeFile::ReadFile> rf(openReadFile(rs,lc));
localStats.positionTime += timer.secs(utils::Timer::resetCounter);
watchdog.notifyBeginNewJob(*m_fileToRecall);
localStats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
while (stillReading) {
// Get a memory block and add information to its metadata
mb=m_mm.getFreeBlock();
localStats.waitDataTime += timer.secs(utils::Timer::resetCounter);
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileBlock = fileBlock++;
......@@ -99,62 +108,84 @@ public:
// append conveniently returns false when there will not be more space
// for an extra tape block, and throws an exception if we reached the
// end of file. append() also protects against reading too big tape blocks.
while (mb->m_payload.append(*rf)) {
tapeBlock++;
}
} catch (const castor::tape::exceptions::EndOfFile&) {
// append() signaled the end of the file.
stillReading = false;
}
// Pass the block to the disk write task
m_fifo.pushDataBlock(mb);
watchdog.notify();
lc.log(LOG_INFO, "going for sleep");
while (mb->m_payload.append(*rf)) {
tapeBlock++;
}
} catch (const castor::tape::exceptions::EndOfFile&) {
// append() signaled the end of the file.
stillReading = false;
}
localStats.transferTime += timer.secs(utils::Timer::resetCounter);
localStats.dataVolume += mb->m_payload.size();
// Pass the block to the disk write task
m_fifo.pushDataBlock(mb);
watchdog.notify();
localStats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
lc.log(LOG_INFO, "going for sleep");
} //end of while(stillReading)
// we have to signal the end of the tape read to the disk write task.
m_fifo.pushDataBlock(NULL);
lc.log(LOG_DEBUG, "File read completed");
} //end of try
catch (castor::exception::Exception & ex) {
//we end up there because :
//-- openReadFile brought us here (cant put the tape into position)
//-- m_payload.append brought us here (error while reading the file)
// This is an error case. Log and signal to the disk write task
{
castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
castor::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
castor::log::LogContext::ScopedParam sp2(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Error reading a file block in TapeReadFileTask (backtrace follows)");
}
{
castor::log::LogContext lc2(lc.logger());
lc2.logBacktrace(LOG_ERR, ex.backtrace());
}
//if we end up there because openReadFile brought us here
//then mb is not valid, we need to get a block
//that will be done in reportErrorToDiskTask()
//or directly call reportErrorToDiskTask with the mem block
if(!mb) {
reportErrorToDiskTask();
}
else{
reportErrorToDiskTask(mb);
}
} //end of catch
// Log the successful transfer
double fileTime = localTime.secs();
log::ScopedParamContainer params(lc);
params.add("transferTime", localStats.transferTime)
.add("checksumingTime",localStats.checksumingTime)
.add("waitDataTime",localStats.waitDataTime)
.add("waitReportingTime",localStats.waitReportingTime)
.add("dataVolume",localStats.dataVolume)
.add("headerVolume",localStats.headerVolume)
.add("totalTime", fileTime)
.add("driveTransferSpeedMiB/s",
(localStats.dataVolume+localStats.headerVolume)
/1024/1024
/localStats.transferTime)
.add("payloadTransferSpeedMB/s",
1.0*localStats.dataVolume/1024/1024/fileTime)
.add("fileid",m_fileToRecall->fileid())
.add("fseq",m_fileToRecall->fseq())
.add("fileTransactionId",m_fileToRecall->fileTransactionId());
lc.log(LOG_INFO, "File successfully read from drive");
// Add the local counts to the session's
stats.add(localStats);
} //end of try
catch (castor::exception::Exception & ex) {
//we end up there because :
//-- openReadFile brought us here (cant put the tape into position)
//-- m_payload.append brought us here (error while reading the file)
// This is an error case. Log and signal to the disk write task
{
castor::log::LogContext::ScopedParam sp0(lc, Param("fileBlock", fileBlock));
castor::log::LogContext::ScopedParam sp1(lc, Param("ErrorMessage", ex.getMessageValue()));
castor::log::LogContext::ScopedParam sp2(lc, Param("ErrorCode", ex.code()));
lc.log(LOG_ERR, "Error reading a file block in TapeReadFileTask (backtrace follows)");
}
{
castor::log::LogContext lc2(lc.logger());
lc2.logBacktrace(LOG_ERR, ex.backtrace());
}
//if we end up there because openReadFile brought us here
//then mb is not valid, we need to get a block
//that will be done in reportErrorToDiskTask()
//or directly call reportErrorToDiskTask with the mem block
if(!mb) {
reportErrorToDiskTask();
}
else{
reportErrorToDiskTask(mb);
}
} //end of catch
watchdog.fileFinished();
}
/**
* Get a valid memory block and use it to do the report to the disk write task
*/
void reportErrorToDiskTask(){
MemBlock* mb =m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
reportErrorToDiskTask(mb);
}
void reportErrorToDiskTask(){
MemBlock* mb =m_mm.getFreeBlock();
mb->m_fSeq = m_fileToRecall->fseq();
mb->m_fileid = m_fileToRecall->fileid();
reportErrorToDiskTask(mb);
}
private:
/**
* Do the actual report to the disk write task
......
......@@ -86,8 +86,7 @@ private:
// As we are living in the single thread of tape, we can borrow the timer
utils::Timer & m_timer;
public:
TapeCleaning(TapeWriteSingleThread& parent,
utils::Timer & timer):
TapeCleaning(TapeWriteSingleThread& parent, utils::Timer & timer):
m_this(parent), m_timer(timer){}
~TapeCleaning(){
try{
......@@ -187,56 +186,85 @@ private:
virtual void run() {
m_logContext.pushOrReplace(log::Param("thread", "TapeWrite"));
castor::tape::utils::Timer timer, totalTimer;
try
{
m_logContext.pushOrReplace(log::Param("thread", "TapeWrite"));
// Set capabilities allowing rawio (and hence arbitrary SCSI commands)
// through the st driver file descriptor.
setCapabilities();
TapeCleaning cleaner(*this, timer);
mountTape(castor::legacymsg::RmcProxy::MOUNT_MODE_READWRITE);
waitForDrive();
m_stats.mountTime += timer.secs(utils::Timer::resetCounter);
// Then we have to initialise the tape write session
std::auto_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
m_stats.positionTime += timer.secs(utils::Timer::resetCounter);
//log and notify
m_logContext.log(LOG_INFO, "Starting tape write thread");
m_tsr.tapeMountedForWrite();
uint64_t bytes=0;
uint64_t files=0;
m_stats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
std::auto_ptr<TapeWriteTask> task;
while(1) {
//get a task
task.reset(m_tasks.pop());
m_stats.waitInstructionsTime += timer.secs(utils::Timer::resetCounter);
//if is the end
if(NULL==task.get()) {
//we flush without asking
tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files,timer);
m_stats.flushTime += timer.secs(utils::Timer::resetCounter);
//end of session + log
m_reportPacker.reportEndOfSession();
log::LogContext::ScopedParam sp0(m_logContext, log::Param("tapeThreadDuration", totalTimer.secs()));
m_logContext.log(LOG_DEBUG, "writing data to tape has finished");
break;
}
task->execute(*writeSession,m_reportPacker,m_logContext,m_stats,timer);
// Increase local flush counters (session counters are incremented by
// the task)
files++;
bytes+=task->fileSize();
//if one flush counter is above a threshold, then we flush
if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);
files=0;
bytes=0;
}
} //end of while(1))
{
// The tape will be loaded
// it has to be unloaded, unmounted at all cost -> RAII
// will also take care of the TapeServerReporter
TapeCleaning cleaner(*this, timer);
// Before anything, the tape should be mounted
mountTape(castor::legacymsg::RmcProxy::MOUNT_MODE_READWRITE);
waitForDrive();
m_stats.mountTime += timer.secs(utils::Timer::resetCounter);
// Then we have to initialise the tape write session
std::auto_ptr<castor::tape::tapeFile::WriteSession> writeSession(openWriteSession());
m_stats.positionTime += timer.secs(utils::Timer::resetCounter);
//log and notify
m_logContext.log(LOG_INFO, "Starting tape write thread");
m_tsr.tapeMountedForWrite();
uint64_t bytes=0;
uint64_t files=0;
m_stats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
std::auto_ptr<TapeWriteTask> task;
while(1) {
//get a task
task.reset(m_tasks.pop());
m_stats.waitInstructionsTime += timer.secs(utils::Timer::resetCounter);
//if is the end
if(NULL==task.get()) {
//we flush without asking
tapeFlush("No more data to write on tape, unconditional flushing to the client",bytes,files,timer);
m_stats.flushTime += timer.secs(utils::Timer::resetCounter);
//end of session + log
m_reportPacker.reportEndOfSession();
log::LogContext::ScopedParam sp0(m_logContext, log::Param("tapeThreadDuration", totalTimer.secs()));
m_logContext.log(LOG_DEBUG, "writing data to tape has finished");
break;
}
task->execute(*writeSession,m_reportPacker,m_logContext,m_stats,timer);
// Increase local flush counters (session counters are incremented by
// the task)
files++;
bytes+=task->fileSize();
//if one flush counter is above a threshold, then we flush
if (files >= m_filesBeforeFlush || bytes >= m_bytesBeforeFlush) {
tapeFlush("Normal flush because thresholds was reached",bytes,files,timer);
files=0;
bytes=0;
}
} //end of while(1))
}
// The session completed successfully, and the cleaner (unmount) executed
// at the end of the previous block. Log the results.
double sessionTime = totalTimer.secs();
log::ScopedParamContainer params(m_logContext);
params.add("mountTime", m_stats.mountTime)
.add("positionTime", m_stats.positionTime)
.add("waitInstructionsTime", m_stats.waitInstructionsTime)
.add("checksumingTime", m_stats.checksumingTime)
.add("transferTime", m_stats.transferTime)
.add("waitDataTime", m_stats.waitDataTime)
.add("waitReportingTime", m_stats.waitReportingTime)
.add("flushTime", m_stats.flushTime)
.add("unloadTime", m_stats.unloadTime)
.add("unmountTime", m_stats.unmountTime)
.add("dataVolume", m_stats.dataVolume)
.add("headerVolume", m_stats.headerVolume)
.add("filesCount", m_stats.filesCount)
.add("dataBandwidthMiB/s", 1.0*m_stats.dataVolume
/1024/1024/sessionTime)
.add("driveBandwidthMiB/s", 1.0*(m_stats.dataVolume+m_stats.headerVolume)
/1024/1024/sessionTime)
.add("sessionTime", sessionTime);
m_logContext.log(LOG_INFO, "Completed migration session successfully");
} //end of try
catch(const castor::exception::Exception& e){
//we end there because write session could not be opened or because a task failed
......@@ -255,9 +283,7 @@ private:
m_logContext.log(LOG_ERR,"An error occurred during TapwWriteSingleThread::execute");
m_reportPacker.reportEndOfSessionWithErrors(e.what(),e.code());
}
// The session completed successfuly. Log the results
TODOTODO
}
}
//m_filesBeforeFlush and m_bytesBeforeFlush are thresholds for flushing
......
......@@ -65,6 +65,8 @@ namespace daemon {
using castor::log::LogContext;
using castor::log::Param;
// We will clock the stats for the file itself, and eventually add those
// stats to the session's.
SessionStats localStats;
utils::Timer localTime;
unsigned long ckSum = Payload::zeroAdler32();
......@@ -118,6 +120,7 @@ namespace daemon {
reportPacker.reportCompletedJob(*m_fileToMigrate,ckSum);
localStats.waitReportingTime += timer.secs(utils::Timer::resetCounter);
// Log the successful transfer
double fileTime = localTime.secs();
log::ScopedParamContainer params(lc);
params.add("transferTime", localStats.transferTime)
.add("checksumingTime",localStats.checksumingTime)
......@@ -125,12 +128,13 @@ namespace daemon {
.add("waitReportingTime",localStats.waitReportingTime)
.add("dataVolume",localStats.dataVolume)
.add("headerVolume",localStats.headerVolume)