Commit 656aa531 authored by Eric Cano's avatar Eric Cano
Browse files

Partial implementation of drive status reporting.

The report packer should now also be referenced in the tape recall thread, so it can report the drive status.
testingMocks/MockRetrieveMount.hpp needs to be improved so it handles les end of session correctly.
Then the mechanism has to be mirrored in Archives (simpler case).
parent a13ffdd1
......@@ -105,6 +105,13 @@ void cta::ArchiveMount::complete() {
m_sessionRunning = false;
}
//------------------------------------------------------------------------------
// abort
//------------------------------------------------------------------------------
void cta::ArchiveMount::abort() {
complete();
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
......
......@@ -77,13 +77,13 @@ namespace cta {
/**
* Indicates that the mount was completed.
*
* @param checksumOfTransfer The adler-32 checksum of the file as calculated
* during the execution of the job.
* @param fileSizeOfTransfer The size of the file as calculated during the
* execution of the job.
*/
virtual void complete();
/**
* Indicates that the mount was cancelled.
*/
virtual void abort();
CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning);
/**
......
......@@ -1586,8 +1586,24 @@ void OStoreDB::RetrieveMount::complete(time_t completionTime) {
m_agent.fetch();
m_agent.removeFromOwnership(t.getAddressIfSet());
m_agent.commit();
}
}
void OStoreDB::RetrieveMount::setDriveStatus(cta::DriveStatus status, time_t completionTime) {
// We just report the drive status as instructed by the tape thread.
// Get the drive register
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
objectstore::ScopedExclusiveLock drl(dr);
dr.fetch();
// Reset the drive state.
dr.reportDriveStatus(mountInfo.drive, mountInfo.logicalLibrary,
status, completionTime,
cta::MountType::RETRIEVE, 0,
0, 0, 0, "", "");
dr.commit();
}
void OStoreDB::ArchiveJob::fail() {
if (!m_jobOwned)
......
......@@ -118,6 +118,7 @@ public:
virtual const MountInfo & getMountInfo();
virtual std::unique_ptr<RetrieveJob> getNextJob();
virtual void complete(time_t completionTime);
virtual void setDriveStatus(cta::DriveStatus status, time_t completionTime);
};
/* === Retrieve Job handling ============================================== */
......
......@@ -84,13 +84,57 @@ std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
}
//------------------------------------------------------------------------------
// complete
// tapeComplete())
//------------------------------------------------------------------------------
void cta::RetrieveMount::tapeComplete() {
m_tapeRunning = false;
if (!m_diskRunning) {
// Just set the session as complete in the DB.
m_dbMount->complete(time(NULL));
// and record we are done with the mount
m_sessionRunning = false;
} else {
// This is a special case: we have to report the tape server is draining
// its memory to disk
setDriveStatus(cta::DriveStatus::DrainingToDisk);
}
}
//------------------------------------------------------------------------------
// diskComplete())
//------------------------------------------------------------------------------
void cta::RetrieveMount::diskComplete() {
m_diskRunning = false;
if (!m_tapeRunning) {
// Just set the session as complete in the DB.
cta::SchedulerDatabase::RetrieveMount * ptr = m_dbMount.get();
ptr=ptr;
m_dbMount->complete(time(NULL));
// and record we are done with the mount
m_sessionRunning = false;
}
}
//------------------------------------------------------------------------------
// abort())
//------------------------------------------------------------------------------
void cta::RetrieveMount::abort() {
diskComplete();
tapeComplete();
}
//------------------------------------------------------------------------------
// setDriveStatus()
//------------------------------------------------------------------------------
void cta::RetrieveMount::setDriveStatus(cta::DriveStatus status) {
m_dbMount->setDriveStatus(status, time(NULL));
}
//------------------------------------------------------------------------------
// bothSidesComplete())
//------------------------------------------------------------------------------
void cta::RetrieveMount::complete() {
// Just set the session as complete in the DB.
m_dbMount->complete(time(NULL));
// and record we are done with the mount
m_sessionRunning = false;
bool cta::RetrieveMount::bothSidesComplete() {
return !(m_diskRunning || m_tapeRunning);
}
//------------------------------------------------------------------------------
......
......@@ -80,17 +80,37 @@ namespace cta {
* @return The mount transaction id.
*/
virtual uint32_t getNbFiles() const;
/**
* Report a drive status change
*/
virtual void setDriveStatus(cta::DriveStatus status);
/**
* Indicates that the mount was completed.
*
* @param checksumOfTransfer The adler-32 checksum of the file as calculated
* during the execution of the job.
* @param fileSizeOfTransfer The size of the file as calculated during the
* execution of the job.
* Indicates that the disk thread of the mount was completed. This
* will implicitly trigger the transition from DrainingToDisk to Up if necessary.
*/
virtual void complete();
virtual void diskComplete();
/**
* Indicates that the tape thread of the mount was completed. This
* will implicitly trigger the transition from Unmounting to either Up or
* DrainingToDisk, depending on the the disk thread's status.
*/
virtual void tapeComplete();
/**
* Indicates that the we should cancel the mount (equivalent to diskComplete
* + tapeComeplete).
*/
virtual void abort();
/**
* Tests whether all threads are complete
* @return true if both tape and disk are complete.
*/
virtual bool bothSidesComplete();
CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning);
/**
* Job factory
......@@ -116,6 +136,18 @@ namespace cta {
* Internal tracking of the session completion
*/
bool m_sessionRunning;
/**
* Internal tracking of the tape thread
*/
bool m_tapeRunning;
/**
* Internal tracking of the disk thread
*/
bool m_diskRunning;
}; // class RetrieveMount
......
......@@ -880,6 +880,8 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
Utils::getShortHostname(),
time(NULL))));
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
std::string debug=ex.getMessageValue();
......
......@@ -250,6 +250,7 @@ public:
virtual const MountInfo & getMountInfo() = 0;
virtual std::unique_ptr<RetrieveJob> getNextJob() = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(DriveStatus status, time_t completionTime) = 0;
virtual ~RetrieveMount() {}
uint32_t nbFilesCurrentlyOnTape;
};
......
......@@ -59,14 +59,9 @@ namespace cta {
virtual uint32_t getNbFiles() const = 0;
/**
* Indicates that the mount was completed.
*
* @param checksumOfTransfer The adler-32 checksum of the file as calculated
* during the execution of the job.
* @param fileSizeOfTransfer The size of the file as calculated during the
* execution of the job.
* Indicates that the mount was aborted.
*/
virtual void complete() = 0;
virtual void abort() = 0;
/**
* Destructor.
......
......@@ -49,7 +49,15 @@ namespace cta {
}
virtual void complete() { completes ++; }
virtual void abort() { completes ++; }
void diskComplete() {};
void tapeComplete() {};
bool bothSidesComplete() { return false; }
private:
std::list<std::unique_ptr<cta::RetrieveJob>> m_jobs;
......
......@@ -201,7 +201,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
log::LogContext::ScopedParam sp1(lc, log::Param("errorMessage", "Aborted: empty recall mount"));
log::LogContext::ScopedParam sp2(lc, log::Param("errorCode", SEINTERNAL));
try {
retrieveMount->complete();
retrieveMount->abort();
log::LogContext::ScopedParam sp08(lc, log::Param("MountTransactionId", retrieveMount->getMountTransactionId()));
log::LogContext::ScopedParam sp11(lc, log::Param("errorMessage", "Aborted: empty recall mount"));
log::LogContext::ScopedParam sp12(lc, log::Param("errorCode", SEINTERNAL));
......@@ -371,7 +371,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const DriveConf
std::stringstream errMsg;
errMsg << "Drive not found on this path" << lc;
mount->complete();
mount->abort();
log::LogContext::ScopedParam sp10(lc, log::Param("tapebridgeTransId", mount->getMountTransactionId()));
log::LogContext::ScopedParam sp13(lc, log::Param("errorMessage", errMsg.str()));
log::LogContext::ScopedParam sp14(lc, log::Param("errorCode", SEINTERNAL));
......@@ -385,7 +385,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const DriveConf
std::stringstream errMsg;
errMsg << "Error looking to path to tape drive: " << lc;
mount->complete();
mount->abort();
log::LogContext::ScopedParam sp11(lc, log::Param("tapebridgeTransId", mount->getMountTransactionId()));
log::LogContext::ScopedParam sp14(lc, log::Param("errorMessage", errMsg.str()));
log::LogContext::ScopedParam sp15(lc, log::Param("errorCode", SEINTERNAL));
......@@ -398,7 +398,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const DriveConf
std::stringstream errMsg;
errMsg << "Unexpected exception while looking for drive" << lc;
mount->complete();
mount->abort();
log::LogContext::ScopedParam sp10(lc, log::Param("tapebridgeTransId", mount->getMountTransactionId()));
log::LogContext::ScopedParam sp13(lc, log::Param("errorMessage", errMsg.str()));
log::LogContext::ScopedParam sp14(lc, log::Param("errorCode", SEINTERNAL));
......@@ -418,7 +418,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const DriveConf
std::stringstream errMsg;
errMsg << "Error opening tape drive" << lc;
mount->complete();
mount->abort();
log::LogContext::ScopedParam sp11(lc, log::Param("tapebridgeTransId", mount->getMountTransactionId()));
log::LogContext::ScopedParam sp14(lc, log::Param("errorMessage", errMsg.str()));
log::LogContext::ScopedParam sp15(lc, log::Param("errorCode", SEINTERNAL));
......@@ -431,7 +431,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::findDrive(const DriveConf
std::stringstream errMsg;
errMsg << "Unexpected exception while opening drive" << lc;
mount->complete();
mount->abort();
log::LogContext::ScopedParam sp10(lc, log::Param("tapebridgeTransId", mount->getMountTransactionId()));
log::LogContext::ScopedParam sp13(lc, log::Param("errorMessage", errMsg.str()));
log::LogContext::ScopedParam sp14(lc, log::Param("errorCode", SEINTERNAL));
......
......@@ -45,6 +45,7 @@ namespace unitTests{
virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
virtual void setDriveStatus(cta::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -38,6 +38,7 @@ namespace unitTests{
virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
virtual void setDriveStatus(cta::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -46,8 +46,10 @@ namespace daemon {
//Constructor
//------------------------------------------------------------------------------
RecallReportPacker::RecallReportPacker(cta::RetrieveMount *retrieveMount, log::LogContext lc):
ReportPackerInterface<detail::Recall>(lc),
m_workerThread(*this),m_errorHappened(false), m_retrieveMount(retrieveMount){
ReportPackerInterface<detail::Recall>(lc),
m_workerThread(*this), m_errorHappened(false), m_retrieveMount(retrieveMount),
m_tapeThreadComplete(false), m_diskThreadComplete(false)
{
}
//------------------------------------------------------------------------------
......@@ -79,6 +81,14 @@ void RecallReportPacker::reportEndOfSession(){
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportEndofSession());
}
//------------------------------------------------------------------------------
//setDriveStatus
//------------------------------------------------------------------------------
void RecallReportPacker::setDriveStatus(cta::DriveStatus status) {
m_retrieveMount->setDriveStatus(status);
}
//------------------------------------------------------------------------------
//reportEndOfSessionWithErrors
......@@ -99,43 +109,52 @@ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
//ReportEndofSession::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSession::execute(RecallReportPacker& parent){
if(!parent.errorHappened()){
parent.m_retrieveMount->complete();
parent.m_lc.log(LOG_INFO,"Nominal RecallReportPacker::EndofSession has been reported");
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","success"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
if(!parent.errorHappened()){
parent.m_retrieveMount->diskComplete();
parent.m_lc.log(LOG_INFO,"Nominal RecallReportPacker::EndofSession has been reported");
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","success"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
else {
const std::string& msg ="RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
parent.m_lc.log(LOG_ERR,msg);
parent.m_retrieveMount->complete();
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","failure"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
}
else {
const std::string& msg ="RecallReportPacker::EndofSession has been reported but an error happened somewhere in the process";
parent.m_lc.log(LOG_ERR,msg);
parent.m_retrieveMount->diskComplete();
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","failure"));
// We have a race condition here between the processing of this message by
// the initial process and the printing of the end-of-session log, triggered
// by the end our process. To delay the latter, we sleep half a second here.
usleep(500*1000);
}
}
}
//------------------------------------------------------------------------------
//ReportEndofSession::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSession::goingToEnd(RecallReportPacker& packer) {
packer.setDiskDone();
return packer.allThreadsDone();
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacker& parent){
if(parent.m_errorHappened) {
parent.m_retrieveMount->complete();
parent.m_retrieveMount->diskComplete();
LogContext::ScopedParam(parent.m_lc,Param("errorCode",m_error_code));
parent.m_lc.log(LOG_ERR,m_message);
}
else{
const std::string& msg ="RecallReportPacker::EndofSessionWithErrors has been reported but NO error was detected during the process";
parent.m_lc.log(LOG_ERR,msg);
parent.m_retrieveMount->complete();
parent.m_retrieveMount->diskComplete();
}
if (parent.m_watchdog) {
parent.m_watchdog->addParameter(log::Param("status","failure"));
......@@ -145,6 +164,15 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
usleep(500*1000);
}
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportEndofSessionWithErrors::goingToEnd(RecallReportPacker& packer) {
packer.setDiskDone();
return packer.allThreadsDone();
}
//------------------------------------------------------------------------------
//ReportError::execute
//------------------------------------------------------------------------------
......@@ -153,6 +181,7 @@ void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
parent.m_lc.log(LOG_ERR,m_failedRetrieveJob->failureMessage);
m_failedRetrieveJob->failed();
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
//------------------------------------------------------------------------------
......@@ -171,7 +200,7 @@ void RecallReportPacker::WorkerThread::run(){
std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
rep->execute(m_parent);
if(rep->goingToEnd()) {
if(rep->goingToEnd(m_parent)) {
endFound = true;
break;
}
......@@ -221,7 +250,7 @@ void RecallReportPacker::WorkerThread::run(){
if (!endFound) {
while (1) {
std::unique_ptr<Report> report(m_parent.m_fifo.pop());
if (report->goingToEnd())
if (report->goingToEnd(m_parent))
break;
}
}
......@@ -235,4 +264,25 @@ bool RecallReportPacker::errorHappened() {
return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
}
//------------------------------------------------------------------------------
//reportTapeDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setTapeDone() {
m_tapeThreadComplete = true;
}
//------------------------------------------------------------------------------
//reportDiskDone()
//------------------------------------------------------------------------------
void RecallReportPacker::setDiskDone() {
m_diskThreadComplete = true;
}
//------------------------------------------------------------------------------
//reportDiskDone()
//------------------------------------------------------------------------------
bool RecallReportPacker::allThreadsDone() {
return m_tapeThreadComplete && m_diskThreadComplete;
}
}}}}
......@@ -75,6 +75,29 @@ public:
* @param error_code The error code given by the drive
*/
virtual void reportEndOfSessionWithErrors(const std::string msg,int error_code);
/**
* Report the drive state and set it in the central drive register. This
* function is to be used by the tape thread when running.
* @param state the new drive state.
*/
virtual void setDriveStatus(cta::DriveStatus status);
/**
* Flag disk thread as done.
*/
virtual void setDiskDone();
/**
* Flag tape thread as done. Set the drive status to draining if needed.
*/
virtual void setTapeDone();
/**
* Query the status of disk and tape threads (are they both done?).
* @return true if both tape and disk threads are done.
*/
virtual bool allThreadsDone();
/**
* Start the inner thread
......@@ -94,12 +117,10 @@ public:
private:
//inner classes use to store content while receiving a report
class Report {
const bool m_endNear;
public:
Report(bool b):m_endNear(b){}
virtual ~Report(){}
virtual void execute(RecallReportPacker& packer)=0;
bool goingToEnd() const {return m_endNear;};
virtual bool goingToEnd(RecallReportPacker& packer) {return false;};
};
class ReportSuccessful : public Report {
/**
......@@ -108,7 +129,7 @@ private:
std::unique_ptr<cta::RetrieveJob> m_successfulRetrieveJob;
public:
ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob):
Report(false), m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
virtual void execute(RecallReportPacker& reportPacker);
};
class ReportError : public Report {
......@@ -118,26 +139,34 @@ private:
std::unique_ptr<cta::RetrieveJob> m_failedRetrieveJob;
public:
ReportError(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob):
Report(false),
m_failedRetrieveJob(std::move(failedRetrieveJob)) {
}
virtual void execute(RecallReportPacker& reportPacker);
};
class ReportDriveStatus : public Report {
cta::DriveStatus m_status;
public:
ReportDriveStatus(cta::DriveStatus status): m_status(status) {}
};
class ReportEndofSession : public Report {
public:
ReportEndofSession():Report(true){}
ReportEndofSession(){}
virtual void execute(RecallReportPacker& reportPacker);
virtual bool goingToEnd(RecallReportPacker& packer);
};
class ReportEndofSessionWithErrors : public Report {
std::string m_message;
int m_error_code;
public:
ReportEndofSessionWithErrors(std::string msg,int error_code):
Report(true),m_message(msg),m_error_code(error_code){}
m_message(msg),m_error_code(error_code){}
virtual void execute(RecallReportPacker& reportPacker);
virtual bool goingToEnd(RecallReportPacker& packer);
};
class WorkerThread: public castor::server::Thread {
......@@ -165,6 +194,16 @@ private:
* The mount object used to send reports
*/
cta::RetrieveMount * m_retrieveMount;
/**
* Tracking of the tape thread end
*/
bool m_tapeThreadComplete;
/**