Commit af41f663 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for log parameter processing in parent and child process of taped.

parent 3b21a6f2
......@@ -247,34 +247,43 @@ SubprocessHandler::ProcessingStatus DriveHandler::processEvent() {
try {
serializers::WatchdogMessage message;
message.ParseFromString(m_socketPair->receive());
// Logs are processed in all cases
processLogs(message);
switch((SessionState)message.sessionstate()) {
case SessionState::StartingUp:
return processStartingUp(message);
case SessionState::Checking:
return processChecking(message);
case SessionState::Scheduling:
return processScheduling(message);
case SessionState::Mounting:
return processMounting(message);
case SessionState::Running:
return processRunning(message);
case SessionState::Unmounting:
return processUnmounting(message);
case SessionState::DrainingToDisk:
return processDrainingToDisk(message);
case SessionState::ShutingDown:
return processShutingDown(message);
case SessionState::Fatal:
return processFatal(message);
default:
{
exception::Exception ex;
ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:"
<< session::toString((SessionState)message.sessionstate());
throw ex;
// If we report bytes, process the report
if (message.reportingbytes()) {
processBytes(message);
}
// If we report a state change, process it (last as this can change the return value)
if (message.reportingstate()) {
switch((SessionState)message.sessionstate()) {
case SessionState::StartingUp:
return processStartingUp(message);
case SessionState::Checking:
return processChecking(message);
case SessionState::Scheduling:
return processScheduling(message);
case SessionState::Mounting:
return processMounting(message);
case SessionState::Running:
return processRunning(message);
case SessionState::Unmounting:
return processUnmounting(message);
case SessionState::DrainingToDisk:
return processDrainingToDisk(message);
case SessionState::ShutingDown:
return processShutingDown(message);
case SessionState::Fatal:
return processFatal(message);
default:
{
exception::Exception ex;
ex.getMessage() << "In DriveHandler::processEvent(): unexpected session state:"
<< session::toString((SessionState)message.sessionstate());
throw ex;
}
}
}
return m_processingStatus;
} catch(cta::server::SocketPair::PeerDisconnected & ex) {
// The peer disconnected: close the socket pair and remove it from the epoll list.
if (m_socketPair.get()) {
......@@ -593,6 +602,13 @@ void DriveHandler::processLogs(serializers::WatchdogMessage& message) {
}
}
//------------------------------------------------------------------------------
// DriveHandler::processBytes
//------------------------------------------------------------------------------
void DriveHandler::processBytes(serializers::WatchdogMessage& message) {
throw cta::exception::Exception("In DriveHandler::processBytes(): not implemented");
}
//------------------------------------------------------------------------------
// DriveHandler::processSigChild
//------------------------------------------------------------------------------
......
......@@ -122,6 +122,8 @@ private:
std::unique_ptr<cta::server::SocketPair> m_socketPair;
/** Helper function accumulating logs */
void processLogs(serializers::WatchdogMessage & message);
/** Helper function accumulating bytes transferred */
void processBytes(serializers::WatchdogMessage & message);
};
// TODO: remove/merge ChildProcess.
......
......@@ -25,14 +25,33 @@ DriveHandlerProxy::DriveHandlerProxy(server::SocketPair& socketPair): m_socketPa
m_socketPair.close(server::SocketPair::Side::parent);
}
// TODO: me might want to group the messages to reduce the rate.
void DriveHandlerProxy::addLogParams(const std::string& unitName, const std::list<cta::log::Param>& params) {
throw cta::exception::Exception("In DriveHandlerProxy::addLogParams(): not implemented");
// TODO
serializers::WatchdogMessage watchdogMessage;
watchdogMessage.set_reportingstate(false);
watchdogMessage.set_reportingbytes(false);
for (auto & p: params) {
auto * lp = watchdogMessage.mutable_addedlogparams()->Add();
lp->set_name(p.getName());
lp->set_value(p.getValue());
}
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
m_socketPair.send(buffer);
}
void DriveHandlerProxy::deleteLogParams(const std::string& unitName, const std::list<std::string>& paramNames) {
// TODO
throw cta::exception::Exception("In DriveHandlerProxy::deleteLogParams(): not implemented");
serializers::WatchdogMessage watchdogMessage;
watchdogMessage.set_reportingstate(false);
watchdogMessage.set_reportingbytes(false);
for (auto &pn: paramNames) {
auto * lpn = watchdogMessage.mutable_deletedlogparams()->Add();
*lpn = pn;
}
std::string buffer;
watchdogMessage.SerializeToString(&buffer);
m_socketPair.send(buffer);
}
void DriveHandlerProxy::labelError(const std::string& unitName, const std::string& message) {
......@@ -47,6 +66,8 @@ void DriveHandlerProxy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t t
void DriveHandlerProxy::reportState(const cta::tape::session::SessionState state, const cta::tape::session::SessionType type, const std::string& vid) {
serializers::WatchdogMessage watchdogMessage;
watchdogMessage.set_reportingstate(true);
watchdogMessage.set_reportingbytes(false);
watchdogMessage.set_totaldiskbytesmoved(0);
watchdogMessage.set_totaltapebytesmoved(0);
watchdogMessage.set_sessionstate((uint32_t)state);
......
......@@ -26,10 +26,12 @@ message LogParam {
}
message WatchdogMessage {
required uint32 sessionstate = 20010;
required uint32 sessiontype = 20011;
optional uint64 totaldiskbytesmoved = 20012;
optional uint64 totaltapebytesmoved = 20013;
repeated LogParam addedlogparams = 20014;
repeated string deletedlogparams = 20015;
required bool reportingstate = 20010;
required uint32 sessionstate = 20011;
required uint32 sessiontype = 20012;
required bool reportingbytes = 20013;
optional uint64 totaldiskbytesmoved = 20014;
optional uint64 totaltapebytesmoved = 20015;
repeated LogParam addedlogparams = 20017;
repeated string deletedlogparams = 20018;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment