Commit 4e0f51ca authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Now it's better...

parent 84d7c787
......@@ -23,6 +23,7 @@
#include "common/DriveState.hpp"
#include <set>
#include <json/json.h>
#include <iostream>
cta::objectstore::DriveRegister::DriveRegister(const std::string & address, Backend & os):
ObjectOps<serializers::DriveRegister>(os, address) { }
......@@ -278,16 +279,18 @@ void cta::objectstore::DriveRegister::reportDriveStatus(const std::string& drive
CreationLog cl(UserIdentity(0,0), "", reportTime, "Automatic creation of "
"drive on status report");
addDrive(driveName, logicalLibary, cl);
} else
} else {
throw NoSuchDrive("In DriveRegister::reportDriveStatus(): No such drive");
}
// Find the drive again
auto dl = m_payload.mutable_drives();
for (auto dp = dl->begin(); dp != dl->end(); dp++) {
if (dp->drivename() == driveName)
drive = &(*dp);
}
if (!drive)
if (!drive) {
throw NoSuchDrive("In DriveRegister::reportDriveStatus(): Failed to create the drive");
}
}
switch (status) {
case DriveStatus::Down:
......@@ -402,7 +405,7 @@ void cta::objectstore::DriveRegister::setDriveStarting(ReportDriveStatusInputs&
drive->set_cleanupstarttime(0);
drive->set_lastupdatetime(inputs.reportTime);
drive->set_mounttype(serializeMountType(inputs.mountType));
drive->set_drivestatus(serializers::DriveStatus::Mounting);
drive->set_drivestatus(serializers::DriveStatus::Starting);
drive->set_currentvid(inputs.vid);
drive->set_currenttapepool(inputs.tapepool);
}
......@@ -470,19 +473,111 @@ void cta::objectstore::DriveRegister::setDriveTransfering(ReportDriveStatusInput
}
void cta::objectstore::DriveRegister::setDriveUnloading(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) {
throw NotImplemented("");
if (drive->drivestatus() == serializers::DriveStatus::Unloading) {
drive->set_lastupdatetime(inputs.reportTime);
return;
}
// If we are changing state, then all should be reset. We are not supposed to
// know the direction yet.
drive->set_logicallibrary(inputs.logicalLibary);
drive->set_sessionid(inputs.mountSessionId);
drive->set_bytestransferedinsession(0);
drive->set_filestransferedinsession(0);
drive->set_latestbandwidth(0);
drive->set_sessionstarttime(0);
drive->set_mountstarttime(0);
drive->set_transferstarttime(0);
drive->set_unloadstarttime(inputs.reportTime);
drive->set_unmountstarttime(0);
drive->set_drainingstarttime(0);
drive->set_downorupstarttime(0);
drive->set_cleanupstarttime(0);
drive->set_lastupdatetime(inputs.reportTime);
drive->set_mounttype(serializeMountType(inputs.mountType));
drive->set_drivestatus(serializers::DriveStatus::Unloading);
drive->set_currentvid(inputs.vid);
drive->set_currenttapepool(inputs.tapepool);
}
void cta::objectstore::DriveRegister::setDriveUnmounting(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) {
throw NotImplemented("");
if (drive->drivestatus() == serializers::DriveStatus::Unmounting) {
drive->set_lastupdatetime(inputs.reportTime);
return;
}
// If we are changing state, then all should be reset. We are not supposed to
// know the direction yet.
drive->set_logicallibrary(inputs.logicalLibary);
drive->set_sessionid(inputs.mountSessionId);
drive->set_bytestransferedinsession(0);
drive->set_filestransferedinsession(0);
drive->set_latestbandwidth(0);
drive->set_sessionstarttime(0);
drive->set_mountstarttime(0);
drive->set_transferstarttime(0);
drive->set_unloadstarttime(0);
drive->set_unmountstarttime(inputs.reportTime);
drive->set_drainingstarttime(0);
drive->set_downorupstarttime(0);
drive->set_cleanupstarttime(0);
drive->set_lastupdatetime(inputs.reportTime);
drive->set_mounttype(serializeMountType(inputs.mountType));
drive->set_drivestatus(serializers::DriveStatus::Unmounting);
drive->set_currentvid(inputs.vid);
drive->set_currenttapepool(inputs.tapepool);
}
void cta::objectstore::DriveRegister::setDriveDrainingToDisk(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) {
throw NotImplemented("");
if (drive->drivestatus() == serializers::DriveStatus::DrainingToDisk) {
drive->set_lastupdatetime(inputs.reportTime);
return;
}
// If we are changing state, then all should be reset. We are not supposed to
// know the direction yet.
drive->set_logicallibrary(inputs.logicalLibary);
drive->set_sessionid(inputs.mountSessionId);
drive->set_bytestransferedinsession(0);
drive->set_filestransferedinsession(0);
drive->set_latestbandwidth(0);
drive->set_sessionstarttime(0);
drive->set_mountstarttime(0);
drive->set_transferstarttime(0);
drive->set_unloadstarttime(0);
drive->set_unmountstarttime(0);
drive->set_drainingstarttime(inputs.reportTime);
drive->set_downorupstarttime(0);
drive->set_cleanupstarttime(0);
drive->set_lastupdatetime(inputs.reportTime);
drive->set_mounttype(serializeMountType(inputs.mountType));
drive->set_drivestatus(serializers::DriveStatus::DrainingToDisk);
drive->set_currentvid(inputs.vid);
drive->set_currenttapepool(inputs.tapepool);
}
void cta::objectstore::DriveRegister::setDriveCleaningUp(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) {
throw NotImplemented("");
if (drive->drivestatus() == serializers::DriveStatus::CleaningUp) {
drive->set_lastupdatetime(inputs.reportTime);
return;
}
// If we are changing state, then all should be reset. We are not supposed to
// know the direction yet.
drive->set_logicallibrary(inputs.logicalLibary);
drive->set_sessionid(inputs.mountSessionId);
drive->set_bytestransferedinsession(0);
drive->set_filestransferedinsession(0);
drive->set_latestbandwidth(0);
drive->set_sessionstarttime(0);
drive->set_mountstarttime(0);
drive->set_transferstarttime(0);
drive->set_unloadstarttime(0);
drive->set_unmountstarttime(0);
drive->set_drainingstarttime(0);
drive->set_downorupstarttime(0);
drive->set_cleanupstarttime(inputs.reportTime);
drive->set_lastupdatetime(inputs.reportTime);
drive->set_mounttype(serializeMountType(inputs.mountType));
drive->set_drivestatus(serializers::DriveStatus::CleaningUp);
drive->set_currentvid(inputs.vid);
drive->set_currenttapepool(inputs.tapepool);
}
......
......@@ -112,6 +112,21 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& rep
reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob));
}
//------------------------------------------------------------------------------
//reportDriveStatus
//------------------------------------------------------------------------------
void MigrationReportPacker::reportDriveStatus(cta::DriveStatus status) {
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportDriveStatus(status));
}
//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent){
parent.m_archiveMount->setDriveStatus(m_status);
}
//------------------------------------------------------------------------------
//ReportFlush::execute
//------------------------------------------------------------------------------
......
......@@ -70,7 +70,14 @@ public:
* @param compressStats
*
*/
virtual void reportFlush(drive::compressionStats compressStats);
virtual void reportFlush(drive::compressionStats compressStats);
/**
* Report the drive state and set it in the central drive register. This
* function is to be used by the tape thread when running.
* @param state the new drive state.
*/
virtual void reportDriveStatus(cta::DriveStatus status);
/**
* Create into the MigrationReportPacker a report for the nominal end of session
......@@ -110,6 +117,14 @@ private:
m_successfulArchiveJob(std::move(successfulArchiveJob)) {}
virtual void execute(MigrationReportPacker& reportPacker);
};
class ReportDriveStatus : public Report {
cta::DriveStatus m_status;
public:
ReportDriveStatus(cta::DriveStatus status): m_status(status) {}
virtual void execute(MigrationReportPacker& reportPacker);
};
class ReportFlush : public Report {
drive::compressionStats m_compressStats;
......
......@@ -84,10 +84,11 @@ void RecallReportPacker::reportEndOfSession(){
}
//------------------------------------------------------------------------------
//setDriveStatus
//reportDriveStatus
//------------------------------------------------------------------------------
void RecallReportPacker::setDriveStatus(cta::DriveStatus status) {
m_retrieveMount->setDriveStatus(status);
void RecallReportPacker::reportDriveStatus(cta::DriveStatus status) {
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(new ReportDriveStatus(status));
}
......@@ -144,6 +145,20 @@ bool RecallReportPacker::ReportEndofSession::goingToEnd(RecallReportPacker& pack
return packer.allThreadsDone();
}
//------------------------------------------------------------------------------
//ReportDriveStatus::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){
parent.m_retrieveMount->setDriveStatus(m_status);
}
//------------------------------------------------------------------------------
//ReportDriveStatus::goingToEnd
//------------------------------------------------------------------------------
bool RecallReportPacker::ReportDriveStatus::goingToEnd(RecallReportPacker& packer) {
return false;
}
//------------------------------------------------------------------------------
//ReportEndofSessionWithErrors::execute
//------------------------------------------------------------------------------
......@@ -200,7 +215,7 @@ void RecallReportPacker::WorkerThread::run(){
bool endFound = false;
try{
while(1) {
std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
rep->execute(m_parent);
if(rep->goingToEnd(m_parent)) {
......
......@@ -81,7 +81,7 @@ public:
* function is to be used by the tape thread when running.
* @param state the new drive state.
*/
virtual void setDriveStatus(cta::DriveStatus status);
virtual void reportDriveStatus(cta::DriveStatus status);
/**
* Flag disk thread as done.
......@@ -149,6 +149,8 @@ private:
cta::DriveStatus m_status;
public:
ReportDriveStatus(cta::DriveStatus status): m_status(status) {}
virtual void execute(RecallReportPacker& reportPacker);
virtual bool goingToEnd(RecallReportPacker& packer);
};
class ReportEndofSession : public Report {
......
......@@ -45,6 +45,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread(
//TapeCleaning::~TapeCleaning()
//------------------------------------------------------------------------------
castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeCleaning() {
m_this.m_rrp.reportDriveStatus(cta::DriveStatus::CleaningUp);
// Tell everyone to wrap up the session
// We now acknowledge to the task injector that read reached the end. There
// will hence be no more requests for more.
......@@ -73,7 +74,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
goto done;
}
// in the special case of a "manual" mode tape, we should skip the unload too.
if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) {
if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) {
m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Unloading);
m_this.m_drive.unloadTape();
m_this.m_logContext.log(LOG_INFO, "TapeReadSingleThread: Tape unloaded");
} else {
......@@ -83,7 +85,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
// And return the tape to the library
// In case of manual mode, this will be filtered by the rmc daemon
// (which will do nothing)
currentErrorToCount = "Error_tapeDismount";
currentErrorToCount = "Error_tapeDismount";
m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Unmounting);
m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.getLibrarySlot());
m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ?
......@@ -93,6 +96,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
} catch(const castor::exception::Exception& ex){
// Something failed during the cleaning
m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Down);
castor::log::ScopedParamContainer scoped(m_this.m_logContext);
scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code());
......@@ -105,6 +109,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean
} catch (...) {
// Notify something failed during the cleaning
m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Down);
m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeReadSingleThread-TapeCleaning when unmounting the tape");
try {
if (currentErrorToCount.size()) {
......@@ -198,6 +203,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
TapeCleaning tapeCleaner(*this, timer);
// Before anything, the tape should be mounted
currentErrorToCount = "Error_tapeMountForRead";
m_rrp.reportDriveStatus(cta::DriveStatus::Mounting);
mountTapeReadOnly();
currentErrorToCount = "Error_tapeLoad";
waitForDrive();
......@@ -226,6 +232,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() {
// Then we will loop on the tasks as they get from
// the task injector
std::unique_ptr<TapeReadTask> task;
m_rrp.reportDriveStatus(cta::DriveStatus::Transfering);
while(true) {
//get a task
task.reset(popAndRequestMoreJobs());
......
......@@ -33,13 +33,15 @@ castor::tape::tapeserver::drive::DriveInterface & drive,
TapeServerReporter & tsr,
MigrationWatchDog & mwd,
const VolumeInfo& volInfo,
castor::log::LogContext & lc, MigrationReportPacker & repPacker,
castor::log::LogContext & lc,
MigrationReportPacker & repPacker,
castor::server::ProcessCap &capUtils,
uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush):
TapeSingleThreadInterface<TapeWriteTask>(drive, mc, tsr, volInfo,capUtils, lc),
m_filesBeforeFlush(filesBeforeFlush),
m_bytesBeforeFlush(bytesBeforeFlush),
m_drive(drive), m_reportPacker(repPacker),
m_drive(drive),
m_reportPacker(repPacker),
m_lastFseq(-1),
m_compress(true),
m_watchdog(mwd){}
......@@ -168,6 +170,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
//
TapeCleaning cleaner(*this, timer);
currentErrorToCount = "Error_tapeMountForWrite";
m_reportPacker.reportDriveStatus(cta::DriveStatus::Mounting);
// Before anything, the tape should be mounted
// This call does the logging of the mount
mountTapeReadWrite();
......@@ -202,7 +205,8 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() {
m_stats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter);
// Tasks handle their error logging themselves.
currentErrorToCount = "";
std::unique_ptr<TapeWriteTask> task;
std::unique_ptr<TapeWriteTask> task;
m_reportPacker.reportDriveStatus(cta::DriveStatus::Transfering);
while(1) {
//get a task
task.reset(m_tasks.pop());
......
......@@ -65,7 +65,8 @@ public:
TapeServerReporter & tsr,
MigrationWatchDog & mwd,
const VolumeInfo& volInfo,
castor::log::LogContext & lc, MigrationReportPacker & repPacker,
castor::log::LogContext & lc,
MigrationReportPacker & repPacker,
castor::server::ProcessCap &capUtils,
uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush);
......@@ -103,6 +104,7 @@ private:
TapeCleaning(TapeWriteSingleThread& parent, castor::utils::Timer & timer):
m_this(parent), m_timer(timer) {}
~TapeCleaning(){
m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::CleaningUp);
// This out-of-try-catch variables allows us to record the stage of the
// process we're in, and to count the error if it occurs.
// We will not record errors for an empty string. This will allow us to
......@@ -125,6 +127,7 @@ private:
}
// in the special case of a "manual" mode tape, we should skip the unload too.
if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) {
m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Unloading);
m_this.m_drive.unloadTape();
m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread: Tape unloaded");
} else {
......@@ -135,6 +138,7 @@ private:
// In case of manual mode, this will be filtered by the rmc daemon
// (which will do nothing)
currentErrorToCount = "Error_tapeDismount";
m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Unmounting);
m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.getLibrarySlot());
m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter);
m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ?
......@@ -145,6 +149,7 @@ private:
catch(const castor::exception::Exception& ex){
// Notify something failed during the cleaning
m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Down);
castor::log::ScopedParamContainer scoped(m_this.m_logContext);
scoped.add("exception_message", ex.getMessageValue())
.add("exception_code",ex.code());
......@@ -159,6 +164,7 @@ private:
} catch (...) {
// Notify something failed during the cleaning
m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN;
m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Down);
m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape");
try {
if (currentErrorToCount.size()) {
......
......@@ -249,4 +249,14 @@
obj:/usr/lib64/libXrdCl.so.2.0.0
}
{
tape_cleaning
Memcheck:Leak
fun:_Znwm
fun:_ZN6castor4tape10tapeserver6daemon21MigrationReportPacker17reportDriveStatusEN3cta11DriveStatusE
fun:_ZN6castor4tape10tapeserver6daemon21TapeWriteSingleThread12TapeCleaningD1Ev
fun:_ZN6castor4tape10tapeserver6daemon21TapeWriteSingleThread3runEv
fun:_ZN6castor6server6Thread14pthread_runnerEPv
fun:start_thread
fun:clone
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment