Commit 5b5500a6 authored by Steven Murray's avatar Steven Murray
Browse files

First step of refactoring the drive-catalogue tape-sessions

Please note that the "integration" code of tapserverd is not fully
functional.  One should however be able to run a data transfer
session.
parent 8b17b0cb
...@@ -701,8 +701,7 @@ bool castor::io::readBytesFromCloseable( ...@@ -701,8 +701,7 @@ bool castor::io::readBytesFromCloseable(
const int socketFd, const int socketFd,
const int timeout, const int timeout,
const int nbBytes, const int nbBytes,
char *const buf) char *const buf) {
{
// Throw an exception if socketFd is invalid // Throw an exception if socketFd is invalid
if(socketFd < 0) { if(socketFd < 0) {
......
...@@ -65,10 +65,34 @@ void castor::tape::reactor::ZMQReactor::registerHandler( ...@@ -65,10 +65,34 @@ void castor::tape::reactor::ZMQReactor::registerHandler(
ZMQPollEventHandler *const handler) { ZMQPollEventHandler *const handler) {
zmq_pollitem_t item; zmq_pollitem_t item;
handler->fillPollFd(item); handler->fillPollFd(item);
//TODO, handle double registration
std::ostringstream socketInHex;
socketInHex << std::hex << item.socket;
log::Param params[] = {log::Param("fd", item.fd),
log::Param("socket", socketInHex)};
m_log(LOG_DEBUG, "ZMQReactor registering a new handler", params);
checkDoubleRegistration(item);
m_handlers.push_back(std::make_pair(item,handler)); m_handlers.push_back(std::make_pair(item,handler));
} }
//------------------------------------------------------------------------------
// checkDoubleRegistration
//------------------------------------------------------------------------------
void castor::tape::reactor::ZMQReactor::checkDoubleRegistration(
const zmq_pollitem_t &item) const {
for(HandlerMap::const_iterator it=m_handlers.begin(); it!=m_handlers.end();
++it) {
const std::pair<zmq_pollitem_t, ZMQPollEventHandler*> &maplet = *it;
if(item == maplet.first) {
castor::exception::Exception ex;
ex.getMessage() << "ZMQReactor detected a double registration: fd=" <<
item.fd << " socket=" << std::hex << item.socket;
throw ex;
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// handleEvents // handleEvents
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
...@@ -91,6 +91,14 @@ private: ...@@ -91,6 +91,14 @@ private:
typedef std::vector<std::pair<zmq_pollitem_t, ZMQPollEventHandler*> > typedef std::vector<std::pair<zmq_pollitem_t, ZMQPollEventHandler*> >
HandlerMap; HandlerMap;
/**
* Throws a cator::exception::Exception if a handler has already been
* registered for the specified poll item.
*
* @prama item The poll item.
*/
void checkDoubleRegistration(const zmq_pollitem_t &item) const;
/** /**
* Builds the vector of file descriptors to be passed to poll(). * Builds the vector of file descriptors to be passed to poll().
* *
......
...@@ -188,8 +188,8 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler:: ...@@ -188,8 +188,8 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler::
const std::string unitName(body.drive); const std::string unitName(body.drive);
DriveCatalogueEntry *drive = m_driveCatalogue.findDrive(unitName); DriveCatalogueEntry &drive = m_driveCatalogue.findDrive(unitName);
const utils::DriveConfig &driveConfig = drive->getConfig(); const utils::DriveConfig &driveConfig = drive.getConfig();
log::Param params[] = { log::Param params[] = {
log::Param("unitName", unitName), log::Param("unitName", unitName),
...@@ -198,18 +198,19 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler:: ...@@ -198,18 +198,19 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler::
switch(body.status) { switch(body.status) {
case CONF_UP: case CONF_UP:
m_vdqm.setDriveUp(m_hostName, unitName, driveConfig.dgn); m_vdqm.setDriveUp(m_hostName, unitName, driveConfig.dgn);
drive->configureUp(); drive.configureUp();
m_log(LOG_INFO, "Drive configured up", params); m_log(LOG_INFO, "Drive configured up", params);
break; break;
case CONF_DOWN: case CONF_DOWN:
m_vdqm.setDriveDown(m_hostName, unitName, driveConfig.dgn); m_vdqm.setDriveDown(m_hostName, unitName, driveConfig.dgn);
drive->configureDown(); drive.configureDown();
m_log(LOG_INFO, "Drive configured down", params); m_log(LOG_INFO, "Drive configured down", params);
break; break;
default: default:
{ {
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to configure drive. Reason: wrong drive status requested: " << body.status; ex.getMessage() << "Failed to configure drive"
": Unexpected drive status requested: status=" << body.status;
throw ex; throw ex;
} }
} }
...@@ -311,8 +312,8 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler:: ...@@ -311,8 +312,8 @@ void castor::tape::tapeserver::daemon::AdminConnectionHandler::
for(std::list<std::string>::const_iterator itor = unitNames.begin(); for(std::list<std::string>::const_iterator itor = unitNames.begin();
itor!=unitNames.end() and i<CA_MAXNBDRIVES; itor++) { itor!=unitNames.end() and i<CA_MAXNBDRIVES; itor++) {
const std::string &unitName = *itor; const std::string &unitName = *itor;
const DriveCatalogueEntry * drive = m_driveCatalogue.findDrive(unitName); const DriveCatalogueEntry &drive = m_driveCatalogue.findDrive(unitName);
body.drives[i] = drive->getTapeStatDriveEntry(); body.drives[i] = drive.getTapeStatDriveEntry();
i++; i++;
} }
......
...@@ -26,6 +26,7 @@ add_library(castorTapeServerDaemon ...@@ -26,6 +26,7 @@ add_library(castorTapeServerDaemon
ProcessForker.cpp ProcessForker.cpp
ProcessForkerConnectionHandler.cpp ProcessForkerConnectionHandler.cpp
ProcessForkerProxy.cpp ProcessForkerProxy.cpp
ProcessForkerProxyDummy.cpp
ProcessForkerProxySocket.cpp ProcessForkerProxySocket.cpp
ProcessForkerUtils.cpp ProcessForkerUtils.cpp
RecallMemoryManager.cpp RecallMemoryManager.cpp
......
...@@ -27,6 +27,22 @@ ...@@ -27,6 +27,22 @@
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
//-----------------------------------------------------------------------------
// constructor
//-----------------------------------------------------------------------------
castor::tape::tapeserver::daemon::DriveCatalogue::DriveCatalogue(
log::Logger &log,
const DataTransferSession::CastorConf &dataTransferConfig,
ProcessForkerProxy &processForker,
legacymsg::VdqmProxy &vdqm,
const std::string &hostName):
m_log(log),
m_dataTransferConfig(dataTransferConfig),
m_processForker(processForker),
m_vdqm(vdqm),
m_hostName(hostName) {
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// destructor // destructor
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
...@@ -81,7 +97,8 @@ void castor::tape::tapeserver::daemon::DriveCatalogue::enterDriveConfig( ...@@ -81,7 +97,8 @@ void castor::tape::tapeserver::daemon::DriveCatalogue::enterDriveConfig(
// If the drive is not in the catalogue // If the drive is not in the catalogue
if(m_drives.end() == itor) { if(m_drives.end() == itor) {
// Insert it // Insert it
m_drives[driveConfig.unitName] = new DriveCatalogueEntry(driveConfig, m_drives[driveConfig.unitName] = new DriveCatalogueEntry(m_log,
m_processForker, m_vdqm, m_hostName, driveConfig, m_dataTransferConfig,
DriveCatalogueEntry::DRIVE_STATE_DOWN); DriveCatalogueEntry::DRIVE_STATE_DOWN);
// Else the drive is already in the catalogue // Else the drive is already in the catalogue
} else { } else {
...@@ -142,122 +159,77 @@ std::list<std::string> ...@@ -142,122 +159,77 @@ std::list<std::string>
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// getUnitNamesWaitingForTransferFork // findDrive
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
std::list<std::string> castor::tape::tapeserver::daemon::DriveCatalogue::getUnitNamesWaitingForTransferFork() const { const castor::tape::tapeserver::daemon::DriveCatalogueEntry
std::list<std::string> unitNames; &castor::tape::tapeserver::daemon::DriveCatalogue::findDrive(
const std::string &unitName) const {
for(DriveMap::const_iterator itor = m_drives.begin(); std::ostringstream task;
itor != m_drives.end(); itor++) { task << "find tape drive in catalogue by unit name: unitName=" << unitName;
const std::string &unitName = itor->first;
const DriveCatalogueEntry &drive = *(itor->second);
const utils::DriveConfig &driveConfig = drive.getConfig();
// Sanity check
if(unitName != driveConfig.unitName) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Failed to get unit names of drives waiting for forking a transfer session" <<
": unit name mismatch: unitName=" << unitName <<
" driveConfig.unitName=" << driveConfig.unitName;
throw ex;
}
if(DriveCatalogueEntry::SESSION_TYPE_DATATRANSFER==drive.getSessionType() DriveMap::const_iterator itor = m_drives.find(unitName);
&& DriveCatalogueSession::SESSION_STATE_WAITFORK == drive.getSessionState()) { if(m_drives.end() == itor) {
unitNames.push_back(itor->first); castor::exception::Exception ex;
} ex.getMessage() << "Failed to " << task.str() << ": Entry does not exist";
throw ex;
} }
return unitNames; if(NULL == itor->second) {
} // Should never get here
castor::exception::Exception ex;
//----------------------------------------------------------------------------- ex.getMessage() << "Failed to " << task <<
// getUnitNamesWaitingForLabelFork ": Pointer to drive entry is unexpectedly NULL";
//----------------------------------------------------------------------------- throw ex;
std::list<std::string> castor::tape::tapeserver::daemon::DriveCatalogue::
getUnitNamesWaitingForLabelFork() const {
std::list<std::string> unitNames;
for(DriveMap::const_iterator itor = m_drives.begin();
itor != m_drives.end(); itor++) {
const std::string &unitName = itor->first;
const DriveCatalogueEntry &drive = *(itor->second);
const utils::DriveConfig &driveConfig = drive.getConfig();
// Sanity check
if(unitName != driveConfig.unitName) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Failed to get unit names of drives waiting for forking a label session" <<
": unit name mismatch: unitName=" << unitName <<
" driveConfig.unitName=" << driveConfig.unitName;
throw ex;
}
if(DriveCatalogueEntry::SESSION_TYPE_LABEL==drive.getSessionType() &&
DriveCatalogueSession::SESSION_STATE_WAITFORK == drive.getSessionState()) {
unitNames.push_back(itor->first);
}
} }
return unitNames; const DriveCatalogueEntry &drive = *(itor->second);
} const utils::DriveConfig &driveConfig = drive.getConfig();
//-----------------------------------------------------------------------------
// getUnitNamesWaitingForCleanerFork
//-----------------------------------------------------------------------------
std::list<std::string> castor::tape::tapeserver::daemon::DriveCatalogue::
getUnitNamesWaitingForCleanerFork() const {
std::list<std::string> unitNames;
for(DriveMap::const_iterator itor = m_drives.begin();
itor != m_drives.end(); itor++) {
const std::string &unitName = itor->first;
const DriveCatalogueEntry &drive = *(itor->second);
const utils::DriveConfig &driveConfig = drive.getConfig();
// Sanity check // Sanity check
if(unitName != driveConfig.unitName) { if(unitName != driveConfig.unitName) {
// Should never get here // Should never get here
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to get unit names of drives waiting for forking a cleaner session" << ex.getMessage() << "Failed to " << task <<
": unit name mismatch: unitName=" << unitName << ": Found inconsistent entry in tape-drive catalogue"
" driveConfig.unitName=" << driveConfig.unitName; ": Unit name mismatch: actual=" << driveConfig.unitName;
throw ex; throw ex;
}
if(DriveCatalogueEntry::SESSION_TYPE_CLEANER==drive.getSessionType() &&
DriveCatalogueSession::SESSION_STATE_WAITFORK == drive.getSessionState()) {
unitNames.push_back(itor->first);
}
} }
return unitNames; return drive;
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// findConstDrive // findDrive
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
const castor::tape::tapeserver::daemon::DriveCatalogueEntry castor::tape::tapeserver::daemon::DriveCatalogueEntry
*castor::tape::tapeserver::daemon::DriveCatalogue::findDrive( &castor::tape::tapeserver::daemon::DriveCatalogue::findDrive(
const std::string &unitName) const { const std::string &unitName) {
std::ostringstream task;
task << "find tape drive in catalogue by unit name: unitName=" << unitName;
DriveMap::const_iterator itor = m_drives.find(unitName); DriveMap::iterator itor = m_drives.find(unitName);
if(m_drives.end() == itor) { if(m_drives.end() == itor) {
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue: " << ex.getMessage() << "Failed to " << task.str() << ": Entry does not exist";
": No entry for tape-drive " << unitName; throw ex;
}
if(NULL == itor->second) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Failed to " << task <<
": Pointer to drive entry is unexpectedly NULL";
throw ex; throw ex;
} }
const DriveCatalogueEntry * const drive = itor->second;
const utils::DriveConfig &driveConfig = drive->getConfig(); DriveCatalogueEntry &drive = *(itor->second);
const utils::DriveConfig &driveConfig = drive.getConfig();
// Sanity check // Sanity check
if(unitName != driveConfig.unitName) { if(unitName != driveConfig.unitName) {
// This should never happen // This should never happen
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue: " << ex.getMessage() << "Failed to " << task <<
": Found inconsistent entry in tape-drive catalogue" ": Found inconsistent entry in tape-drive catalogue"
": Unit name mismatch: expected=" << unitName << ": Unit name mismatch: expected=" << unitName <<
" actual=" << driveConfig.unitName; " actual=" << driveConfig.unitName;
...@@ -268,16 +240,29 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry ...@@ -268,16 +240,29 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// findConstDrive // findDrive
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
const castor::tape::tapeserver::daemon::DriveCatalogueEntry const castor::tape::tapeserver::daemon::DriveCatalogueEntry
*castor::tape::tapeserver::daemon::DriveCatalogue::findDrive( &castor::tape::tapeserver::daemon::DriveCatalogue::findDrive(
const pid_t sessionPid) const { const pid_t sessionPid) const {
std::ostringstream task;
task << "find tape drive in catalogue by session pid: sessionPid=" <<
sessionPid;
for(DriveMap::const_iterator itor = m_drives.begin(); itor != m_drives.end();
itor++) {
if(NULL == itor->second) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Failed to " << task.str() <<
": Encountered NULL drive-entry pointer: unitName=" << itor->first;
throw ex;
}
for(DriveMap::const_iterator i = m_drives.begin(); i!=m_drives.end(); i++) { const DriveCatalogueEntry &drive = *(itor->second);
const DriveCatalogueEntry * const drive = i->second;
try { try {
if(sessionPid == drive->getSessionPid()) { if(sessionPid == drive.getSessionPid()) {
return drive; return drive;
} }
} catch(...) { } catch(...) {
...@@ -286,8 +271,7 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry ...@@ -286,8 +271,7 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry
} }
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue: " << ex.getMessage() << "Failed to " << task.str() << ": Entry does not exist";
": No drive associated with session process-ID " << sessionPid;
throw ex; throw ex;
} }
...@@ -295,44 +279,26 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry ...@@ -295,44 +279,26 @@ const castor::tape::tapeserver::daemon::DriveCatalogueEntry
// findDrive // findDrive
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
castor::tape::tapeserver::daemon::DriveCatalogueEntry castor::tape::tapeserver::daemon::DriveCatalogueEntry
*castor::tape::tapeserver::daemon::DriveCatalogue::findDrive( &castor::tape::tapeserver::daemon::DriveCatalogue::findDrive(
const std::string &unitName) { const pid_t sessionPid) {
std::ostringstream task;
DriveMap::iterator itor = m_drives.find(unitName); task << "find tape drive in catalogue by session pid: sessionPid=" <<
if(m_drives.end() == itor) { sessionPid;
castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue: " <<
"No entry for tape-drive " << unitName;
throw ex;
}
DriveCatalogueEntry * drive = itor->second;
const utils::DriveConfig &driveConfig = drive->getConfig();
// Sanity check
if(unitName != driveConfig.unitName) {
// This should never happen
castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue" <<
": Found inconsistent entry in tape-drive catalogue"
": Unit name mismatch: expected=" << unitName <<
" actual=" << driveConfig.unitName;
throw ex;
}
return drive; for(DriveMap::iterator itor = m_drives.begin(); itor != m_drives.end();
} itor++) {
//----------------------------------------------------------------------------- if(NULL == itor->second) {
// findDrive // Should never get here
//----------------------------------------------------------------------------- castor::exception::Exception ex;
castor::tape::tapeserver::daemon::DriveCatalogueEntry ex.getMessage() << "Failed to " << task.str() <<
*castor::tape::tapeserver::daemon::DriveCatalogue::findDrive( ": Encountered NULL drive-entry pointer: unitName=" << itor->first;
const pid_t sessionPid) { throw ex;
}
for(DriveMap::iterator i = m_drives.begin(); i!=m_drives.end(); i++) { DriveCatalogueEntry &drive = *(itor->second);
DriveCatalogueEntry * drive = i->second;
try { try {
if(sessionPid == drive->getSessionPid()) { if(sessionPid == drive.getSessionPid()) {
return drive; return drive;
} }
} catch(...) { } catch(...) {
...@@ -341,7 +307,6 @@ castor::tape::tapeserver::daemon::DriveCatalogueEntry ...@@ -341,7 +307,6 @@ castor::tape::tapeserver::daemon::DriveCatalogueEntry
} }
castor::exception::Exception ex; castor::exception::Exception ex;
ex.getMessage() << "Failed to find tape-drive in catalogue: " << ex.getMessage() << "Failed to " << task.str() << ": Entry does not exist";
": No drive associated with session process-ID " << sessionPid;
throw ex; throw ex;
} }
...@@ -26,9 +26,12 @@ ...@@ -26,9 +26,12 @@
#include "castor/exception/Exception.hpp" #include "castor/exception/Exception.hpp"
#include "castor/legacymsg/RtcpJobRqstMsgBody.hpp" #include "castor/legacymsg/RtcpJobRqstMsgBody.hpp"
#include "castor/legacymsg/TapeLabelRqstMsgBody.hpp" #include "castor/legacymsg/TapeLabelRqstMsgBody.hpp"
#include "castor/legacymsg/VdqmProxy.hpp"
#include "castor/log/Logger.hpp"
#include "castor/tape/utils/DriveConfigMap.hpp" #include "castor/tape/utils/DriveConfigMap.hpp"
#include "castor/tape/tapeserver/client/ClientProxy.hpp" #include "castor/tape/tapeserver/client/ClientProxy.hpp"
#include "castor/tape/tapeserver/daemon/DriveCatalogueEntry.hpp" #include "castor/tape/tapeserver/daemon/DriveCatalogueEntry.hpp"
#include "castor/tape/tapeserver/daemon/ProcessForkerProxy.hpp"
#include <map> #include <map>
#include <string> #include <string>
...@@ -46,6 +49,23 @@ namespace daemon { ...@@ -46,6 +49,23 @@ namespace daemon {
class DriveCatalogue { class DriveCatalogue {
public: public:
/**
* Constructor.
*
* @param log Object representing the API of the CASTOR logging system.
* @param dataTransferConfig The configuration of a data-transfer session.
* @param processForker Proxy object representing the ProcessForker.
* @param vdqm Proxy object representing the vdqmd daemon.
* @param hostName The name of the host on which the daemon is running. This
* name is needed to fill in messages to be sent to the vdqmd daemon.
*/
DriveCatalogue(
log::Logger &log,
const DataTransferSession::CastorConf &dataTransferConfig,
ProcessForkerProxy &processForker,
legacymsg::VdqmProxy &m_vdqm,
const std::string &hostName);
/** /**
* Destructor. * Destructor.
* *
...@@ -69,7 +89,7 @@ public: ...@@ -69,7 +89,7 @@ public:
* *
* @param unitName The unit name of the tape drive. * @param unitName The unit name of the tape drive.
*/ */
const DriveCatalogueEntry *findDrive(const std::string &unitName) const DriveCatalogueEntry &findDrive(const std::string &unitName)
const; const;
/** /**
...@@ -80,7 +100,7 @@ public: ...@@ -80,7 +100,7 @@ public:
* *
* @param sessionPid The process ID of the session. * @param sessionPid The process ID of the session.
*/ */