Commit d8f41e5a authored by Eric Cano's avatar Eric Cano
Browse files

Implemented the cleaner process startup logic.

The cleaner is started if a tape could be in the drive after a session crashed. This depends on the previous
sesion state.
If a cleaner crashes, the next session will put the drive down.
The sessions do not fully report their states yet.
parent 0e2b69b5
......@@ -793,20 +793,123 @@ int DriveHandler::runChild() {
// We are in the child process. It is time to open connections to the catalogue
// and object store, and run the session.
// If the previous session crashed, and a tape is potentially in the drive (after
// areport of starting to mount and before a report of successful unmount),
// report of starting to mount and before a report of successful unmount),
// this session instance will be a cleaner session.
// This condition is detected with a non empty m_previousVID.
// This condition is detected with a non empty m_previousVID and the corresponding
// m_previousState and m_PreviousType.
// Finally, on a crashed cleaner session, we will put the drive down.
// A non crashed session which failed to unmount the tape will have put itself
// down (and already failed to unmount on its own).
// Otherwise, this session will run a regular data transfer session which will
// schedule itself info an empty drive probe, archive, retrieve or label session.
if (m_previousSession == PreviousSession::Crashed && m_previousVid.size()) {
// Create the channel to talk back to the parent process.
cta::tape::daemon::DriveHandlerProxy driveHandlerProxy(*m_socketPair);
// Before anything, we need to check we have access to the scheduler's central storages.
std::unique_ptr<cta::objectstore::Backend> backend(
cta::objectstore::BackendFactory::createBackend(m_tapedConfig.objectStoreURL.value()).release());
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
// Create the agent entry in the object store. This could fail (even before ping, so
// handle failure like a ping failure).
std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
std::unique_ptr<cta::OStoreDBWithAgent> osdb;
try {
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend));
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference()));
} catch(cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate agent entry. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
std::unique_ptr<cta::catalogue::Catalogue> catalogue;
try {
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value());
const uint64_t nbConns = 1;
catalogue=cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns);
} catch(cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate catalogue. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
cta::Scheduler scheduler(*catalogue, *osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them
// Before launching the transfer session, we validate that the scheduler is reachable.
try {
scheduler.ping();
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param (m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to ping central storage before session. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
// 1) Special case first, if we crashed in a cleaner session, we put the drive down
if (m_previousSession == PreviousSession::Crashed && m_previousType == SessionType::Cleanup) {
log::ScopedParamContainer params(m_processManager.logContext());
params.add("driveUnit", m_configLine.unitName);
m_processManager.logContext().log(log::ERR, "In DriveHandler::runChild(): the cleaner session crashed. Putting the drive down.");
// Get hold of the scheduler.
try {
cta::common::dataStructures::DriveInfo driveInfo;
driveInfo.driveName=m_configLine.unitName;
driveInfo.logicalLibrary=m_configLine.rawLibrarySlot;
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down);
cta::common::dataStructures::SecurityIdentity securityIdentity;
scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName, false, false);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to set the drive down. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
}
// 2) In the previous session crashed, we might want to run a cleaner session, depending
// on the previous state
std::set<SessionState> statesRequiringCleaner = { SessionState::Mounting,
SessionState::Running, SessionState::Unmounting };
if (m_previousSession == PreviousSession::Crashed && statesRequiringCleaner.count(m_previousState)) {
if (!m_previousVid.size()) {
m_processManager.logContext().log(log::ERR, "In DriveHandler::runChild(): Should run cleaner but VID is missing. Putting the drive down.");
try {
cta::common::dataStructures::DriveInfo driveInfo;
driveInfo.driveName=m_configLine.unitName;
driveInfo.logicalLibrary=m_configLine.rawLibrarySlot;
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down);
cta::common::dataStructures::SecurityIdentity securityIdentity;
scheduler.setDesiredDriveState(securityIdentity, m_configLine.unitName, false, false);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to set the drive down. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
}
// Log the decision
{
log::ScopedParamContainer params(m_processManager.logContext());
params.add("VID", m_previousVid)
.add("driveUnit", m_configLine.unitName);
m_processManager.logContext().log(log::ERR, "In DriveHandler::runChild(): starting cleaner after crash with tape potentially loaded.");
.add("driveUnit", m_configLine.unitName)
.add("PreviousState", session::toString(m_sessionState))
.add("PreviousType", session::toString(m_sessionType));
m_processManager.logContext().log(log::INFO, "In DriveHandler::runChild(): starting cleaner after crash with tape potentially loaded.");
}
// Capabilities management.
cta::server::ProcessCap capUtils;
......@@ -874,8 +977,6 @@ int DriveHandler::runChild() {
castor::tape::System::realWrapper sWrapper;
cta::tape::daemon::DriveHandlerProxy driveHandlerProxy(*m_socketPair);
castor::tape::tapeserver::daemon::DataTransferConfig dataTransferConfig;
dataTransferConfig.bufsz = m_tapedConfig.bufferSize.value();
dataTransferConfig.bulkRequestMigrationMaxBytes =
......@@ -896,54 +997,6 @@ int DriveHandler::runChild() {
dataTransferConfig.useLbp = true;
dataTransferConfig.xrootPrivateKey = "";
std::unique_ptr<cta::objectstore::Backend> backend(
cta::objectstore::BackendFactory::createBackend(m_tapedConfig.objectStoreURL.value()).release());
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit();
} catch (std::bad_cast &){}
// Create the agent entry in the object store. This could fail (even before ping, so
// handle failure like a ping failure).
std::unique_ptr<cta::objectstore::BackendPopulator> backendPopulator;
std::unique_ptr<cta::OStoreDBWithAgent> osdb;
try {
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend));
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference()));
} catch(cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate agent entry. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
std::unique_ptr<cta::catalogue::Catalogue> catalogue;
try {
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value());
const uint64_t nbConns = 1;
catalogue=cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns);
} catch(cta::exception::Exception &ex) {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to instantiate catalogue. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
sleep(1);
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
cta::Scheduler scheduler(*catalogue, *osdb, 5, 2*1000*1000); //TODO: we have hardcoded the mount policy parameters here temporarily we will remove them once we know where to put them
// Before launching the transfer session, we validate that the scheduler is reachable.
try {
scheduler.ping();
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer param (m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT, "In DriveHandler::runChild(): failed to ping central storage before session. Reporting fatal error.");
driveHandlerProxy.reportState(tape::session::SessionState::Fatal, tape::session::SessionType::Undetermined, "");
return castor::tape::tapeserver::daemon::Session::MARK_DRIVE_AS_DOWN;
}
// Before launching, and if this is the first session since daemon start, we will
// put the drive down.
if (m_previousSession == PreviousSession::Initiating) {
......
......@@ -62,6 +62,10 @@ private:
};
/** Representation of the outcome of the previous session/child process. */
PreviousSession m_previousSession=PreviousSession::Initiating;
/** Representation of the last know state of the previous session (useful for crashes) */
session::SessionState m_previousState = session::SessionState::StartingUp;
/** Representation of the last know type of the previous session (useful for crashes) */
session::SessionType m_previousType = session::SessionType::Undetermined;
/** Previous VID, that can help the unmount process */
std::string m_previousVid;
/** Representation of the status of the current process. */
......@@ -91,6 +95,8 @@ private:
SubprocessHandler::ProcessingStatus processFatal(serializers::WatchdogMessage & message);
/** Current session's type */
session::SessionType m_sessionType=session::SessionType::Undetermined;
/** Current session's VID */
std::string m_sessionVid;
/** Current session's parameters: they are accumulated during session's lifetime
* and logged as session ends */
log::LogContext m_sessionEndContext;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment