From d76b890c94029132ac3d3b37aaa81b663991f732 Mon Sep 17 00:00:00 2001 From: Daniele Kruse <dkruse@cern.ch> Date: Sat, 5 Dec 2015 02:09:53 +0100 Subject: [PATCH] Now it's better... --- objectstore/DriveRegister.cpp | 109 ++++++++++++++++-- .../daemon/MigrationReportPacker.cpp | 15 +++ .../daemon/MigrationReportPacker.hpp | 17 ++- .../tapeserver/daemon/RecallReportPacker.cpp | 23 +++- .../tapeserver/daemon/RecallReportPacker.hpp | 4 +- .../daemon/TapeReadSingleThread.cpp | 11 +- .../daemon/TapeWriteSingleThread.cpp | 10 +- .../daemon/TapeWriteSingleThread.hpp | 8 +- tests/valgrind.suppr | 12 +- 9 files changed, 189 insertions(+), 20 deletions(-) diff --git a/objectstore/DriveRegister.cpp b/objectstore/DriveRegister.cpp index 63683fdbf1..db695565f3 100644 --- a/objectstore/DriveRegister.cpp +++ b/objectstore/DriveRegister.cpp @@ -23,6 +23,7 @@ #include "common/DriveState.hpp" #include <set> #include <json/json.h> +#include <iostream> cta::objectstore::DriveRegister::DriveRegister(const std::string & address, Backend & os): ObjectOps<serializers::DriveRegister>(os, address) { } @@ -278,16 +279,18 @@ void cta::objectstore::DriveRegister::reportDriveStatus(const std::string& drive CreationLog cl(UserIdentity(0,0), "", reportTime, "Automatic creation of " "drive on status report"); addDrive(driveName, logicalLibary, cl); - } else + } else { throw NoSuchDrive("In DriveRegister::reportDriveStatus(): No such drive"); + } // Find the drive again auto dl = m_payload.mutable_drives(); for (auto dp = dl->begin(); dp != dl->end(); dp++) { if (dp->drivename() == driveName) drive = &(*dp); } - if (!drive) + if (!drive) { throw NoSuchDrive("In DriveRegister::reportDriveStatus(): Failed to create the drive"); + } } switch (status) { case DriveStatus::Down: @@ -402,7 +405,7 @@ void cta::objectstore::DriveRegister::setDriveStarting(ReportDriveStatusInputs& drive->set_cleanupstarttime(0); drive->set_lastupdatetime(inputs.reportTime); drive->set_mounttype(serializeMountType(inputs.mountType)); - drive->set_drivestatus(serializers::DriveStatus::Mounting); + drive->set_drivestatus(serializers::DriveStatus::Starting); drive->set_currentvid(inputs.vid); drive->set_currenttapepool(inputs.tapepool); } @@ -470,19 +473,111 @@ void cta::objectstore::DriveRegister::setDriveTransfering(ReportDriveStatusInput } void cta::objectstore::DriveRegister::setDriveUnloading(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) { - throw NotImplemented(""); + if (drive->drivestatus() == serializers::DriveStatus::Unloading) { + drive->set_lastupdatetime(inputs.reportTime); + return; + } + // If we are changing state, then all should be reset. We are not supposed to + // know the direction yet. + drive->set_logicallibrary(inputs.logicalLibary); + drive->set_sessionid(inputs.mountSessionId); + drive->set_bytestransferedinsession(0); + drive->set_filestransferedinsession(0); + drive->set_latestbandwidth(0); + drive->set_sessionstarttime(0); + drive->set_mountstarttime(0); + drive->set_transferstarttime(0); + drive->set_unloadstarttime(inputs.reportTime); + drive->set_unmountstarttime(0); + drive->set_drainingstarttime(0); + drive->set_downorupstarttime(0); + drive->set_cleanupstarttime(0); + drive->set_lastupdatetime(inputs.reportTime); + drive->set_mounttype(serializeMountType(inputs.mountType)); + drive->set_drivestatus(serializers::DriveStatus::Unloading); + drive->set_currentvid(inputs.vid); + drive->set_currenttapepool(inputs.tapepool); } void cta::objectstore::DriveRegister::setDriveUnmounting(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) { - throw NotImplemented(""); + if (drive->drivestatus() == serializers::DriveStatus::Unmounting) { + drive->set_lastupdatetime(inputs.reportTime); + return; + } + // If we are changing state, then all should be reset. We are not supposed to + // know the direction yet. + drive->set_logicallibrary(inputs.logicalLibary); + drive->set_sessionid(inputs.mountSessionId); + drive->set_bytestransferedinsession(0); + drive->set_filestransferedinsession(0); + drive->set_latestbandwidth(0); + drive->set_sessionstarttime(0); + drive->set_mountstarttime(0); + drive->set_transferstarttime(0); + drive->set_unloadstarttime(0); + drive->set_unmountstarttime(inputs.reportTime); + drive->set_drainingstarttime(0); + drive->set_downorupstarttime(0); + drive->set_cleanupstarttime(0); + drive->set_lastupdatetime(inputs.reportTime); + drive->set_mounttype(serializeMountType(inputs.mountType)); + drive->set_drivestatus(serializers::DriveStatus::Unmounting); + drive->set_currentvid(inputs.vid); + drive->set_currenttapepool(inputs.tapepool); } void cta::objectstore::DriveRegister::setDriveDrainingToDisk(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) { - throw NotImplemented(""); + if (drive->drivestatus() == serializers::DriveStatus::DrainingToDisk) { + drive->set_lastupdatetime(inputs.reportTime); + return; + } + // If we are changing state, then all should be reset. We are not supposed to + // know the direction yet. + drive->set_logicallibrary(inputs.logicalLibary); + drive->set_sessionid(inputs.mountSessionId); + drive->set_bytestransferedinsession(0); + drive->set_filestransferedinsession(0); + drive->set_latestbandwidth(0); + drive->set_sessionstarttime(0); + drive->set_mountstarttime(0); + drive->set_transferstarttime(0); + drive->set_unloadstarttime(0); + drive->set_unmountstarttime(0); + drive->set_drainingstarttime(inputs.reportTime); + drive->set_downorupstarttime(0); + drive->set_cleanupstarttime(0); + drive->set_lastupdatetime(inputs.reportTime); + drive->set_mounttype(serializeMountType(inputs.mountType)); + drive->set_drivestatus(serializers::DriveStatus::DrainingToDisk); + drive->set_currentvid(inputs.vid); + drive->set_currenttapepool(inputs.tapepool); } void cta::objectstore::DriveRegister::setDriveCleaningUp(ReportDriveStatusInputs& inputs, serializers::DriveState* drive) { - throw NotImplemented(""); + if (drive->drivestatus() == serializers::DriveStatus::CleaningUp) { + drive->set_lastupdatetime(inputs.reportTime); + return; + } + // If we are changing state, then all should be reset. We are not supposed to + // know the direction yet. + drive->set_logicallibrary(inputs.logicalLibary); + drive->set_sessionid(inputs.mountSessionId); + drive->set_bytestransferedinsession(0); + drive->set_filestransferedinsession(0); + drive->set_latestbandwidth(0); + drive->set_sessionstarttime(0); + drive->set_mountstarttime(0); + drive->set_transferstarttime(0); + drive->set_unloadstarttime(0); + drive->set_unmountstarttime(0); + drive->set_drainingstarttime(0); + drive->set_downorupstarttime(0); + drive->set_cleanupstarttime(inputs.reportTime); + drive->set_lastupdatetime(inputs.reportTime); + drive->set_mounttype(serializeMountType(inputs.mountType)); + drive->set_drivestatus(serializers::DriveStatus::CleaningUp); + drive->set_currentvid(inputs.vid); + drive->set_currenttapepool(inputs.tapepool); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index 9a7b1bcb68..9081429ade 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -112,6 +112,21 @@ void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& rep reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob)); } +//------------------------------------------------------------------------------ +//reportDriveStatus +//------------------------------------------------------------------------------ +void MigrationReportPacker::reportDriveStatus(cta::DriveStatus status) { + castor::server::MutexLocker ml(&m_producterProtection); + m_fifo.push(new ReportDriveStatus(status)); +} + +//------------------------------------------------------------------------------ +//ReportDriveStatus::execute +//------------------------------------------------------------------------------ +void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent){ + parent.m_archiveMount->setDriveStatus(m_status); +} + //------------------------------------------------------------------------------ //ReportFlush::execute //------------------------------------------------------------------------------ diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp index 54002d4f25..0ac65c4911 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp @@ -70,7 +70,14 @@ public: * @param compressStats * */ - virtual void reportFlush(drive::compressionStats compressStats); + virtual void reportFlush(drive::compressionStats compressStats); + + /** + * Report the drive state and set it in the central drive register. This + * function is to be used by the tape thread when running. + * @param state the new drive state. + */ + virtual void reportDriveStatus(cta::DriveStatus status); /** * Create into the MigrationReportPacker a report for the nominal end of session @@ -110,6 +117,14 @@ private: m_successfulArchiveJob(std::move(successfulArchiveJob)) {} virtual void execute(MigrationReportPacker& reportPacker); }; + + class ReportDriveStatus : public Report { + cta::DriveStatus m_status; + public: + ReportDriveStatus(cta::DriveStatus status): m_status(status) {} + virtual void execute(MigrationReportPacker& reportPacker); + }; + class ReportFlush : public Report { drive::compressionStats m_compressStats; diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp index 6ee620a622..10e8388dca 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp @@ -84,10 +84,11 @@ void RecallReportPacker::reportEndOfSession(){ } //------------------------------------------------------------------------------ -//setDriveStatus +//reportDriveStatus //------------------------------------------------------------------------------ -void RecallReportPacker::setDriveStatus(cta::DriveStatus status) { - m_retrieveMount->setDriveStatus(status); +void RecallReportPacker::reportDriveStatus(cta::DriveStatus status) { + castor::server::MutexLocker ml(&m_producterProtection); + m_fifo.push(new ReportDriveStatus(status)); } @@ -144,6 +145,20 @@ bool RecallReportPacker::ReportEndofSession::goingToEnd(RecallReportPacker& pack return packer.allThreadsDone(); } +//------------------------------------------------------------------------------ +//ReportDriveStatus::execute +//------------------------------------------------------------------------------ +void RecallReportPacker::ReportDriveStatus::execute(RecallReportPacker& parent){ + parent.m_retrieveMount->setDriveStatus(m_status); +} + +//------------------------------------------------------------------------------ +//ReportDriveStatus::goingToEnd +//------------------------------------------------------------------------------ +bool RecallReportPacker::ReportDriveStatus::goingToEnd(RecallReportPacker& packer) { + return false; +} + //------------------------------------------------------------------------------ //ReportEndofSessionWithErrors::execute //------------------------------------------------------------------------------ @@ -200,7 +215,7 @@ void RecallReportPacker::WorkerThread::run(){ bool endFound = false; try{ while(1) { - std::unique_ptr<Report> rep(m_parent.m_fifo.pop()); + std::unique_ptr<Report> rep(m_parent.m_fifo.pop()); rep->execute(m_parent); if(rep->goingToEnd(m_parent)) { diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp index 29071c8184..648587946a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp @@ -81,7 +81,7 @@ public: * function is to be used by the tape thread when running. * @param state the new drive state. */ - virtual void setDriveStatus(cta::DriveStatus status); + virtual void reportDriveStatus(cta::DriveStatus status); /** * Flag disk thread as done. @@ -149,6 +149,8 @@ private: cta::DriveStatus m_status; public: ReportDriveStatus(cta::DriveStatus status): m_status(status) {} + virtual void execute(RecallReportPacker& reportPacker); + virtual bool goingToEnd(RecallReportPacker& packer); }; class ReportEndofSession : public Report { diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp index b2b310622b..6fe47dce70 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeReadSingleThread.cpp @@ -45,6 +45,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeReadSingleThread( //TapeCleaning::~TapeCleaning() //------------------------------------------------------------------------------ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeCleaning() { + m_this.m_rrp.reportDriveStatus(cta::DriveStatus::CleaningUp); // Tell everyone to wrap up the session // We now acknowledge to the task injector that read reached the end. There // will hence be no more requests for more. @@ -73,7 +74,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean goto done; } // in the special case of a "manual" mode tape, we should skip the unload too. - if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) { + if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) { + m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Unloading); m_this.m_drive.unloadTape(); m_this.m_logContext.log(LOG_INFO, "TapeReadSingleThread: Tape unloaded"); } else { @@ -83,7 +85,8 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean // And return the tape to the library // In case of manual mode, this will be filtered by the rmc daemon // (which will do nothing) - currentErrorToCount = "Error_tapeDismount"; + currentErrorToCount = "Error_tapeDismount"; + m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Unmounting); m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.getLibrarySlot()); m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter); m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ? @@ -93,6 +96,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean } catch(const castor::exception::Exception& ex){ // Something failed during the cleaning m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; + m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Down); castor::log::ScopedParamContainer scoped(m_this.m_logContext); scoped.add("exception_message", ex.getMessageValue()) .add("exception_code",ex.code()); @@ -105,6 +109,7 @@ castor::tape::tapeserver::daemon::TapeReadSingleThread::TapeCleaning::~TapeClean } catch (...) { // Notify something failed during the cleaning m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; + m_this.m_rrp.reportDriveStatus(cta::DriveStatus::Down); m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeReadSingleThread-TapeCleaning when unmounting the tape"); try { if (currentErrorToCount.size()) { @@ -198,6 +203,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { TapeCleaning tapeCleaner(*this, timer); // Before anything, the tape should be mounted currentErrorToCount = "Error_tapeMountForRead"; + m_rrp.reportDriveStatus(cta::DriveStatus::Mounting); mountTapeReadOnly(); currentErrorToCount = "Error_tapeLoad"; waitForDrive(); @@ -226,6 +232,7 @@ void castor::tape::tapeserver::daemon::TapeReadSingleThread::run() { // Then we will loop on the tasks as they get from // the task injector std::unique_ptr<TapeReadTask> task; + m_rrp.reportDriveStatus(cta::DriveStatus::Transfering); while(true) { //get a task task.reset(popAndRequestMoreJobs()); diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp index 1b3a03be26..c0a3701e60 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.cpp @@ -33,13 +33,15 @@ castor::tape::tapeserver::drive::DriveInterface & drive, TapeServerReporter & tsr, MigrationWatchDog & mwd, const VolumeInfo& volInfo, - castor::log::LogContext & lc, MigrationReportPacker & repPacker, + castor::log::LogContext & lc, + MigrationReportPacker & repPacker, castor::server::ProcessCap &capUtils, uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush): TapeSingleThreadInterface<TapeWriteTask>(drive, mc, tsr, volInfo,capUtils, lc), m_filesBeforeFlush(filesBeforeFlush), m_bytesBeforeFlush(bytesBeforeFlush), - m_drive(drive), m_reportPacker(repPacker), + m_drive(drive), + m_reportPacker(repPacker), m_lastFseq(-1), m_compress(true), m_watchdog(mwd){} @@ -168,6 +170,7 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { // TapeCleaning cleaner(*this, timer); currentErrorToCount = "Error_tapeMountForWrite"; + m_reportPacker.reportDriveStatus(cta::DriveStatus::Mounting); // Before anything, the tape should be mounted // This call does the logging of the mount mountTapeReadWrite(); @@ -202,7 +205,8 @@ void castor::tape::tapeserver::daemon::TapeWriteSingleThread::run() { m_stats.waitReportingTime += timer.secs(castor::utils::Timer::resetCounter); // Tasks handle their error logging themselves. currentErrorToCount = ""; - std::unique_ptr<TapeWriteTask> task; + std::unique_ptr<TapeWriteTask> task; + m_reportPacker.reportDriveStatus(cta::DriveStatus::Transfering); while(1) { //get a task task.reset(m_tasks.pop()); diff --git a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp index 352a51233c..dec5537180 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/TapeWriteSingleThread.hpp @@ -65,7 +65,8 @@ public: TapeServerReporter & tsr, MigrationWatchDog & mwd, const VolumeInfo& volInfo, - castor::log::LogContext & lc, MigrationReportPacker & repPacker, + castor::log::LogContext & lc, + MigrationReportPacker & repPacker, castor::server::ProcessCap &capUtils, uint64_t filesBeforeFlush, uint64_t bytesBeforeFlush); @@ -103,6 +104,7 @@ private: TapeCleaning(TapeWriteSingleThread& parent, castor::utils::Timer & timer): m_this(parent), m_timer(timer) {} ~TapeCleaning(){ + m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::CleaningUp); // This out-of-try-catch variables allows us to record the stage of the // process we're in, and to count the error if it occurs. // We will not record errors for an empty string. This will allow us to @@ -125,6 +127,7 @@ private: } // in the special case of a "manual" mode tape, we should skip the unload too. if (mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType()) { + m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Unloading); m_this.m_drive.unloadTape(); m_this.m_logContext.log(LOG_INFO, "TapeWriteSingleThread: Tape unloaded"); } else { @@ -135,6 +138,7 @@ private: // In case of manual mode, this will be filtered by the rmc daemon // (which will do nothing) currentErrorToCount = "Error_tapeDismount"; + m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Unmounting); m_this.m_mc.dismountTape(m_this.m_volInfo.vid, m_this.m_drive.config.getLibrarySlot()); m_this.m_stats.unmountTime += m_timer.secs(castor::utils::Timer::resetCounter); m_this.m_logContext.log(LOG_INFO, mediachanger::TAPE_LIBRARY_TYPE_MANUAL != m_this.m_drive.config.getLibrarySlot().getLibraryType() ? @@ -145,6 +149,7 @@ private: catch(const castor::exception::Exception& ex){ // Notify something failed during the cleaning m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; + m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Down); castor::log::ScopedParamContainer scoped(m_this.m_logContext); scoped.add("exception_message", ex.getMessageValue()) .add("exception_code",ex.code()); @@ -159,6 +164,7 @@ private: } catch (...) { // Notify something failed during the cleaning m_this.m_hardwareStatus = Session::MARK_DRIVE_AS_DOWN; + m_this.m_reportPacker.reportDriveStatus(cta::DriveStatus::Down); m_this.m_logContext.log(LOG_ERR, "Non-Castor exception in TapeWriteSingleThread-TapeCleaning when unmounting the tape"); try { if (currentErrorToCount.size()) { diff --git a/tests/valgrind.suppr b/tests/valgrind.suppr index 611635aee9..3f9b60f89f 100644 --- a/tests/valgrind.suppr +++ b/tests/valgrind.suppr @@ -249,4 +249,14 @@ obj:/usr/lib64/libXrdCl.so.2.0.0 } - +{ + tape_cleaning + Memcheck:Leak + fun:_Znwm + fun:_ZN6castor4tape10tapeserver6daemon21MigrationReportPacker17reportDriveStatusEN3cta11DriveStatusE + fun:_ZN6castor4tape10tapeserver6daemon21TapeWriteSingleThread12TapeCleaningD1Ev + fun:_ZN6castor4tape10tapeserver6daemon21TapeWriteSingleThread3runEv + fun:_ZN6castor6server6Thread14pthread_runnerEPv + fun:start_thread + fun:clone +} -- GitLab