diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index b6f7e08d00cef9e497e1628573b54926ee6b0bb1..e975e0cfaffea09709ed1402d9c131b034a32886 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -247,34 +247,43 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() { try { serializers::WatchdogMessage message; message.ParseFromString(m_socketPair->receive()); + // Logs are processed in all cases processLogs(message); - switch((SessionState)message.sessionstate()) { - case SessionState::StartingUp: - return processStartingUp(message); - case SessionState::Checking: - return processChecking(message); - case SessionState::Scheduling: - return processScheduling(message); - case SessionState::Mounting: - return processMounting(message); - case SessionState::Running: - return processRunning(message); - case SessionState::Unmounting: - return processUnmounting(message); - case SessionState::DrainingToDisk: - return processDrainingToDisk(message); - case SessionState::ShutingDown: - return processShutingDown(message); - case SessionState::Fatal: - return processFatal(message); - default: - { - exception::Exception ex; - ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:" - << session::toString((SessionState)message.sessionstate()); - throw ex; + // If we report bytes, process the report + if (message.reportingbytes()) { + processBytes(message); + } + // If we report a state change, process it (last as this can change the return value) + if (message.reportingstate()) { + switch((SessionState)message.sessionstate()) { + case SessionState::StartingUp: + return processStartingUp(message); + case SessionState::Checking: + return processChecking(message); + case SessionState::Scheduling: + return processScheduling(message); + case SessionState::Mounting: + return processMounting(message); + case SessionState::Running: + return processRunning(message); + case SessionState::Unmounting: + return processUnmounting(message); + case SessionState::DrainingToDisk: + return processDrainingToDisk(message); + case SessionState::ShutingDown: + return processShutingDown(message); + case SessionState::Fatal: + return processFatal(message); + default: + { + exception::Exception ex; + ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:" + << session::toString((SessionState)message.sessionstate()); + throw ex; + } } } + return m_processingStatus; } catch(cta::server::SocketPair::PeerDisconnected & ex) { // The peer disconnected: close the socket pair and remove it from the epoll list. if (m_socketPair.get()) { @@ -593,6 +602,13 @@ void DriveHandler::processLogs(serializers::WatchdogMessage& message) { } } +//------------------------------------------------------------------------------ +// DriveHandler::processBytes +//------------------------------------------------------------------------------ +void DriveHandler::processBytes(serializers::WatchdogMessage& message) { + throw cta::exception::Exception("In DriveHandler::processBytes(): not implemented"); +} + //------------------------------------------------------------------------------ // DriveHandler::processSigChild //------------------------------------------------------------------------------ diff --git a/tapeserver/daemon/DriveHandler.hpp b/tapeserver/daemon/DriveHandler.hpp index 93c0c02144d39c36ef42b2a590787a12c918a243..a57c576580aab871480f166ba7dbfc52522e8cbc 100644 --- a/tapeserver/daemon/DriveHandler.hpp +++ b/tapeserver/daemon/DriveHandler.hpp @@ -122,6 +122,8 @@ private: std::unique_ptr<cta::server::SocketPair> m_socketPair; /** Helper function accumulating logs */ void processLogs(serializers::WatchdogMessage & message); + /** Helper function accumulating bytes transferred */ + void processBytes(serializers::WatchdogMessage & message); }; // TODO: remove/merge ChildProcess. diff --git a/tapeserver/daemon/DriveHandlerProxy.cpp b/tapeserver/daemon/DriveHandlerProxy.cpp index 939a53ba41d9479ac9ae86d759dde2b36caaf082..66ba3f648924e84de0efad4de022d12f7e59279f 100644 --- a/tapeserver/daemon/DriveHandlerProxy.cpp +++ b/tapeserver/daemon/DriveHandlerProxy.cpp @@ -25,14 +25,33 @@ DriveHandlerProxy::DriveHandlerProxy(server::SocketPair& socketPair): m_socketPa m_socketPair.close(server::SocketPair::Side::parent); } +// TODO: me might want to group the messages to reduce the rate. + void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::list<cta::log::Param>& params) { - throw cta::exception::Exception("In DriveHandlerProxy::addLogParams(): not implemented"); - // TODO + serializers::WatchdogMessage watchdogMessage; + watchdogMessage.set_reportingstate(false); + watchdogMessage.set_reportingbytes(false); + for (auto & p: params) { + auto * lp = watchdogMessage.mutable_addedlogparams()->Add(); + lp->set_name(p.getName()); + lp->set_value(p.getValue()); + } + std::string buffer; + watchdogMessage.SerializeToString(&buffer); + m_socketPair.send(buffer); } void DriveHandlerProxy::deleteLogParams(const std::string& unitName, const std::list<std::string>& paramNames) { - // TODO - throw cta::exception::Exception("In DriveHandlerProxy::deleteLogParams(): not implemented"); + serializers::WatchdogMessage watchdogMessage; + watchdogMessage.set_reportingstate(false); + watchdogMessage.set_reportingbytes(false); + for (auto &pn: paramNames) { + auto * lpn = watchdogMessage.mutable_deletedlogparams()->Add(); + *lpn = pn; + } + std::string buffer; + watchdogMessage.SerializeToString(&buffer); + m_socketPair.send(buffer); } void DriveHandlerProxy::labelError(const std::string& unitName, const std::string& message) { @@ -47,6 +66,8 @@ void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t t void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state, const cta::tape::session::SessionType type, const std::string& vid) { serializers::WatchdogMessage watchdogMessage; + watchdogMessage.set_reportingstate(true); + watchdogMessage.set_reportingbytes(false); watchdogMessage.set_totaldiskbytesmoved(0); watchdogMessage.set_totaltapebytesmoved(0); watchdogMessage.set_sessionstate((uint32_t)state); diff --git a/tapeserver/daemon/WatchdogMessage.proto b/tapeserver/daemon/WatchdogMessage.proto index 3a0d681479f21b78192ebe72424750e999372679..92c970c785211cc31d80c477d6ac24f79f944975 100644 --- a/tapeserver/daemon/WatchdogMessage.proto +++ b/tapeserver/daemon/WatchdogMessage.proto @@ -26,10 +26,12 @@ message LogParam { } message WatchdogMessage { - required uint32 sessionstate = 20010; - required uint32 sessiontype = 20011; - optional uint64 totaldiskbytesmoved = 20012; - optional uint64 totaltapebytesmoved = 20013; - repeated LogParam addedlogparams = 20014; - repeated string deletedlogparams = 20015; + required bool reportingstate = 20010; + required uint32 sessionstate = 20011; + required uint32 sessiontype = 20012; + required bool reportingbytes = 20013; + optional uint64 totaldiskbytesmoved = 20014; + optional uint64 totaltapebytesmoved = 20015; + repeated LogParam addedlogparams = 20017; + repeated string deletedlogparams = 20018; }