Commit 7536a63e authored by David COME's avatar David COME
Browse files

Started to measure the duration of Disk operation (read/write) at task and thread level

parent 0e17aca1
......@@ -22,7 +22,9 @@
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskReadTask.hpp"
#include "castor/tape/utils/Timer.hpp"
#include "castor/log/LogContext.hpp"
namespace{
/** Use RAII to make sure the memory block is released
*(ie pushed back to the memory manager) in any case (exception or not)
......@@ -60,6 +62,10 @@ m_nextTask(destination),m_migratedFile(file),
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(log::LogContext& lc) {
using log::LogContext;
using log::Param;
utils::Timer localTime;
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
try{
......@@ -69,8 +75,9 @@ void DiskReadTask::execute(log::LogContext& lc) {
hasAnotherTaskTailed();
tape::diskFile::ReadFile sourceFile(m_migratedFile->path());
log::LogContext::ScopedParam sp(lc, log::Param("filePath",m_migratedFile->path()));
m_stats.openingTime+=localTime.secs(utils::Timer::resetCounter);
LogContext::ScopedParam sp(lc, Param("filePath",m_migratedFile->path()));
lc.log(LOG_INFO,"Opened file on disk for migration ");
while(migratingFileSize>0){
......@@ -78,14 +85,18 @@ void DiskReadTask::execute(log::LogContext& lc) {
hasAnotherTaskTailed();
MemBlock* const mb = m_nextTask.getFreeBlock();
m_stats.waitFreeMemoryTime+=localTime.secs(utils::Timer::resetCounter);
AutoPushBlock push(mb,m_nextTask);
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileBlock = blockId++;
migratingFileSize -= mb->m_payload.read(sourceFile);
m_stats.transferTime+=localTime.secs(utils::Timer::resetCounter);
m_stats.dataVolume += mb->m_payload.size();
//we either read at full capacity (ie size=capacity) or if there different,
//it should be the end => migratingFileSize should be 0. If it not, error
if(mb->m_payload.size() != mb->m_payload.totalCapacity() && migratingFileSize>0){
......@@ -93,6 +104,7 @@ void DiskReadTask::execute(log::LogContext& lc) {
mb->markAsFailed(erroMsg,SEINTERNAL);
throw castor::tape::Exception(erroMsg);
}
m_stats.checkingErrorTime += localTime.secs(utils::Timer::resetCounter);
} //end of while(migratingFileSize>0)
}
catch(const castor::tape::exceptions::ErrorFlag&){
......@@ -107,9 +119,7 @@ void DiskReadTask::execute(log::LogContext& lc) {
//we have to pump the blocks anyway, mark them failed and then pass them back to TapeWrite
//Otherwise they would be stuck into TapeWriteTask free block fifo
using log::LogContext;
using log::Param;
LogContext::ScopedParam sp(lc, Param("blockID",blockId));
LogContext::ScopedParam sp0(lc, Param("exceptionCode",e.code()));
LogContext::ScopedParam sp1(lc, Param("exceptionMessage", e.getMessageValue()));
......@@ -133,5 +143,29 @@ void DiskReadTask::circulateAllBlocks(size_t fromBlockId){
++blockId;
} //end of while
}
//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------
/**
 * Log msg at the given level, together with the timing and volume
 * statistics gathered in m_stats and the identity (file id and path) of the
 * migrated file. The values are added to lc through a
 * log::ScopedParamContainer.
 * @param level the log level passed to lc.log()
 * @param msg the message to log
 * @param lc the log context to decorate and log into
 */
void DiskReadTask::logWithStat(int level,const std::string& msg,log::LogContext& lc){
  // transferTime is still 0 when no block was read (e.g. early failure):
  // dividing by it would put inf/nan in the log, so report 0 in that case.
  const double transferSpeedMBps = m_stats.transferTime > 0.0
    ? 1.0*m_stats.dataVolume/1024/1024/m_stats.transferTime
    : 0.0;

  log::ScopedParamContainer params(lc);
  params.add("transferTime", m_stats.transferTime)
        .add("closingTime", m_stats.closingTime)
        .add("checksumingTime",m_stats.checksumingTime)
        .add("waitDataTime",m_stats.waitDataTime)
        // measured in execute() while waiting for a free memory block
        .add("waitFreeMemoryTime",m_stats.waitFreeMemoryTime)
        .add("waitReportingTime",m_stats.waitReportingTime)
        .add("checkingErrorTime",m_stats.checkingErrorTime)
        .add("openingTime",m_stats.openingTime)
        .add("payloadTransferSpeedMB/s",transferSpeedMBps)
        .add("FILEID",m_migratedFile->fileid())
        .add("path",m_migratedFile->path());
  lc.log(level,msg);
}
/**
 * Accessor for the statistics accumulated by this task.
 * @return a by-value copy of m_stats
 */
const DiskStats DiskReadTask::getTaskStats() const {
  const DiskStats snapshot(m_stats);
  return snapshot;
}
}}}}
......@@ -25,9 +25,11 @@
#include "castor/tape/tapeserver/daemon/DataPipeline.hpp"
#include "castor/tape/tapeserver/daemon/DataConsumer.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapeserver/threading/AtomicCounter.hpp"
#include "castor/log/LogContext.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
......@@ -45,13 +47,37 @@ public:
castor::tape::threading::AtomicFlag& errorFlag);
void execute(log::LogContext& lc);
/**
* Return the stats of the task. Should be called after execute()
* (otherwise, it is pointless)
* @return
*/
const DiskStats getTaskStats() const;
private:
/**
* Stats to measure how long it takes to read from disk
*/
DiskStats m_stats;
/**
* Throws an exception if m_errorFlag is set
*/
void hasAnotherTaskTailed() const {
  // Nothing to do as long as no error has been flagged.
  if(!m_errorFlag){
    return;
  }
  // The error flag is set: abort our own job by throwing.
  throw castor::tape::exceptions::ErrorFlag();
}
/**
* log into lc all m_stats parameters with the given message at the
* given level
* @param level
* @param message
*/
void logWithStat(int level,const std::string& msg,log::LogContext& lc) ;
void circulateAllBlocks(size_t fromBlockId);
/**
* The task (a TapeWriteTask) that will handle the read blocks
......
......@@ -113,19 +113,28 @@ DiskReadTask* DiskReadThreadPool::popAndRequestMore(castor::log::LogContext &lc)
}
return vrp.value;
}
void DiskReadThreadPool::addThreadStats(const DiskStats& other){
castor::tape::threading::MutexLocker lock(&m_statAddingProtection);
m_pooldStat+=other;
}
//------------------------------------------------------------------------------
// DiskReadThreadPool::DiskReadWorkerThread::run
// DiskReadWorkerThread::run
//------------------------------------------------------------------------------
void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "DiskRead"));
m_lc.pushOrReplace(log::Param("threadID",m_threadID));
m_lc.log(LOG_DEBUG, "DiskReadWorkerThread Running");
std::auto_ptr<DiskReadTask> task;
utils::Timer localTime;
while(1) {
task.reset( m_parent.popAndRequestMore(m_lc));
m_threadStat.waitInstructionsTime += localTime.secs(utils::Timer::resetCounter);
if (NULL!=task.get()) {
task->execute(m_lc);
m_threadStat += task->getTaskStats();
}
else {
break;
......@@ -137,8 +146,27 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() {
m_parent.m_injector->finish();
m_lc.log(LOG_INFO, "Signaled to task injector the end of disk read threads");
}
m_lc.log(LOG_INFO, "Finishing of DiskReadWorkerThread");
m_parent.addThreadStats(m_threadStat);
logWithStat(LOG_INFO, "Finishing of DiskReadWorkerThread");
}
//------------------------------------------------------------------------------
// DiskReadWorkerThread::logWithStat
//------------------------------------------------------------------------------
/**
 * Log message at the given level in the thread's log context (m_lc),
 * together with the statistics accumulated in m_threadStat.
 * @param level the log level passed to m_lc.log()
 * @param message the message to log
 */
void DiskReadThreadPool::DiskReadWorkerThread::
logWithStat(int level, const std::string& message){
  const double dataVolumeInMB = 1.0*m_threadStat.dataVolume/1024/1024;
  // A thread that transferred nothing has a transferTime of 0: avoid logging
  // inf/nan as a throughput and report 0 instead.
  const double transferSpeedMBps = m_threadStat.transferTime > 0.0
    ? dataVolumeInMB/m_threadStat.transferTime
    : 0.0;

  log::ScopedParamContainer params(m_lc);
  params.add("threadTransferTime", m_threadStat.transferTime)
        .add("threadChecksumingTime",m_threadStat.checksumingTime)
        .add("threadWaitDataTime",m_threadStat.waitDataTime)
        // accumulated from the tasks, which measure it while waiting for a free block
        .add("threadWaitFreeMemoryTime",m_threadStat.waitFreeMemoryTime)
        // measured in run() while waiting for the next task
        .add("threadWaitInstructionsTime",m_threadStat.waitInstructionsTime)
        .add("threadWaitReportingTime",m_threadStat.waitReportingTime)
        .add("threadCheckingErrorTime",m_threadStat.checkingErrorTime)
        .add("threadOpeningTime",m_threadStat.openingTime)
        .add("threadClosingTime", m_threadStat.closingTime)
        .add("threaDataVolumeInMB", dataVolumeInMB)
        .add("threadPayloadTransferSpeedMB/s",transferSpeedMBps);
  m_lc.log(level,message);
}
}}}}
......@@ -19,7 +19,7 @@
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
......@@ -106,6 +106,23 @@ private:
*/
DiskReadTask* popAndRequestMore(castor::log::LogContext & lc);
/**
* When a thread finishes, it calls this function to add its stats to the ones
* of the thread pool
* @param threadStats
*/
void addThreadStats(const DiskStats& stats);
/**
To protect addThreadStats from concurrent calls
*/
castor::tape::threading::Mutex m_statAddingProtection;
/**
* Aggregate all threads' stats
*/
DiskStats m_pooldStat;
/**
* Subclass of the thread pool's worker thread.
*/
......@@ -119,6 +136,11 @@ private:
// Forwards to castor::tape::threading::Thread::start().
void start() { castor::tape::threading::Thread::start(); }
// Forwards to castor::tape::threading::Thread::wait().
void wait() { castor::tape::threading::Thread::wait(); }
private:
void logWithStat(int level, const std::string& message);
/*
* For measuring how long the different steps take
*/
DiskStats m_threadStat;
/** Pointer to the thread pool, allowing calls to popAndRequestMore,
* and calling finish() on the task injector when the last thread
......
/******************************************************************************
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2014 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
*
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#pragma once
namespace castor {
namespace tape {
namespace tapeserver {
namespace daemon {
/**
* Structure holding the timers and stats for disk operations. Times are
* doubles, in seconds; data volume and file count are uint64_t.
*/
struct DiskStats {
  /** Cumulated time spent opening disk files, in seconds */
  double openingTime;

  /** Cumulated time spent closing disk files, in seconds
   *  (NOTE(review): no code visible in this change updates it yet) */
  double closingTime;

  /** Cumulated time spent computing checksums, in seconds */
  double checksumingTime;

  /** Cumulated time spent transferring data with the disk, in seconds */
  double transferTime;

  /** Cumulated time spent waiting for data blocks, in seconds */
  double waitDataTime;

  /** Cumulated time spent waiting for free memory, in seconds */
  double waitFreeMemoryTime;

  /** Cumulated time spent by the disk thread waiting for a task, in seconds */
  double waitInstructionsTime;

  /** Cumulated time spent reporting, in seconds */
  double waitReportingTime;

  /** Cumulated time spent checking for errors, in seconds */
  double checkingErrorTime;

  /** Cumulated data volume (actual payload), in bytes */
  uint64_t dataVolume;

  /** Count of files actually transferred in the session */
  uint64_t filesCount;

  /** Constructor: every timer and counter starts at zero */
  DiskStats():
    openingTime(0.0),
    closingTime(0.0),
    checksumingTime(0.0),
    transferTime(0.0),
    waitDataTime(0.0),
    waitFreeMemoryTime(0.0),
    waitInstructionsTime(0.0),
    waitReportingTime(0.0),
    checkingErrorTime(0.0),
    dataVolume(0),
    filesCount(0) {}

  /**
   * Accumulate the contents of another stats block into this one.
   * @param other the stats to add, field by field
   * @return a reference to this object, so that additions can be chained
   */
  DiskStats& operator+=(const DiskStats& other) {
    openingTime += other.openingTime;
    closingTime += other.closingTime;
    checksumingTime += other.checksumingTime;
    transferTime += other.transferTime;
    waitDataTime += other.waitDataTime;
    waitFreeMemoryTime += other.waitFreeMemoryTime;
    waitInstructionsTime += other.waitInstructionsTime;
    waitReportingTime += other.waitReportingTime;
    checkingErrorTime += other.checkingErrorTime;
    dataVolume += other.dataVolume;
    filesCount += other.filesCount;
    return *this;
  }
};
}}}}
......@@ -25,6 +25,8 @@
#include "castor/tape/tapeserver/daemon/AutoReleaseBlock.hpp"
#include "castor/tape/tapeserver/daemon/MemBlock.hpp"
#include "castor/log/LogContext.hpp"
#include "castor/tape/utils/Timer.hpp"
namespace castor {
namespace tape {
namespace tapeserver {
......@@ -44,12 +46,16 @@ m_recallingFile(file),m_memManager(mm){
bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
using log::LogContext;
using log::Param;
utils::Timer localTime;
try{
tape::diskFile::WriteFile ourFile(m_recallingFile->path());
m_stats.openingTime+=localTime.secs(utils::Timer::resetCounter);
int blockId = 0;
unsigned long checksum = Payload::zeroAdler32();
while(1) {
if(MemBlock* const mb = m_fifo.pop()) {
m_stats.waitDataTime+=localTime.secs(utils::Timer::resetCounter);
AutoReleaseBlock<RecallMemoryManager> releaser(mb,m_memManager);
if(mb->isCanceled()) {
// If the tape side got canceled, we report nothing and count
......@@ -60,17 +66,26 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc) {
//will throw (thus exiting the loop) if something is wrong
checkErrors(mb,blockId,lc);
m_stats.checkingErrorTime += localTime.secs(utils::Timer::resetCounter);
m_stats.dataVolume+=mb->m_payload.size();
mb->m_payload.write(ourFile);
m_stats.transferTime+=localTime.secs(utils::Timer::resetCounter);
checksum = mb->m_payload.adler32(checksum);
m_stats.checksumingTime+=localTime.secs(utils::Timer::resetCounter);
blockId++;
} //end if block non NULL
else {
break;
}
} //end of while(1)
lc.log(LOG_DEBUG, "File successfully transfered.");
reporter.reportCompletedJob(*m_recallingFile,checksum);
m_stats.waitReportingTime+=localTime.secs(utils::Timer::resetCounter);;
logWithStat(LOG_DEBUG, "File successfully transfered to disk",lc);
return true;
} //end of try
catch(const castor::exception::Exception& e){
......@@ -156,5 +171,29 @@ void DiskWriteTask::releaseAllBlock(){
}
}
//------------------------------------------------------------------------------
// getTaskStats
//------------------------------------------------------------------------------
/**
 * Accessor for the statistics accumulated by this task.
 * @return a by-value copy of m_stats
 */
const DiskStats DiskWriteTask::getTaskStats() const {
  const DiskStats snapshot(m_stats);
  return snapshot;
}
//------------------------------------------------------------------------------
// logWithStat
//------------------------------------------------------------------------------
/**
 * Log msg at the given level, together with the timing and volume
 * statistics gathered in m_stats and the identity (file id and path) of the
 * recalled file. The values are added to lc through a
 * log::ScopedParamContainer.
 * @param level the log level passed to lc.log()
 * @param msg the message to log
 * @param lc the log context to decorate and log into
 */
void DiskWriteTask::logWithStat(int level,const std::string& msg,log::LogContext& lc){
  // transferTime is 0 when no block was written (e.g. empty file): dividing
  // by it would put inf/nan in the log, so report 0 in that case.
  const double transferSpeedMBps = m_stats.transferTime > 0.0
    ? 1.0*m_stats.dataVolume/1024/1024/m_stats.transferTime
    : 0.0;

  log::ScopedParamContainer params(lc);
  params.add("transferTime", m_stats.transferTime)
        .add("checksumingTime",m_stats.checksumingTime)
        .add("waitDataTime",m_stats.waitDataTime)
        .add("waitReportingTime",m_stats.waitReportingTime)
        .add("checkingErrorTime",m_stats.checkingErrorTime)
        .add("openingTime",m_stats.openingTime)
        .add("closingTime",m_stats.closingTime)
        .add("payloadTransferSpeedMB/s",transferSpeedMBps)
        .add("FILEID",m_recallingFile->fileid())
        .add("path",m_recallingFile->path());
  lc.log(level,msg);
}
}}}}
......@@ -29,7 +29,7 @@
#include "castor/tape/tapeserver/file/File.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapeserver/daemon/RecallReportPacker.hpp"
#include "castor/tape/tapeserver/daemon/DiskStats.hpp"
#include <memory>
namespace {
......@@ -81,7 +81,19 @@ public:
*/
virtual ~DiskWriteTask();
/**
* Return the stats of the task. Should be called after execute()
* (otherwise, it is pointless)
* @return
*/
const DiskStats getTaskStats() const;
private:
/**
* Stats to measure how long it takes to write on disk
*/
DiskStats m_stats;
/**
* This function will check the consistency of the mem block and
* throw exception is something goes wrong
......@@ -91,6 +103,10 @@ private:
*/
void checkErrors(MemBlock* mb,int blockId,castor::log::LogContext& lc);
/**
* In case of error, it will spin on the blocks until we reach the end
* in order to push them back into the memory manager
*/
void releaseAllBlock();
/**
......@@ -112,6 +128,13 @@ private:
*/
castor::tape::threading::Mutex m_producerProtection;
/**
* log into lc all m_stats parameters with the given message at the
* given level
* @param level
* @param message
*/
void logWithStat(int level,const std::string& msg,log::LogContext& lc) ;
};
}}}}
......@@ -21,6 +21,7 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include "castor/tape/tapeserver/daemon/DiskWriteThreadPool.hpp"
#include "castor/tape/utils/Timer.hpp"
#include "log.h"
#include <memory>
#include <sstream>
......@@ -102,9 +103,33 @@ void DiskWriteThreadPool::finish() {
m_tasks.push(NULL);
}
}
//------------------------------------------------------------------------------
// constructor
//addThreadStats
//------------------------------------------------------------------------------
void DiskWriteThreadPool::addThreadStats(const DiskStats& other){
castor::tape::threading::MutexLocker lock(&m_statAddingProtection);
m_pooldStat+=other;
}
//------------------------------------------------------------------------------
//logWithStat
//------------------------------------------------------------------------------
/**
 * Log message at the given level in the pool's log context (m_lc),
 * together with the statistics aggregated in m_pooldStat.
 * NOTE(review): the parameter names keep their "thread" prefix although the
 * values are the pool-wide totals; they are left unchanged so that existing
 * log consumers are not affected.
 * @param level the log level passed to m_lc.log()
 * @param message the message to log
 */
void DiskWriteThreadPool::logWithStat(int level, const std::string& message){
  const double dataVolumeInMB = 1.0*m_pooldStat.dataVolume/1024/1024;
  // With no data transferred the cumulated transferTime is 0: avoid logging
  // inf/nan as a throughput and report 0 instead.
  const double transferSpeedMBps = m_pooldStat.transferTime > 0.0
    ? dataVolumeInMB/m_pooldStat.transferTime
    : 0.0;

  log::ScopedParamContainer params(m_lc);
  params.add("threadTransferTime", m_pooldStat.transferTime)
        .add("threadChecksumingTime",m_pooldStat.checksumingTime)
        .add("threadWaitDataTime",m_pooldStat.waitDataTime)
        .add("threadWaitReportingTime",m_pooldStat.waitReportingTime)
        .add("threadCheckingErrorTime",m_pooldStat.checkingErrorTime)
        .add("threadOpeningTime",m_pooldStat.openingTime)
        .add("threadClosingTime", m_pooldStat.closingTime)
        .add("threaDataVolumeInMB", dataVolumeInMB)
        .add("threadPayloadTransferSpeedMB/s",transferSpeedMBps);
  m_lc.log(level,message);
}
//------------------------------------------------------------------------------
// DiskWriteWorkerThread::run
//------------------------------------------------------------------------------
void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
typedef castor::log::LogContext::ScopedParam ScopedParam;
......@@ -112,36 +137,60 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_lc.pushOrReplace(log::Param("thread", "diskWrite"));
m_lc.pushOrReplace(log::Param("threadID", m_threadID));
m_lc.log(LOG_INFO, "Starting DiskWriteWorkerThread");
std::auto_ptr<DiskWriteTask> task;
utils::Timer localTime;
while(1) {
task.reset(m_parentThreadPool.m_tasks.pop());
m_threadStat.waitInstructionsTime+=localTime.secs(utils::Timer::resetCounter);
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc)) {
++m_parentThreadPool.m_failedWriteCount;
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
m_lc.log(LOG_ERR, "Task failed: counting another error for this session");
}
m_threadStat+=task->getTaskStats();
} //end of task!=NULL
else {
m_lc.log(LOG_DEBUG,"DiskWriteWorkerThread exiting: no more work");
break;
}
} //enf of while(1)
logWithStat(LOG_INFO, "Finishing DiskWriteWorkerThread");
m_parentThreadPool.addThreadStats(m_threadStat);
if(0 == --m_parentThreadPool.m_nbActiveThread){
//Im the last Thread alive, report end of session
if(m_parentThreadPool.m_failedWriteCount==0){
m_parentThreadPool.m_reporter.reportEndOfSession();
m_lc.log(LOG_INFO, "As last exiting DiskWriteWorkerThread, reported a successful end of session");
m_parentThreadPool.logWithStat(LOG_INFO, "As last exiting DiskWriteWorkerThread, reported a successful end of session");
}
else{
m_parentThreadPool.m_reporter.reportEndOfSessionWithErrors("End of recall session with error(s)",SEINTERNAL);
ScopedParam sp(m_lc, Param("errorCount", m_parentThreadPool.m_failedWriteCount));
m_lc.log(LOG_ERR, "As last exiting DiskWriteWorkerThread, reported an end of session with errors");
m_parentThreadPool.logWithStat(LOG_ERR, "As last exiting DiskWriteWorkerThread, reported an end of session with errors");
}
}
m_lc.log(LOG_INFO, "Finishing DiskWriteWorkerThread");
}
//------------------------------------------------------------------------------
// DiskWriteWorkerThread::logWithStat
//------------------------------------------------------------------------------
/**
 * Log msg at the given level in the thread's log context (m_lc), together
 * with the statistics accumulated in m_threadStat.
 * @param level the log level passed to m_lc.log()
 * @param msg the message to log
 */
void DiskWriteThreadPool::DiskWriteWorkerThread::
logWithStat(int level, const std::string& msg) {
  const double dataVolumeInMB = 1.0*m_threadStat.dataVolume/1024/1024;
  // A thread that wrote nothing has a transferTime of 0: avoid logging
  // inf/nan as a throughput and report 0 instead.
  const double transferSpeedMBps = m_threadStat.transferTime > 0.0
    ? dataVolumeInMB/m_threadStat.transferTime
    : 0.0;

  log::ScopedParamContainer params(m_lc);
  params.add("threadTransferTime", m_threadStat.transferTime)
        .add("threadChecksumingTime",m_threadStat.checksumingTime)
        .add("threadWaitDataTime",m_threadStat.waitDataTime)
        // measured in run() while waiting for the next task
        .add("threadWaitInstructionsTime",m_threadStat.waitInstructionsTime)
        .add("threadWaitReportingTime",m_threadStat.waitReportingTime)
        .add("threadCheckingErrorTime",m_threadStat.checkingErrorTime)
        .add("threadOpeningTime",m_threadStat.openingTime)
        .add("threadClosingTime", m_threadStat.closingTime)
        .add("threaDataVolumeInMB", dataVolumeInMB)
        .add("threadPayloadTransferSpeedMB/s",transferSpeedMBps);
  m_lc.log(level,msg);
}