Commit d166fdb5 authored by Steven Murray's avatar Steven Murray
Browse files

Added the Daemon and MutlithreadedDaemon classes. The stager, tape gateway,

tape bridge and vdqm daemons will eventaully inherit from one of these new
classes.  This commit is a step towards this goal.  Please note that this
commit modifies the castor::rh::Server and castor::server::BaseDaemon classes
in not so desirable ways.  The castor::rh::Server has been modified so that it
does not use the MetricCollector class.  The castor::server::BaseDaemon class
has been modified so that it does not use either the UDPListenerThreadPool or
the MetricCollector class.  I will put back this code in another commit.
parent 4d6b06cc
......@@ -161,8 +161,10 @@ set (CLIENT_LIB_SRC_FILES
server/BaseServer.cpp
server/AuthListenerThreadPool.cpp
server/BaseThreadPool.cpp
server/Daemon.cpp
server/DynamicThreadPool.cpp
server/ListenerThreadPool.cpp
server/MultiThreadedDaemon.cpp
server/TCPListenerThreadPool.cpp
server/UDPListenerThreadPool.cpp
server/ForkedProcessPool.cpp
......
......@@ -148,8 +148,10 @@ STGLIB_SRCS = BaseAddress.cpp \
server/BaseServer.cpp \
server/AuthListenerThreadPool.cpp \
server/BaseThreadPool.cpp \
server/Daemon.cpp \
server/DynamicThreadPool.cpp \
server/ListenerThreadPool.cpp \
server/MultiThreadedDaemon.cpp \
server/TCPListenerThreadPool.cpp \
server/UDPListenerThreadPool.cpp \
server/ForkedProcessPool.cpp \
......
/******************************************************************************
/*******************************************************************************
* castor/log/DummyLogger.cpp
*
* This file is part of the Castor project.
......@@ -20,7 +20,7 @@
* Interface to the CASTOR logging system
*
* @author Steven.Murray@cern.ch
*****************************************************************************/
******************************************************************************/
#include "castor/log/DummyLogger.hpp"
......@@ -38,9 +38,16 @@ castor::log::DummyLogger::DummyLogger(const std::string &programName)
castor::log::DummyLogger::~DummyLogger() throw() {
}
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
// prepareForFork
//------------------------------------------------------------------------------
void castor::log::DummyLogger::prepareForFork()
throw(castor::exception::Internal) {
}
//------------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
void castor::log::DummyLogger::logMsg(
const int priority,
const std::string &msg,
......@@ -49,9 +56,9 @@ void castor::log::DummyLogger::logMsg(
const struct timeval &timeStamp) throw() {
}
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
void castor::log::DummyLogger::logMsg(
const int priority,
const std::string &msg,
......@@ -59,9 +66,9 @@ void castor::log::DummyLogger::logMsg(
const log::Param params[]) throw() {
}
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
//------------------------------------------------------------------------------
void castor::log::DummyLogger::logMsg(
const int priority,
const std::string &msg) throw() {
......
......@@ -62,6 +62,14 @@ public:
*/
~DummyLogger() throw();
/**
* Prepares the logger object for a call to fork().
*
* No further calls to logMsg() should be made after calling this method
* until the call to fork() has completed.
*/
void prepareForFork() throw(castor::exception::Internal);
/**
* Dummy logMsg() method that does nothing.
*
......
......@@ -62,15 +62,23 @@ public:
throw(castor::exception::Internal, castor::exception::InvalidArgument);
/**
* Returns the name of the program that is to be prepended to every log
* message.
* Destructor.
*/
const std::string &getProgramName() const throw();
virtual ~Logger() throw() = 0;
/**
* Destructor.
* Prepares the logger object for a call to fork().
*
* No further calls to logMsg() should be made after calling this method
* until the call to fork() has completed.
*/
virtual ~Logger() throw() = 0;
virtual void prepareForFork() throw(castor::exception::Internal) = 0;
/**
* Returns the name of the program that is to be prepended to every log
* message.
*/
const std::string &getProgramName() const throw();
/**
* Writes a message into the CASTOR logging system. Note that no exception
......
......@@ -164,6 +164,36 @@ void castor::log::LoggerImplementation::initMutex()
castor::log::LoggerImplementation::~LoggerImplementation() throw() {
}
//------------------------------------------------------------------------------
// prepareForFork
//------------------------------------------------------------------------------
void castor::log::LoggerImplementation::prepareForFork()
throw(castor::exception::Internal) {
// Enter critical section
{
const int mutex_lock_rc = pthread_mutex_lock(&m_mutex);
if(0 != mutex_lock_rc) {
castor::exception::Internal ex;
ex.getMessage() << "Failed to lock mutex of logger's critcial section: "
<< sstrerror(mutex_lock_rc);
throw(ex);
}
}
closeLog();
// Leave critical section.
{
const int mutex_unlock_rc = pthread_mutex_unlock(&m_mutex);
if(0 != mutex_unlock_rc) {
castor::exception::Internal ex;
ex.getMessage() << "Failed to unlock mutex of logger's critcial section: "
<< sstrerror(mutex_unlock_rc);
throw(ex);
}
}
}
//------------------------------------------------------------------------------
// openLog
//------------------------------------------------------------------------------
......@@ -383,7 +413,11 @@ void castor::log::LoggerImplementation::reducedSyslog(std::string msg)
send_flags = MSG_NOSIGNAL;
#endif
// enter critical section
pthread_mutex_lock(&m_mutex);
const int mutex_lock_rc = pthread_mutex_lock(&m_mutex);
// Do nothing if we failed to enter the critical section
if(0 != mutex_lock_rc) {
return;
}
// Try to connect if not already connected
if(!m_connected) {
......
......@@ -56,6 +56,14 @@ public:
*/
~LoggerImplementation() throw();
/**
* Prepares the logger object for a call to fork().
*
* No further calls to logMsg() should be made after calling this method
* until the call to fork() has completed.
*/
void prepareForFork() throw(castor::exception::Internal);
/**
* Writes a message into the CASTOR logging system. Note that no exception
* will ever be thrown in case of failure. Failures will actually be
......
......@@ -36,12 +36,15 @@
#include "castor/metrics/InternalCounter.hpp"
// Initialization of the singleton
castor::metrics::MetricsCollector* castor::metrics::MetricsCollector::s_instance(0);
castor::metrics::MetricsCollector*
castor::metrics::MetricsCollector::s_instance(0);
//------------------------------------------------------------------------------
// getInstance
//------------------------------------------------------------------------------
castor::metrics::MetricsCollector* castor::metrics::MetricsCollector::getInstance(castor::server::BaseDaemon* daemon)
castor::metrics::MetricsCollector*
castor::metrics::MetricsCollector::getInstance(
castor::server::MultiThreadedDaemon* daemon)
{
if(s_instance == 0 && daemon) {
// No need to protect this with mutexes as the instantiation
......@@ -56,9 +59,9 @@ castor::metrics::MetricsCollector* castor::metrics::MetricsCollector::getInstanc
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
castor::metrics::MetricsCollector::MetricsCollector(castor::server::BaseDaemon& daemon) :
castor::server::SignalThreadPool("metrics",
new UpdateThread()),
castor::metrics::MetricsCollector::MetricsCollector(
castor::server::MultiThreadedDaemon& daemon) :
castor::server::SignalThreadPool("metrics", new UpdateThread()),
m_daemon(daemon)
{
m_nbThreads = 1;
......
......@@ -31,7 +31,7 @@
#include <map>
#include "castor/IObject.hpp"
#include "castor/server/SignalThreadPool.hpp"
#include "castor/server/BaseDaemon.hpp"
#include "castor/server/MultiThreadedDaemon.hpp"
#include "castor/exception/Exception.hpp"
#include "castor/metrics/Histogram.hpp"
......@@ -49,7 +49,8 @@ namespace castor {
public:
/// This class is a singleton
static MetricsCollector* getInstance(castor::server::BaseDaemon* daemon = 0);
static MetricsCollector* getInstance(
castor::server::MultiThreadedDaemon* daemon = 0);
/// Default destructor
virtual ~MetricsCollector() throw ();
......@@ -109,13 +110,13 @@ namespace castor {
static MetricsCollector* s_instance;
/// Default constructor
MetricsCollector(castor::server::BaseDaemon& daemon);
MetricsCollector(castor::server::MultiThreadedDaemon& daemon);
/// Hash map of all histograms, indexed by name
std::map<std::string, Histogram*> m_histograms;
/// Reference to the daemon for logging purposes and for resetAllMetrics
castor::server::BaseDaemon& m_daemon;
castor::server::MultiThreadedDaemon& m_daemon;
/// Dump file location
std::string m_dumpFileLocation;
......
......@@ -283,6 +283,7 @@ void castor::rh::Server::parseCommandLine(int argc, char *argv[]) throw (castor:
if(metrics) {
// initialize the metrics collector thread and add custom metrics
/*
castor::metrics::MetricsCollector* mc =
castor::metrics::MetricsCollector::getInstance(this);
mc->addHistogram(new castor::metrics::Histogram(
......@@ -291,6 +292,7 @@ void castor::rh::Server::parseCommandLine(int argc, char *argv[]) throw (castor:
"SvcClasses", &castor::rh::SvcClassCounter::instantiate));
mc->addHistogram(new castor::metrics::Histogram(
"Users", &castor::rh::UserCounter::instantiate));
*/
}
}
......
......@@ -118,11 +118,12 @@ void castor::server::BaseDaemon::addNotifierThreadPool(int port)
// This is a pool for internal use, we don't use addThreadPool
// so to not change the command line parsing behavior
/*
m_threadPools['_'] =
new castor::server::UDPListenerThreadPool("_NotifierThread",
castor::server::NotifierThread::getInstance(this),
port);
*/
// we run the notifier in the same thread as the listening one
m_threadPools['_']->setNbThreads(0);
}
......@@ -185,7 +186,7 @@ void castor::server::BaseDaemon::parseCommandLine(int argc, char *argv[])
break;
case 'm':
// initialize the metrics collector thread
castor::metrics::MetricsCollector::getInstance(this);
//castor::metrics::MetricsCollector::getInstance(this);
break;
default:
tp = m_threadPools.find(c);
......
/*******************************************************************************
* castor/server/Daemon.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* @author castor dev team
******************************************************************************/
#include "castor/dlf/Dlf.hpp"
#include "castor/io/UDPSocket.hpp"
#include "castor/server/Daemon.hpp"
#include "castor/server/ThreadNotification.hpp"
#include "castor/System.hpp"
#include <signal.h>
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
castor::server::Daemon::Daemon(log::Logger &logger):
m_foreground(false),
m_runAsStagerSuperuser(false),
m_logger(logger) {
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
castor::server::Daemon::~Daemon() throw() {
}
//------------------------------------------------------------------------------
// getServerName
//------------------------------------------------------------------------------
const std::string &castor::server::Daemon::getServerName() const throw() {
return m_logger.getProgramName();
}
//------------------------------------------------------------------------------
// runAsStagerSuperuser
//------------------------------------------------------------------------------
void castor::server::Daemon::runAsStagerSuperuser() throw() {
m_runAsStagerSuperuser = true;
}
//-----------------------------------------------------------------------------
// dlfInit
//-----------------------------------------------------------------------------
void castor::server::Daemon::dlfInit(castor::dlf::Message messages[])
throw (castor::exception::Exception) {
castor::dlf::dlf_init((char*)m_logger.getProgramName().c_str(), messages);
// Add framework specific messages
castor::dlf::Message frameworkMessages[] =
{{ 1, "Error while reading datagrams" },
{ 2, "Error while accepting connections" },
{ 3, "Thread pool started" },
{ 4, "Exception caught in the user thread" },
{ 5, "Thread run error" },
{ 6, "NotifierThread exception" },
{ 8, "Exception caught while initializing the child process" },
{ 9, "Error while processing an object from the pipe" },
{ 10, "Uncaught exception in a thread from pool" },
{ 11, "Uncaught GENERAL exception in a thread from pool" },
{ 12, "Caught signal - GRACEFUL STOP" },
{ 14, "Caught signal - CHILD STOPPED" },
{ 15, "Signal caught but not handled - IMMEDIATE STOP" },
{ 16, "Exception during wait for signal loop" },
{ 18, "No idle thread in pool to process request" },
{ 19, "Error while dispatching to a thread" },
{ 20, "Spawning a new thread in pool" },
{ 21, "Terminating a thread in pool" },
{ 22, "Task processed" },
{ -1, "" }};
castor::dlf::dlf_addMessages(DLF_BASE_FRAMEWORK, frameworkMessages);
}
//------------------------------------------------------------------------------
// daemonize
//------------------------------------------------------------------------------
void castor::server::Daemon::daemonize() throw (castor::exception::Exception) {
// If the daemon is to be run in the background
if (!m_foreground) {
m_logger.prepareForFork();
// We could set our working directory to '/' here with a call to chdir(2).
// For the time being we don't and leave it to the initd script to change
// to a suitable directory for us!
int pid = fork();
if (pid < 0) {
castor::exception::Internal ex;
ex.getMessage() << "Background daemon initialization failed with result "
<< pid << std::endl;
throw ex;
}
else if (pid > 0) {
// The parent exits normally
exit(EXIT_SUCCESS);
}
// Run the daemon in a new session
setsid();
// Redirect the standard file descriptors to /dev/null
if ((freopen("/dev/null", "r", stdin) == NULL) ||
(freopen("/dev/null", "w", stdout) == NULL) ||
(freopen("/dev/null", "w", stderr) == NULL)) {
castor::exception::Internal ex;
ex.getMessage() << "Failed to redirect standard file descriptors to "
<< "/dev/null" << std::endl;
throw ex;
}
}
// Change the user of the daemon process to the Castor superuser if requested
if (m_runAsStagerSuperuser) {
castor::System::switchToCastorSuperuser();
}
// Ignore SIGPIPE (connection lost with client)
// and SIGXFSZ (a file is too big)
signal(SIGPIPE, SIG_IGN);
signal(SIGXFSZ, SIG_IGN);
}
//------------------------------------------------------------------------------
// sendNotification
//------------------------------------------------------------------------------
void castor::server::Daemon::sendNotification(const std::string &host,
const int port, const char tpName, const int nbThreads) throw() {
try {
// Create notification message
castor::server::ThreadNotification notif;
notif.setTpName(tpName);
notif.setNbThreads(nbThreads);
// Create UDP socket and send packet
castor::io::UDPSocket sock(port, host);
sock.sendObject(notif);
sock.close();
} catch (castor::exception::Exception& ignored) {
// This is a best effort service, ignore any failure
} catch(...) {
// This is a best effort service, ignore any failure
}
}
//-----------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
void castor::server::Daemon::logMsg(
const int priority,
const std::string &msg,
const int numParams,
const log::Param params[],
const struct timeval &timeStamp) throw() {
m_logger.logMsg(priority, msg, numParams, params, timeStamp);
}
//-----------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
void castor::server::Daemon::logMsg(
const int priority,
const std::string &msg,
const int numParams,
const log::Param params[]) throw() {
m_logger.logMsg(priority, msg, numParams, params);
}
//-----------------------------------------------------------------------------
// logMsg
//-----------------------------------------------------------------------------
void castor::server::Daemon::logMsg(
const int priority,
const std::string &msg) throw() {
m_logger.logMsg(priority, msg);
}
/*******************************************************************************
* castor/server/Daemon.hpp
*
* This file is part of the Castor project.
* See http://castor.web.cern.ch/castor
*
* Copyright (C) 2003 CERN
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* @author castor dev team
******************************************************************************/
#ifndef CASTOR_SERVER_DAEMON_HPP
#define CASTOR_SERVER_DAEMON_HPP 1
#include "castor/dlf/Message.hpp"
#include "castor/log/Logger.hpp"
namespace castor {
namespace server {
/**
* This abstract class represents a daemon and contains all of the code common
* to both single and mutli threaded daemons.
*
* The code common to both single and mutli-threaded daemons includes
* daemonization and logging.
*/
class Daemon {
public:
/**
* Constructor
*
* @param logger Object representing the API of the CASTOR logging system.
*/
Daemon(log::Logger &logger);
/**
* Pure virtual destructor that purposely makes this class abstract.
*/
virtual ~Daemon() throw() = 0;
/**
* Returns this server's name as used by the CASTOR logging system.
*/
const std::string &getServerName() const throw();
protected:
/**
* Parses a command line to set the server options.
*
* @param argc The size of the command-line vector.
* @param argv The command-line vector.
*/
virtual void parseCommandLine(int argc, char *argv[]) = 0;
/**
* Sets the runAsStagerSuperuser flag to true.
*
* The default value of the runAsStagerSuperuser flag at construction time is
* false.
*/
void runAsStagerSuperuser() throw();
/**
* Initializes the DLF, both for streaming and regular messages
* Does not create the DLF thread, this is created after daemonization
* @param messages the messages to be passed to dlf_init
*/
void dlfInit(castor::dlf::Message messages[])
throw (castor::exception::Exception);
/**
* Daemonizes the daemon.
*
* Please make sure that the setForeground() and runAsStagerSuperuser()
* methods have been called as appropriate before this method is called.
* This method takes into account whether the dameon should run in foregreound
* or background mode (m_foreground) and whether or not the user of daemon
* should be changed to the stager superuser (m_runAsStagerSuperuser).
*/
void daemonize() throw(castor::exception::Exception);
/**
* Sends a notification message to the given host,port
* to wake up nbThreads threads to handle pending requests.
* @param host the destination host
* @param port the destination port
* @param tpName the name of the thread pool to be signaled
* @param nbThreads the number of threads to be signaled
*/
static void sendNotification(const std::string &host, const int port,
const char tpName, const int nbThreads = 1) throw();
/**
* Writes a message into the CASTOR logging system. Note that no exception
* will ever be thrown in case of failure. Failures will actually be
* silently ignored in order to not impact the processing.
*
* Note that this version of logMsg() allows the caller to specify the
* time stamp of the log message.
*
* @param priority the priority of the message as defined by the syslog
* API.
* @param msg the message.
* @param numParams the number of parameters in the message.
* @param params the parameters of the message.
* @param timeStamp the time stamp of the log message.
*/
void logMsg(
const int priority,
const std::string &msg,
const int numParams,
const log::Param params[],
const struct timeval &timeStamp) throw();
/**