Commit 09bf8783 authored by Victor Kotlyar's avatar Victor Kotlyar
Browse files

Ported commit 45b1a34627318967e2ca23f3bcebec1d67525ded from castor/master

CASTOR-4889: tapeserverd's end of session summary could indicate the
files still open

Fixed.

Add reports for still open files in the "Tape session finished"
log message to indicate problematic disk servers. It should be
only visible in case of kill of the data transfer session.
parent e138bd66
......@@ -47,7 +47,7 @@ m_nextTask(destination),m_archiveJob(archiveJob),
// DiskReadTask::execute
//------------------------------------------------------------------------------
void DiskReadTask::execute(cta::log::LogContext& lc, diskFile::DiskFileFactory & fileFactory,
MigrationWatchDog & watchdog) {
MigrationWatchDog & watchdog, const int threadID) {
using cta::log::LogContext;
using cta::log::Param;
......@@ -83,6 +83,9 @@ void DiskReadTask::execute(cta::log::LogContext& lc, diskFile::DiskFileFactory
LogContext::ScopedParam sp(lc, Param("fileId",m_archiveJob->archiveFile.archiveFileID));
lc.log(cta::log::INFO,"Opened disk file for read");
watchdog.addParameter(cta::log::Param("stillOpenFileForThread"+
std::to_string((long long)threadID), sourceFile->URL()));
while(migratingFileSize>0){
......@@ -169,6 +172,8 @@ void DiskReadTask::execute(cta::log::LogContext& lc, diskFile::DiskFileFactory
//deal here the number of mem block
circulateAllBlocks(blockId,mb);
} //end of catch
watchdog.deleteParameter("stillOpenFileForThread"+
std::to_string((long long)threadID));
}
//------------------------------------------------------------------------------
......
......@@ -49,8 +49,8 @@ public:
cta::threading::AtomicFlag& errorFlag);
void execute(cta::log::LogContext& lc, diskFile::DiskFileFactory & fileFactory,
MigrationWatchDog & watchdog);
/**
MigrationWatchDog & watchdog, const int threadID);
/**
* Return the stats of the tasks. Should be call after execute
* (otherwise, it is pointless)
* @return
......
......@@ -138,7 +138,7 @@ namespace unitTests{
castor::messages::TapeserverProxyDummy tspd;
MockMigrationWatchDog mmwd(1.0, 1.0, tspd, "", lc);
drt.execute(lc,fileFactory,mmwd);
drt.execute(lc,fileFactory,mmwd, 0);
ASSERT_EQ(original_checksum,ftwt.getChecksum());
delete ftwt.getFreeBlock();
......
......@@ -168,7 +168,7 @@ void DiskReadThreadPool::DiskReadWorkerThread::run() {
task.reset( m_parent.popAndRequestMore(m_lc));
m_threadStat.waitInstructionsTime += localTime.secs(cta::utils::Timer::resetCounter);
if (NULL!=task.get()) {
task->execute(m_lc, m_diskFileFactory,m_parent.m_watchdog);
task->execute(m_lc, m_diskFileFactory,m_parent.m_watchdog, m_threadID);
m_threadStat += task->getTaskStats();
}
else {
......
......@@ -44,7 +44,8 @@ m_retrieveJob(retrieveJob),m_memManager(mm){
// DiskWriteTask::execute
//------------------------------------------------------------------------------
bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext& lc,
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog) {
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog,
const int threadID) {
using cta::log::LogContext;
using cta::log::Param;
cta::utils::Timer localTime;
......@@ -92,6 +93,8 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
URLcontext.add("actualURL", writeFile->URL());
lc.log(cta::log::INFO, "Opened disk file for writing");
m_stats.openingTime+=localTime.secs(cta::utils::Timer::resetCounter);
watchdog.addParameter(cta::log::Param("stillOpenFileForThread"+
std::to_string((long long)threadID), writeFile->URL()));
}
// Write the data.
......@@ -120,6 +123,9 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
}
} //end of while(1)
logWithStat(cta::log::INFO, "File successfully transfered to disk",lc);
watchdog.deleteParameter("stillOpenFileForThread"+
std::to_string((long long)threadID));
m_retrieveJob->transferredSize = m_stats.dataVolume;
m_retrieveJob->transferredChecksumType = "ADLER32";
{
......@@ -161,7 +167,9 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,cta::log::LogContext&
m_retrieveJob->failureMessage = e.getMessageValue();
reporter.reportFailedJob(std::move(m_retrieveJob));
watchdog.deleteParameter("stillOpenFileForThread"+
std::to_string((long long)threadID));
//got an exception, return false
return false;
}
......
......@@ -57,7 +57,8 @@ public:
* @return true if the file has been successfully written false otherwise.
*/
virtual bool execute(RecallReportPacker& reporter,cta::log::LogContext& lc,
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog) ;
diskFile::DiskFileFactory & fileFactory, RecallWatchDog & watchdog,
const int threadID);
/**
* Allows client code to return a reusable memory block. Should not been called
......
......@@ -125,7 +125,7 @@ namespace unitTests{
t.pushDataBlock(NULL);
castor::messages::TapeserverProxyDummy tspd;
RecallWatchDog rwd(1,1,tspd,"", lc);
t.execute(report,lc,fileFactory,rwd);
t.execute(report,lc,fileFactory,rwd, 0);
ASSERT_EQ(1, report.failedJobs);
}
}
......
......@@ -164,7 +164,7 @@ void DiskWriteThreadPool::DiskWriteWorkerThread::run() {
m_threadStat.waitInstructionsTime+=localTime.secs(cta::utils::Timer::resetCounter);
if (NULL!=task.get()) {
if(false==task->execute(m_parentThreadPool.m_reporter,m_lc,
m_diskFileFactory, m_parentThreadPool.m_watchdog)) {
m_diskFileFactory, m_parentThreadPool.m_watchdog, m_threadID)) {
++m_parentThreadPool.m_failedWriteCount;
cta::log::ScopedParamContainer logParams(m_lc);
logParams.add("errorCount", m_parentThreadPool.m_failedWriteCount);
......
......@@ -133,7 +133,12 @@ protected:
/**
* One offs parameters to be sent to the initial process
*/
cta::threading::BlockingQueue<cta::log::Param> m_paramsQueue;
cta::threading::BlockingQueue<cta::log::Param> m_toAddParamsQueue;
/**
* One offs parameters to be deleted from the initial process
*/
cta::threading::BlockingQueue<std::string> m_toDeleteParamsQueue;
/**
* Map of all error counts
......@@ -222,13 +227,25 @@ protected:
std::list<Param> params;
// This is thread safe because we are the only consumer:
// a non-zero size guarantees we will find something.
while (m_paramsQueue.size())
params.push_back(m_paramsQueue.pop());
while (m_toAddParamsQueue.size())
params.push_back(m_toAddParamsQueue.pop());
if (params.size()) {
m_initialProcess.addLogParams(m_driveUnitName, params);
}
}
// Send any one-off parameter to delete
{
std::list<std::string> params;
// This is thread safe because we are the only consumer:
// a non-zero size guarantees we will find something.
while (m_toDeleteParamsQueue.size())
params.push_back(m_toDeleteParamsQueue.pop());
if (params.size()) {
m_initialProcess.deleteLogParams(m_driveUnitName, params);
}
}
//heartbeat to notify activity to the mother
// and transmit statistics
if(m_reportTimer.secs() > m_reportPeriod){
......@@ -250,11 +267,19 @@ protected:
reportStats();
// Flush the one-of parameters one last time.
std::list<Param> params;
while (m_paramsQueue.size())
params.push_back(m_paramsQueue.pop());
while (m_toAddParamsQueue.size())
params.push_back(m_toAddParamsQueue.pop());
if (params.size()) {
m_initialProcess.addLogParams(m_driveUnitName, params);
}
std::list<std::string> paramsToDelete;
// Flush the one-of parameters one last time.
while (m_toDeleteParamsQueue.size())
paramsToDelete.push_back(m_toDeleteParamsQueue.pop());
if (params.size()) {
m_initialProcess.deleteLogParams(m_driveUnitName, paramsToDelete);
}
}
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
......@@ -341,7 +366,14 @@ protected:
* Queue new parameter to be sent asynchronously to the main thread.
*/
void addParameter (const cta::log::Param & param) {
m_paramsQueue.push(param);
m_toAddParamsQueue.push(param);
}
/**
* Queue the parameter to be deleted asynchronously in the main thread.
*/
void deleteParameter (const std::string & param) {
m_toDeleteParamsQueue.push(param);
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment