Commit 6357c57e authored by Steven Murray's avatar Steven Murray
Browse files

Moved cleaner from tapeserverd to ProcessForker

PLEASE NOTE that the cleaner is not yet launched correctly.  I will
fix this in a future commit.

With this commit the TapeDaemon process no longer forks a session.
All sessions (CleanerSession, DataTransferSession and LabelSession)
have  been moved to the ProcessForker.
parent cf28b768
......@@ -21,6 +21,17 @@
package castor.messages;
message ForkCleaner {
// Description of the tape drive
required string unitname = 1;
required string vid = 2;
required string dgn = 2;
required string devfilename = 3;
repeated string density = 4;
required string libraryslot = 5;
required string devtype = 6;
// Description of the tape associated with the drive
required string vid = 7;
// Configuration parameters of the session
required uint32 rmcport = 8;
}
......@@ -35,6 +35,8 @@
#include "castor/messages/StopProcessForker.pb.h"
#include "castor/messages/TapeserverProxyZmq.hpp"
#include "castor/tape/tapeserver/daemon/Constants.hpp"
#include "castor/tape/tapeserver/daemon/CleanerSession.hpp"
#include "castor/tape/tapeserver/daemon/DataTransferSession.hpp"
#include "castor/tape/tapeserver/daemon/LabelSession.hpp"
#include "castor/tape/tapeserver/daemon/ProcessForker.hpp"
#include "castor/tape/tapeserver/daemon/ProcessForkerUtils.hpp"
......@@ -289,6 +291,7 @@ castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
std::list<log::Param> params;
params.push_back(log::Param("unitName", rqst.unitname()));
params.push_back(log::Param("vid", rqst.vid()));
params.push_back(log::Param("rmcPort", rqst.rmcport()));
m_log(LOG_INFO, "ProcessForker handling ForkCleaner message", params);
// Fork a label session
......@@ -314,8 +317,21 @@ castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
// Else this is the child process
} else {
closeCmdReceiverSocket();
// TO BE DONE
exit(0);
try {
exit(runCleanerSession(rqst));
} catch(castor::exception::Exception &ne) {
log::Param params[] = {log::Param("message", ne.getMessage().str())};
m_log(LOG_ERR, "Failed to run cleaner session", params);
} catch(std::exception &ne) {
log::Param params[] = {log::Param("message", ne.what())};
m_log(LOG_ERR, "Failed to run cleaner session", params);
} catch(...) {
log::Param params[] = {log::Param("message",
"Caught an unknown exception")};
m_log(LOG_ERR, "Failed to run cleaner session", params);
}
exit(1);
}
}
......@@ -333,6 +349,7 @@ castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
// Log the contents of the incomming request
std::list<log::Param> params;
params.push_back(log::Param("unitName", rqst.unitname()));
params.push_back(log::Param("rmcPort", rqst.rmcport()));
m_log(LOG_INFO, "ProcessForker handling ForkDataTransfer message", params);
// Fork a data-transfer session
......@@ -385,6 +402,7 @@ castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
std::list<log::Param> params;
params.push_back(log::Param("unitName", rqst.unitname()));
params.push_back(log::Param("vid", rqst.vid()));
params.push_back(log::Param("rmcPort", rqst.rmcport()));
m_log(LOG_INFO, "ProcessForker handling ForkLabel message", params);
// Fork a label session
......@@ -444,6 +462,45 @@ castor::tape::tapeserver::daemon::ProcessForker::MsgHandlerResult
return createReturnValueResult(0, false);
}
//------------------------------------------------------------------------------
// runCleanerSession
//------------------------------------------------------------------------------
int castor::tape::tapeserver::daemon::ProcessForker::runCleanerSession(
const messages::ForkCleaner &rqst) {
try {
const utils::DriveConfig driveConfig = getDriveConfig(rqst);
std::list<log::Param> params;
params.push_back(log::Param("unitName", driveConfig.unitName));
params.push_back(log::Param("vid", rqst.vid()));
params.push_back(log::Param("rmcPort", rqst.rmcport()));
m_log(LOG_INFO, "Cleaner-session child-process started", params);
const int netTimeout = 10; // Timeout in seconds
legacymsg::RmcProxyTcpIp rmc(m_log, rqst.rmcport(), netTimeout);
castor::tape::System::realWrapper sWrapper;
CleanerSession cleanerSession(
rmc,
m_log,
driveConfig,
sWrapper);
// clean() returns 0 if drive should be put up or 1 if it should be put down
return cleanerSession.clean(rqst.vid());
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to run cleaner session: " << ne.getMessage().str();
throw ex;
} catch(std::exception &se) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to run cleaner session: " << se.what();
throw ex;
} catch(...) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to run cleaner session"
": Caught an unknown exception";
throw ex;
}
}
//------------------------------------------------------------------------------
// runDataTransferSession
//------------------------------------------------------------------------------
......@@ -776,6 +833,7 @@ int castor::tape::tapeserver::daemon::ProcessForker::runLabelSession(
std::list<log::Param> params;
params.push_back(log::Param("unitName", driveConfig.unitName));
params.push_back(log::Param("vid", labelJob.vid));
params.push_back(log::Param("rmcPort", rqst.rmcport()));
m_log(LOG_INFO, "Label-session child-process started", params);
const int netTimeout = 10; // Timeout in seconds
......
......@@ -25,6 +25,7 @@
#include "castor/legacymsg/TapeLabelRqstMsgBody.hpp"
#include "castor/log/Logger.hpp"
#include "castor/messages/ForkCleaner.pb.h"
#include "castor/messages/ForkDataTransfer.pb.h"
#include "castor/messages/ForkLabel.pb.h"
#include "castor/tape/tapeserver/daemon/DataTransferSession.hpp"
......@@ -217,6 +218,15 @@ private:
*/
MsgHandlerResult handleForkCleanerMsg(const ProcessForkerFrame &frame);
/**
* Runs a cleaner session. This method is to be called within the child
* process responsible for running the cleaner session.
*
* @param rqst The ForkCleaner message.
* @return The value to be used when exiting the child process.
*/
int runCleanerSession(const messages::ForkCleaner &rqst);
/**
* Runs a data-transfer session. This method is to be called within the
* child process responsible for running the data-transfer session.
......
......@@ -82,15 +82,16 @@ public:
/**
* Forks a cleaner session for the specified tape drive.
*
* @param unitName The unit name of the tape drive.
* @param driveConfig The configuration of the tape drive.
* @param vid If known then this string specifies the volume identifier of the
* tape in the drive if there is in fact a tape in the drive and its volume
* identifier is known. If the volume identifier is not known then this
* parameter should be set to an empty string.
* @param rmcPort The TCP/IP port on which the rmcd daemon is listening.
* @return The process identifier of the newly forked session.
*/
virtual pid_t forkCleaner(const std::string &unitName,
const std::string &vid) = 0;
virtual pid_t forkCleaner(const utils::DriveConfig &driveConfig,
const std::string &vid, const unsigned short rmcPort) = 0;
}; // class ProcessForkerProxy
......
......@@ -209,13 +209,13 @@ castor::messages::ForkLabel castor::tape::tapeserver::daemon::
// forkCleaner
//------------------------------------------------------------------------------
pid_t castor::tape::tapeserver::daemon::ProcessForkerProxySocket::
forkCleaner(const std::string &unitName, const std::string &vid) {
forkCleaner(const utils::DriveConfig &driveConfig, const std::string &vid,
const unsigned short rmcPort) {
// Request the process forker to fork a label session
messages::ForkCleaner msg;
msg.set_unitname(unitName);
msg.set_vid(vid);
ProcessForkerUtils::writeFrame(m_socketFd, msg);
const messages::ForkCleaner rqst = createForkCleanerMsg(driveConfig, vid,
rmcPort);
ProcessForkerUtils::writeFrame(m_socketFd, rqst);
// Read back the reply
messages::ForkSucceeded reply;
......@@ -226,3 +226,21 @@ pid_t castor::tape::tapeserver::daemon::ProcessForkerProxySocket::
return reply.pid();
}
//------------------------------------------------------------------------------
// createForkCleanerMsg
//------------------------------------------------------------------------------
castor::messages::ForkCleaner castor::tape::tapeserver::daemon::
ProcessForkerProxySocket::createForkCleanerMsg(
const utils::DriveConfig &driveConfig, const std::string &vid,
const unsigned short rmcPort) {
messages::ForkCleaner msg;
// Description of the tape drive
fillMsgWithDriveConfig(msg, driveConfig);
msg.set_vid(vid);
msg.set_rmcport(rmcPort);
return msg;
}
......@@ -102,15 +102,17 @@ public:
/**
* Forks a cleaner session for the specified tape drive.
*
* @param unitName The unit name of the tape drive.
*
* @param driveConfig The configuration of the tape drive.
* @param vid If known then this string specifies the volume identifier of the
* tape in the drive if there is in fact a tape in the drive and its volume
* identifier is known. If the volume identifier is not known then this
* parameter should be set to an empty string.
* @param rmcPort The TCP/IP port on which the rmcd daemon is listening.
* @return The process identifier of the newly forked session.
*/
pid_t forkCleaner(const std::string &unitName, const std::string &vid);
pid_t forkCleaner(const utils::DriveConfig &driveConfig,
const std::string &vid, const unsigned short rmcPort);
private:
......@@ -203,6 +205,20 @@ private:
msg.set_vid(labelJob.vid);
}
/**
* Creates a ForkDataLabel message from the specified tape-drive
* configuration and label job.
*
* @param driveConfig The configuration of the tape drive.
* @param vid The volume identifier of the tape associated with the tape
* drive.
* @param rmcPort The TCP/IP port on which the rmcd daemon is listening.
* @return The message.
*/
messages::ForkCleaner createForkCleanerMsg(
const utils::DriveConfig &driveConfig, const std::string &vid,
const unsigned short rmcPort);
}; // class ProcessForkerProxySocket
} // namespace daemon
......
......@@ -928,7 +928,8 @@ void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedProcess(
if(pid == m_processForkerPid) {
handleReapedProcessForker(pid, waitpidStat);
} else {
handleReapedSession(pid, waitpidStat);
log::Param params[] = {log::Param("pid", pid)};
m_log(LOG_ERR, "Reaped process was unknown", params);
}
}
......@@ -943,22 +944,6 @@ void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedProcessForker(
m_processForkerPid = 0;
}
//------------------------------------------------------------------------------
// handleReapedSession
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedSession(
const pid_t pid, const int waitpidStat) throw() {
try {
const DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(pid);
dispatchReapedSessionHandler(drive->getSessionType(), pid,
waitpidStat);
} catch(castor::exception::Exception &ne) {
log::Param params[] = {log::Param("message", ne.getMessage().str())};
m_log(LOG_ERR, "Failed to handle reaped session",
params);
}
}
//------------------------------------------------------------------------------
// logChildProcessTerminated
//------------------------------------------------------------------------------
......@@ -994,57 +979,6 @@ void castor::tape::tapeserver::daemon::TapeDaemon::logChildProcessTerminated(
m_log(LOG_INFO, "Child process terminated", params);
}
//------------------------------------------------------------------------------
// dispatchReapedSessionHandler
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::
dispatchReapedSessionHandler(
const DriveCatalogueEntry::SessionType sessionType,
const pid_t pid,
const int waitpidStat) {
switch(sessionType) {
case DriveCatalogueEntry::SESSION_TYPE_CLEANER:
return handleReapedCleanerSession(pid, waitpidStat);
default:
{
castor::exception::Exception ex;
ex.getMessage() << "Failed to dispatch handler for reaped session"
": Unexpected session type: sessionType=" << sessionType;
throw ex;
}
}
}
//------------------------------------------------------------------------------
// handleReapedCleanerSession
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedCleanerSession(
const pid_t pid, const int waitpidStat) {
try {
std::list<log::Param> params;
params.push_back(log::Param("cleanerPid", pid));
DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(pid);
const utils::DriveConfig &driveConfig = drive->getConfig();
if(WIFEXITED(waitpidStat) && 0 == WEXITSTATUS(waitpidStat)) {
const std::string vid = drive->getVid();
drive->sessionSucceeded();
m_log(LOG_INFO, "Cleaner session succeeded. Drive will stay UP", params);
notifyVdqmTapeUnmounted(driveConfig, vid, pid);
} else {
drive->sessionFailed();
m_log(LOG_INFO, "Cleaner session failed. Drive will go DOWN", params);
setDriveDownInVdqm(pid, drive->getConfig());
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to handle reaped cleaner session: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// requestVdqmToReleaseDrive
//------------------------------------------------------------------------------
......@@ -1116,33 +1050,6 @@ void castor::tape::tapeserver::daemon::TapeDaemon::setDriveDownInVdqm(
}
}
//------------------------------------------------------------------------------
// handleReapedLabelSession
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::handleReapedLabelSession(
const pid_t pid, const int waitpidStat) {
try {
std::list<log::Param> params;
params.push_back(log::Param("labelPid", pid));
DriveCatalogueEntry *const drive = m_driveCatalogue.findDrive(pid);
if(WIFEXITED(waitpidStat) && 0 == WEXITSTATUS(waitpidStat)) {
drive->sessionSucceeded();
m_log(LOG_INFO, "Label session succeeded", params);
} else {
drive->sessionFailed();
m_log(LOG_INFO, "Label session failed", params);
setDriveDownInVdqm(pid, drive->getConfig());
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to handle reaped label session: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// forkDataTransferSessions
//------------------------------------------------------------------------------
......@@ -1305,62 +1212,9 @@ void castor::tape::tapeserver::daemon::TapeDaemon::forkCleanerSession(
params.push_back(log::Param("unitName", driveConfig.unitName));
std::string vid = "";
m_processForker->forkCleaner(driveConfig.unitName, vid);
m_log.prepareForFork();
const pid_t forkRc = fork();
// If fork failed
if(0 > forkRc) {
// Log an error message and return
char message[100];
sstrerror_r(errno, message, sizeof(message));
params.push_back(log::Param("message", message));
m_log(LOG_ERR, "Failed to fork cleaner session", params);
// Else if this is the parent process
} else if(0 < forkRc) {
drive->forkedCleanerSession(forkRc);
} else { // Else this is the child process
// Clear the reactor which in turn will close all of the open
// file-descriptors owned by the event handlers
//m_reactor.clear();
runCleanerSession(drive, vid);
}
}
//------------------------------------------------------------------------------
// runCleanerSession
//------------------------------------------------------------------------------
void castor::tape::tapeserver::daemon::TapeDaemon::runCleanerSession(
DriveCatalogueEntry *drive, const std::string &vid) throw() {
const utils::DriveConfig &driveConfig = drive->getConfig();
std::list<log::Param> params;
params.push_back(log::Param("unitName", driveConfig.unitName));
m_log(LOG_INFO, "Cleaner-session child-process started", params);
const unsigned short rmcPort =
common::CastorConfiguration::getConfig().getConfEntInt(
"RMC", "PORT", (unsigned short)RMC_PORT, &m_log);
try {
std::auto_ptr<legacymsg::RmcProxy> rmc(m_rmcFactory.create());
castor::tape::System::realWrapper sWrapper;
castor::tape::tapeserver::daemon::CleanerSession cleanerSession(
*(rmc.get()),
m_log,
driveConfig,
sWrapper);
exit(cleanerSession.clean(vid)); //clean() returns 0 if drive should be put up or 1 if it should be put down
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Aborting cleaner session. Reason: " << ne.getMessage().str();
m_log(LOG_ERR, ex.getMessage().str());
} catch(std::exception &se) {
params.push_back(log::Param("message", se.what()));
m_log(LOG_ERR, "Aborting cleaner session: Caught an unexpected exception",
params);
} catch(...) {
m_log(LOG_ERR,
"Aborting cleaner session: Caught an unexpected and unknown exception");
}
exit(1);
m_processForker->forkCleaner(driveConfig, vid, rmcPort);
}
......@@ -397,14 +397,6 @@ protected:
void handleReapedProcessForker(const pid_t pid, const int waitpidStat)
throw();
/**
* Handles the specified reaped session.
*
* @param pid The process ID of the child process.
* @param waitpidStat The status information given by a call to waitpid().
*/
void handleReapedSession(const pid_t pid, const int waitpidStat) throw();
/**
* Logs the fact that the specified child process has terminated.
*
......@@ -414,28 +406,6 @@ protected:
void logChildProcessTerminated(const pid_t pid, const int waitpidStat)
throw();
/**
* Dispatches the appropriate post-processor of repaed sessions based on
* the specified session type.
*
* @sessionType The type of the reaped session.
* @param pid The process ID of the reaped session.
* @param waitpidStat The status information given by a call to waitpid().
*/
void dispatchReapedSessionHandler(
const DriveCatalogueEntry::SessionType sessionType,
const pid_t pid,
const int waitpidStat);
/**
* Does the required post processing for the specified reaped session.
*
* @param pid The process ID of the reaped session.
* @param waitpidStat The status information given by a call to waitpid().
*/
void handleReapedCleanerSession(const pid_t pid,
const int waitpidStat);
/**
* Sets the state of the tape drive asscoiated with the specified
* child process to down within the vdqmd daemon.
......@@ -458,14 +428,6 @@ protected:
size_t marshalTapeRcReplyMsg(char *const dst, const size_t dstLen,
const int rc) ;
/**
* Handles the specified reaped session.
*
* @param pid The process ID of the reaped session.
* @param waitpidStat The status information given by a call to waitpid().
*/
void handleReapedLabelSession(const pid_t pid, const int waitpidStat);
/**
* Request the vdqmd daemon to release the tape drive associated with the
* session child-process with the specified process ID.
......@@ -548,17 +510,6 @@ protected:
*/
void forkCleanerSession(DriveCatalogueEntry *drive);
/**
* Runs the cleaner session. This method is to be called within the child
* process responsible for running the cleaner session.
*
* @param drive The catalogue entry of the tape drive to be used during the
* session.
* @param vid The identifier of the mounted volume (one can pass the empty string
* in case it is unknown, as it not used except for logging purposes).
*/
void runCleanerSession(DriveCatalogueEntry *drive, const std::string &vid) throw();
/**
* Catalogue used to keep track of both the initial and current state of
* each tape drive being controlled by the tapeserverd daemon.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment