Commit b15aa7f1 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Reporting final states of migrations and recalls (unloading and unmounting)

parent b5534086
......@@ -25,6 +25,7 @@
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "serrno.h"
#include "common/DriveState.hpp"
#include <memory>
#include <numeric>
......@@ -95,6 +96,15 @@ void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int err
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSessionWithErrors(msg,errorCode));
}
//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void MigrationReportPacker::reportTestGoingToEnd(){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportTestGoingToEnd());
}
//------------------------------------------------------------------------------
//synchronousReportEndWithErrors
//------------------------------------------------------------------------------
......@@ -125,6 +135,7 @@ void MigrationReportPacker::reportDriveStatus(cta::DriveStatus status) {
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent){
parent.m_archiveMount->setDriveStatus(m_status);
if(m_status==cta::DriveStatus::Unmounting) parent.m_continue=false;
}
//------------------------------------------------------------------------------
......@@ -196,7 +207,6 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r
usleep(500*1000);
}
}
reportPacker.m_continue=false;
}
//------------------------------------------------------------------------------
......@@ -227,7 +237,6 @@ void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationRepor
// by the end our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
reportPacker.m_continue=false;
}
//------------------------------------------------------------------------------
//ReportError::execute
......@@ -312,11 +321,8 @@ void MigrationReportPacker::WorkerThread::run(){
// Drain the FIFO if necessary. We know that m_continue will be
// set by ReportEndofSessionWithErrors or ReportEndofSession
// TODO devise a more generic mechanism
while(m_parent.m_continue) {
while(m_parent.m_fifo.size()) {
std::unique_ptr<Report> rep (m_parent.m_fifo.pop());
if (dynamic_cast<ReportEndofSessionWithErrors *>(rep.get()) ||
dynamic_cast<ReportEndofSession *>(rep.get()))
m_parent.m_continue = false;
}
}
......
......@@ -84,6 +84,11 @@ public:
*/
virtual void reportEndOfSession();
/**
* Function for testing purposes. It is used to tell the report packer that this is the last report
*/
virtual void reportTestGoingToEnd();
/**
* Create into the MigrationReportPacker a report for an erroneous end of session
* @param msg The error message
......@@ -117,6 +122,11 @@ private:
m_successfulArchiveJob(std::move(successfulArchiveJob)) {}
virtual void execute(MigrationReportPacker& reportPacker);
};
class ReportTestGoingToEnd : public Report {
public:
ReportTestGoingToEnd() {}
virtual void execute(MigrationReportPacker& reportPacker) {reportPacker.m_continue=false;}
};
class ReportDriveStatus : public Report {
cta::DriveStatus m_status;
......
......@@ -99,6 +99,7 @@ namespace unitTests {
const tapeserver::drive::compressionStats statsCompress;
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.reportTestGoingToEnd();
mrp.waitThread(); //here
std::string temp = log.getLog();
......@@ -146,6 +147,7 @@ namespace unitTests {
const tapeserver::drive::compressionStats statsCompress;
mrp.reportFlush(statsCompress);
mrp.reportEndOfSession();
mrp.reportTestGoingToEnd();
mrp.waitThread();
std::string temp = log.getLog();
......@@ -197,6 +199,7 @@ namespace unitTests {
stats.toTape=(100000+1)/3;
mrp.reportFlush(stats);
mrp.reportEndOfSession();
mrp.reportTestGoingToEnd();
mrp.waitThread();
std::string temp = log.getLog();
......
......@@ -100,6 +100,15 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
m_fifo.push(new ReportEndofSessionWithErrors(msg,error_code));
}
//------------------------------------------------------------------------------
//reportTestGoingToEnd
//------------------------------------------------------------------------------
void RecallReportPacker::reportTestGoingToEnd(){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportTestGoingToEnd());
}
//------------------------------------------------------------------------------
//ReportSuccessful::execute
//------------------------------------------------------------------------------
......@@ -142,7 +151,7 @@ void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent)
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSession::goingToEnd(RecallReportPacker& packer) {
return packer.allThreadsDone();
return false;
}
//------------------------------------------------------------------------------
......@@ -156,6 +165,7 @@ void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportDriveStatus::goingToEnd(RecallReportPacker& packer) {
if(m_status==cta::DriveStatus::Unmounting) return true;
return false;
}
......@@ -188,7 +198,7 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd(RecallReportPacker& packer) {
return packer.allThreadsDone();
return false;
}
//------------------------------------------------------------------------------
......
......@@ -69,6 +69,11 @@ public:
*/
virtual void reportEndOfSession();
/**
* Function for testing purposes. It is used to tell the report packer that this is the last report
*/
virtual void reportTestGoingToEnd();
/**
* Create into the MigrationReportPacker a report for an erroneous end of session
* @param msg The error message
......@@ -120,7 +125,13 @@ private:
public:
virtual ~Report(){}
virtual void execute(RecallReportPacker& packer)=0;
virtual bool goingToEnd(RecallReportPacker& packer) {return false;};
virtual bool goingToEnd(RecallReportPacker& packer) {return false;}
};
class ReportTestGoingToEnd : public Report {
public:
ReportTestGoingToEnd() {}
virtual void execute(RecallReportPacker& reportPacker) {}
virtual bool goingToEnd(RecallReportPacker& packer) {return true;}
};
class ReportSuccessful : public Report {
/**
......
......@@ -29,6 +29,7 @@
#include "objectstore/BackendVFS.hpp"
#include "scheduler/testingMocks/MockRetrieveMount.hpp"
#include "scheduler/testingMocks/MockRetrieveJob.hpp"
#include "common/DriveState.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
......@@ -100,6 +101,7 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerNominal) {
rrp.setTapeDone();
rrp.reportEndOfSession();
rrp.reportTestGoingToEnd();
rrp.waitThread();
std::string temp = log.getLog();
......@@ -154,6 +156,7 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerBadBadEnd) {
rrp.setTapeDone();
rrp.reportEndOfSession();
rrp.reportTestGoingToEnd();
rrp.waitThread();
const std::string temp = log.getLog();
......
......@@ -260,3 +260,48 @@
fun:start_thread
fun:clone
}
{
recall_packer
Memcheck:Leak
fun:_Znwm
fun:_ZN6castor4tape10tapeserver6daemon18RecallReportPacker18reportCompletedJobESt10unique_ptrIN3cta11RetrieveJobESt14default_deleteIS6_EE
fun:_ZN6castor4tape10tapeserver6daemon13DiskWriteTask7executeERNS2_18RecallReportPackerERNS_3log10LogContextERNS0_8diskFile15DiskFileFactoryERNS2_14RecallWatchDogE
fun:_ZN6castor4tape10tapeserver6daemon19DiskWriteThreadPool21DiskWriteWorkerThread3runEv
fun:_ZN6castor6server6Thread14pthread_runnerEPv
fun:start_thread
fun:clone
}
{
migration_packer
Memcheck:Leak
fun:_Znwm
fun:_ZN6castor4tape10tapeserver6daemon21MigrationReportPacker28reportEndOfSessionWithErrorsESsi
fun:_ZN6castor4tape10tapeserver6daemon21TapeWriteSingleThread3runEv
fun:_ZN6castor6server6Thread14pthread_runnerEPv
fun:start_thread
fun:clone
}
{
recall_packer_2
Memcheck:Leak
fun:_Znwm
fun:_ZN6castor4tape10tapeserver6daemon18RecallReportPacker28reportEndOfSessionWithErrorsESsi
fun:_ZN6castor4tape10tapeserver6daemon19DiskWriteThreadPool21DiskWriteWorkerThread3runEv
fun:_ZN6castor6server6Thread14pthread_runnerEPv
fun:start_thread
fun:clone
}
{
recall_packer_3
Memcheck:Leak
fun:_Znwm
fun:_ZN6castor4tape10tapeserver6daemon18RecallReportPacker18reportEndOfSessionEv
fun:_ZN6castor4tape10tapeserver6daemon19DiskWriteThreadPool21DiskWriteWorkerThread3runEv
fun:_ZN6castor6server6Thread14pthread_runnerEPv
fun:start_thread
fun:clone
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment