Commit 97ec27f4 authored by Eric Cano's avatar Eric Cano
Browse files

Added disk reporter to the new maintenance process.

The maintenance process replaces the garbage collector process. It now
runs both garbage collection and disk reports.
parent e0710afc
......@@ -27,7 +27,7 @@ class Scheduler;
class DiskReportRunner {
public:
DiskReportRunner(Scheduler & scheduler);
DiskReportRunner(Scheduler & scheduler): m_scheduler(scheduler) {}
void runOnePass(log::LogContext & lc);
......
......@@ -15,7 +15,7 @@ add_library(ctatapedaemon
CommandLineParams.cpp
DriveHandler.cpp
DriveHandlerProxy.cpp
GarbageCollectorHandler.cpp
MaintenanceHandler.cpp
SignalHandler.cpp
SubprocessHandler.cpp
ProcessManager.cpp
......
......@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "GarbageCollectorHandler.hpp"
#include "MaintenanceHandler.hpp"
#include "common/exception/Errnum.hpp"
#include "objectstore/AgentHeartbeatThread.hpp"
#include "objectstore/BackendPopulator.hpp"
......@@ -29,6 +29,7 @@
#include "scheduler/Scheduler.hpp"
#include "rdbms/Login.hpp"
#include "common/make_unique.hpp"
#include "scheduler/DiskReportRunner.hpp"
#include <signal.h>
#include <sys/wait.h>
......@@ -39,7 +40,7 @@ namespace cta { namespace tape { namespace daemon {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
GarbageCollectorHandler::GarbageCollectorHandler(const TapedConfiguration& tapedConfig, ProcessManager& pm):
MaintenanceHandler::MaintenanceHandler(const TapedConfiguration& tapedConfig, ProcessManager& pm):
SubprocessHandler("garbageCollector"), m_processManager(pm), m_tapedConfig(tapedConfig) {
// As the handler is started, its first duty is to create a new subprocess. This
// will be managed by the process manager (initial request in getInitialStatus)
......@@ -48,7 +49,7 @@ SubprocessHandler("garbageCollector"), m_processManager(pm), m_tapedConfig(taped
//------------------------------------------------------------------------------
// GarbageCollectorHandler::getInitialStatus
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::getInitialStatus() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::getInitialStatus() {
m_processingStatus.forkRequested=true;
return m_processingStatus;
}
......@@ -56,7 +57,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::getInitialStatus()
//------------------------------------------------------------------------------
// GarbageCollectorHandler::getInitialStatus
//------------------------------------------------------------------------------
void GarbageCollectorHandler::postForkCleanup() {
void MaintenanceHandler::postForkCleanup() {
// We are in the child process of another handler. We can close our socket pair
// without re-registering it from poll.
m_socketPair.reset(nullptr);
......@@ -66,7 +67,7 @@ void GarbageCollectorHandler::postForkCleanup() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::fork
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::fork() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::fork() {
// If anything fails while attempting to fork, we will have to declare ourselves
// failed and ask for a shutdown by sending the TERM signal to the parent process
// This will ensure the shutdown-kill sequence managed by the signal handler without code duplication.
......@@ -77,7 +78,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::fork() {
m_socketPair.reset(new cta::server::SocketPair());
// and fork
m_pid=::fork();
exception::Errnum::throwOnMinusOne(m_pid, "In DriveHandler::fork(): failed to fork()");
exception::Errnum::throwOnMinusOne(m_pid, "In MaintenanceHandler::fork(): failed to fork()");
if (!m_pid) {
// We are in the child process
SubprocessHandler::ProcessingStatus ret;
......@@ -93,7 +94,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::fork() {
}
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer params(m_processManager.logContext());
m_processManager.logContext().log(log::ERR, "Failed to fork garbage collector process. Initiating shutdown with SIGTERM.");
m_processManager.logContext().log(log::ERR, "Failed to fork maintenance process. Initiating shutdown with SIGTERM.");
// Wipe all previous states as we are shutting down
m_processingStatus = SubprocessHandler::ProcessingStatus();
m_processingStatus.shutdownComplete=true;
......@@ -107,7 +108,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::fork() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::kill
//------------------------------------------------------------------------------
void GarbageCollectorHandler::kill() {
void MaintenanceHandler::kill() {
// If we have a subprocess, kill it and wait for completion (if needed). We do not need to keep
// track of the exit state as kill() means we will not be called anymore.
log::ScopedParamContainer params(m_processManager.logContext());
......@@ -127,29 +128,29 @@ void GarbageCollectorHandler::kill() {
} else {
params.add("WIFSIGNALED", WIFSIGNALED(status));
}
m_processManager.logContext().log(log::INFO, "In GarbageCollectorHandler::kill(): sub process completed");
m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::kill(): sub process completed");
} catch (exception::Exception & ex) {
params.add("Exception", ex.getMessageValue());
m_processManager.logContext().log(log::ERR, "In GarbageCollectorHandler::kill(): failed to kill existing subprocess");
m_processManager.logContext().log(log::ERR, "In MaintenanceHandler::kill(): failed to kill existing subprocess");
}
} else {
m_processManager.logContext().log(log::INFO, "In GarbageCollectorHandler::kill(): no subprocess to kill");
m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::kill(): no subprocess to kill");
}
}
//------------------------------------------------------------------------------
// GarbageCollectorHandler::processEvent
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processEvent() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::processEvent() {
// We do not expect any feedback for the child process...
m_processManager.logContext().log(log::WARNING, "In GarbageCollectorHandler::processEvent(): spurious event");
m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processEvent(): spurious event");
return m_processingStatus;
}
//------------------------------------------------------------------------------
// GarbageCollectorHandler::processSigChild
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processSigChild() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::processSigChild() {
// Check out child process's status. If the child process is still around,
// waitpid will return 0. Non zero if process completed (and status needs to
// be picked up) and -1 if the process is entirely gone.
......@@ -166,7 +167,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processSigChild() {
params.add("pid", m_pid)
.add("Message", ex.getMessageValue());
m_processManager.logContext().log(log::WARNING,
"In GarbageCollectorHandler::processSigChild(): failed to get child process exit code. Doing nothing as we are unable to determine if it is still running or not.");
"In MaintenanceHandler::processSigChild(): failed to get child process exit code. Doing nothing as we are unable to determine if it is still running or not.");
return m_processingStatus;
}
if (rc) {
......@@ -182,11 +183,11 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processSigChild() {
params.add("exitCode", WEXITSTATUS(processStatus));
// If we are shutting down, we should not request a new session.
if (!m_shutdownInProgress) {
m_processManager.logContext().log(log::INFO, "Garbage collector subprocess exited. Will spawn a new one.");
m_processManager.logContext().log(log::INFO, "Maintenance subprocess exited. Will spawn a new one.");
m_processingStatus.forkRequested=true;
m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
} else {
m_processManager.logContext().log(log::INFO, "Garbage collector subprocess exited. Will not spawn new one as we are shutting down.");
m_processManager.logContext().log(log::INFO, "Maintenance subprocess exited. Will not spawn new one as we are shutting down.");
m_processingStatus.forkRequested=false;
m_processingStatus.shutdownComplete=true;
m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
......@@ -197,11 +198,11 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processSigChild() {
.add("CoreDump", WCOREDUMP(processStatus));
// If we are shutting down, we should not request a new session.
if (!m_shutdownInProgress) {
m_processManager.logContext().log(log::INFO, "Garbage collector subprocess crashed. Will spawn a new one.");
m_processManager.logContext().log(log::INFO, "Maintenance subprocess crashed. Will spawn a new one.");
m_processingStatus.forkRequested=true;
m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
} else {
m_processManager.logContext().log(log::INFO, "Garbage collector subprocess crashed. Will not spawn new one as we are shutting down.");
m_processManager.logContext().log(log::INFO, "Maintenance subprocess crashed. Will not spawn new one as we are shutting down.");
m_processingStatus.forkRequested=false;
m_processingStatus.shutdownComplete=true;
m_processingStatus.nextTimeout=m_processingStatus.nextTimeout.max();
......@@ -216,19 +217,19 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processSigChild() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::processTimeout
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processTimeout() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::processTimeout() {
// The only time we expect a timeout is when shutting down
if (!m_shutdownInProgress) {
m_processManager.logContext().log(log::WARNING, "In GarbageCollectorHandler::processTimeout(): spurious timeout: no shutdown");
m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processTimeout(): spurious timeout: no shutdown");
return m_processingStatus;
}
// We were shutting down and the child process did not exit in time. Killing it.
if (-1!=m_pid) {
m_processManager.logContext().log(log::WARNING,
"In GarbageCollectorHandler::processTimeout(): spurious timeout: no more process");
"In MaintenanceHandler::processTimeout(): spurious timeout: no more process");
} else {
// We will help the exit of the child process by killing it.
m_processManager.logContext().log(log::WARNING, "In GarbageCollectorHandler::processTimeout(): will kill subprocess");
m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::processTimeout(): will kill subprocess");
kill();
}
// In all cases, the shutdown is complete.
......@@ -240,7 +241,7 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processTimeout() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::runChild
//------------------------------------------------------------------------------
int GarbageCollectorHandler::runChild() {
int MaintenanceHandler::runChild() {
// We are in the child process. It is time to open connections to the catalogue
// and object store, and run the garbage collector.
// We do not have to care for previous crashed sessions as we will garbage
......@@ -267,7 +268,7 @@ int GarbageCollectorHandler::runChild() {
std::unique_ptr<cta::catalogue::Catalogue> catalogue;
std::unique_ptr<cta::Scheduler> scheduler;
try {
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "GarbageCollector", m_processManager.logContext()));
backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "Maintenance", m_processManager.logContext()));
osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger()));
const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value());
const uint64_t nbConns = 1;
......@@ -281,14 +282,14 @@ int GarbageCollectorHandler::runChild() {
log::ScopedParamContainer param(m_processManager.logContext());
param.add("errorMessage", ex.getMessageValue());
m_processManager.logContext().log(log::CRIT,
"In GarbageCollectorHandler::runChild(): contact central storage. Waiting for shutdown.");
"In MaintenanceHandler::runChild(): contact central storage. Waiting for shutdown.");
}
server::SocketPair::pollMap pollList;
pollList["0"]=m_socketPair.get();
// Wait forever (negative timeout) for something to come from parent process.
server::SocketPair::poll(pollList, -1, server::SocketPair::Side::parent);
m_processManager.logContext().log(log::INFO,
"In GarbageCollectorHandler::runChild(): Received shutdown message after failure to contact storage. Exiting.");
"In MaintenanceHandler::runChild(): Received shutdown message after failure to contact storage. Exiting.");
return EXIT_FAILURE;
}
......@@ -296,31 +297,34 @@ int GarbageCollectorHandler::runChild() {
objectstore::AgentHeartbeatThread agentHeartbeat(backendPopulator->getAgentReference(), *backend, m_processManager.logContext().logger());
agentHeartbeat.startThread();
// Create the garbage collector itself
// Create the garbage collector and the disk reporter
objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference(), *catalogue);
DiskReportRunner diskReportRunner(*scheduler);
// Run the gc in a loop
// Run the maintenance in a loop: garbage collector and disk reporter
try {
server::SocketPair::pollMap pollList;
pollList["0"]=m_socketPair.get();
bool receivedMessage=false;
do {
utils::Timer t;
m_processManager.logContext().log(log::DEBUG,
"In GarbageCollectorHandler::runChild(): About to run a GC pass.");
"In MaintenanceHandler::runChild(): About to run a GC pass.");
gc.runOnePass(m_processManager.logContext());
diskReportRunner.runOnePass(m_processManager.logContext());
try {
server::SocketPair::poll(pollList, s_pollInterval, server::SocketPair::Side::parent);
server::SocketPair::poll(pollList, s_pollInterval - t.secs(), server::SocketPair::Side::parent);
receivedMessage=true;
} catch (server::SocketPair::Timeout & ex) {}
} while (!receivedMessage);
m_processManager.logContext().log(log::INFO,
"In GarbageCollectorHandler::runChild(): Received shutdown message. Exiting.");
"In MaintenanceHandler::runChild(): Received shutdown message. Exiting.");
} catch (cta::exception::Exception & ex) {
{
log::ScopedParamContainer params(m_processManager.logContext());
params.add("Message", ex.getMessageValue());
m_processManager.logContext().log(log::ERR,
"In GarbageCollectorHandler::runChild(): received an exception. Backtrace follows.");
"In MaintenanceHandler::runChild(): received an exception. Backtrace follows.");
}
m_processManager.logContext().logBacktrace(log::ERR, ex.backtrace());
}
......@@ -331,14 +335,14 @@ int GarbageCollectorHandler::runChild() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::shutdown
//------------------------------------------------------------------------------
SubprocessHandler::ProcessingStatus GarbageCollectorHandler::shutdown() {
SubprocessHandler::ProcessingStatus MaintenanceHandler::shutdown() {
// We will signal the shutdown to the child process by sending a byte over the
// socket pair (if we have one)
m_shutdownInProgress=true;
if (!m_socketPair.get()) {
m_processManager.logContext().log(log::WARNING, "In GarbageCollectorHandler::shutdown(): no socket pair");
m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::shutdown(): no socket pair");
} else {
m_processManager.logContext().log(log::INFO, "In GarbageCollectorHandler::shutdown(): sent shutdown message to child process");
m_processManager.logContext().log(log::INFO, "In MaintenanceHandler::shutdown(): sent shutdown message to child process");
m_socketPair->send("\0");
}
return m_processingStatus;
......@@ -347,13 +351,13 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::shutdown() {
//------------------------------------------------------------------------------
// GarbageCollectorHandler::~GarbageCollectorHandler
//------------------------------------------------------------------------------
GarbageCollectorHandler::~GarbageCollectorHandler() {
MaintenanceHandler::~MaintenanceHandler() {
// If we still have a child process (should not), just stop it the hard way.
if (-1 != m_pid) {
cta::log::ScopedParamContainer params(m_processManager.logContext());
params.add("pid", m_pid);
::kill(m_pid, SIGKILL);
m_processManager.logContext().log(log::WARNING, "In GarbageCollectorHandler::~GarbageCollectorHandler(): killed leftover subprocess");
m_processManager.logContext().log(log::WARNING, "In MaintenanceHandler::~GarbageCollectorHandler(): killed leftover subprocess");
}
}
......
......@@ -29,10 +29,10 @@ namespace cta { namespace tape { namespace daemon {
* Handler for garbage collector subprocesses. This long lived process should live
* as long as the main process, but will be respawned in case of crash.
*/
class GarbageCollectorHandler: public SubprocessHandler {
class MaintenanceHandler: public SubprocessHandler {
public:
GarbageCollectorHandler(const TapedConfiguration & tapedConfig, ProcessManager & pm);
virtual ~GarbageCollectorHandler();
MaintenanceHandler(const TapedConfiguration & tapedConfig, ProcessManager & pm);
virtual ~MaintenanceHandler();
SubprocessHandler::ProcessingStatus getInitialStatus() override;
SubprocessHandler::ProcessingStatus fork() override;
void postForkCleanup() override;
......
......@@ -23,7 +23,7 @@
#include "ProcessManager.hpp"
#include "SignalHandler.hpp"
#include "DriveHandler.hpp"
#include "GarbageCollectorHandler.hpp"
#include "MaintenanceHandler.hpp"
#include <google/protobuf/service.h>
#include <limits.h>
#include <sys/prctl.h>
......@@ -112,7 +112,7 @@ void cta::tape::daemon::TapeDaemon::mainEventLoop() {
pm.addHandler(std::move(dh));
}
// Create the garbage collector
std::unique_ptr<GarbageCollectorHandler> gc(new GarbageCollectorHandler(m_globalConfiguration, pm));
std::unique_ptr<MaintenanceHandler> gc(new MaintenanceHandler(m_globalConfiguration, pm));
pm.addHandler(std::move(gc));
// And run the process manager
int ret=pm.run();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment