Commit 78883f16 authored by Eric Cano's avatar Eric Cano
Browse files

Moved and renamed TapeserverProxy class as TapedProxy.

Added new functions allowing representation of the full life cycle of the tape sessions.
Phased out the first old function (gotArchiveJobFromCTA).
Removed some dead code.
parent 3385361a
......@@ -2,12 +2,12 @@ cmake_minimum_required (VERSION 2.6)
# Old CASTOR's tapeserverd daemon code
add_subdirectory (castor)
add_subdirectory (test)
# CTA's cta-taped code
add_subdirectory (daemon)
# The tape session's threads are in a separate directory
add_subdirectory (session)
# The tape session's threads are in a separate directory (session, but compiled
# from the previous one to create a single library).
add_executable (cta-taped cta-taped.cpp)
target_link_libraries(cta-taped
......
......@@ -64,10 +64,6 @@ const char *castor::messages::msgTypeToString(const MsgType msgType) throw() {
return "TapeMountedForMigration";
case MSG_TYPE_TAPEMOUNTEDFORRECALL:
return "TapeMountedForRecall";
case MSG_TYPE_TAPEUNMOUNTSTARTED:
return "TapeUnmounStarted";
case MSG_TYPE_TAPEUNMOUNTED:
return "TapeUnmounted";
case MSG_TYPE_LABELERROR:
return "LabelError";
case MSG_TYPE_ACSMOUNTTAPEREADONLY:
......
......@@ -50,8 +50,6 @@ enum MsgType {
/* 15 */ MSG_TYPE_STOPPROCESSFORKER,
/* 16 */ MSG_TYPE_TAPEMOUNTEDFORMIGRATION,
/* 17 */ MSG_TYPE_TAPEMOUNTEDFORRECALL,
/* 18 */ MSG_TYPE_TAPEUNMOUNTSTARTED,
/* 19 */ MSG_TYPE_TAPEUNMOUNTED,
/* 20 */ MSG_TYPE_LABELERROR,
/* 21 */ MSG_TYPE_ACSMOUNTTAPEREADONLY,
/* 22 */ MSG_TYPE_ACSMOUNTTAPEREADWRITE,
......
......@@ -22,48 +22,21 @@
#include "castor/messages/TapeserverProxyDummy.hpp"
//------------------------------------------------------------------------------
// gotArchiveJobFromCTA
// reportState
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyDummy::gotArchiveJobFromCTA(
const std::string &vid, const std::string &unitName, const uint32_t nbFiles) {
return 0;
}
//------------------------------------------------------------------------------
// gotRetrieveJobFromCTA
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::gotRetrieveJobFromCTA(
const std::string &vid, const std::string &unitName) {
}
//------------------------------------------------------------------------------
// gotRecallJobFromTapeGateway
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::gotRecallJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
}
void castor::messages::TapeserverProxyDummy::reportState(const cta::tape::session::SessionState state,
const cta::tape::session::SessionType type, const std::string& vid) {}
//------------------------------------------------------------------------------
// gotRecallJobFromReadTp
// reportHeartbeat
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyDummy::gotRecallJobFromReadTp(
const std::string &vid, const std::string &unitName) {
}
void castor::messages::TapeserverProxyDummy::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) {}
//------------------------------------------------------------------------------
// gotMigrationJobFromTapeGateway
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyDummy::gotMigrationJobFromTapeGateway(
const std::string &vid, const std::string &unitName) {
return 0;
}
//------------------------------------------------------------------------------
// gotMigrationJobFromWriteTp
// gotRetrieveJobFromCTA
//------------------------------------------------------------------------------
uint32_t castor::messages::TapeserverProxyDummy::gotMigrationJobFromWriteTp(
void castor::messages::TapeserverProxyDummy::gotRetrieveJobFromCTA(
const std::string &vid, const std::string &unitName) {
return 0;
}
//------------------------------------------------------------------------------
......
......@@ -27,146 +27,43 @@ namespace castor {
namespace messages {
/**
* A dummy tapeserverd-proxy.
* A dummy taped-proxy.
*/
class TapeserverProxyDummy: public cta::daemon::TapedProxy {
class TapeserverProxyDummy: public cta::tape::daemon::TapedProxy {
public:
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* an archive job from CTA
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
* @return The number of files currently stored on the tape
*/
virtual uint32_t gotArchiveJobFromCTA(const std::string &vid,
const std::string &unitName, const uint32_t nbFiles);
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a retrieve job from CTA
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
virtual void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName);
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a recall job from the tapegatewayd daemon.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void gotRecallJobFromTapeGateway(const std::string &vid,
const std::string &unitName);
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a recall job from the readtp command-line tool.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void gotRecallJobFromReadTp(const std::string &vid,
const std::string &unitName);
void reportState(const cta::tape::session::SessionState state,
const cta::tape::session::SessionType type,
const std::string & vid) override;
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a migration job from the tapegatewayd daemon.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
* @return The number of files currently stored on the tape as given by the
* vmgrd daemon.
*/
uint32_t gotMigrationJobFromTapeGateway(const std::string &vid,
const std::string &unitName);
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a migration job from the writetp command-line tool.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
* @return The number of files currently stored on the tape as given by the
* vmgrd daemon.
*/
uint32_t gotMigrationJobFromWriteTp(const std::string &vid,
const std::string &unitName);
void reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) override;
void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape has been mounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeMountedForRecall(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape has been mounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeMountedForMigration(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape is unmounting.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeUnmountStarted(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape has been unmounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeUnmounted(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the data-transfer session is still
* alive and gives an indication of how much data has been moved.
*
* @param unitName The unit name of the tape drive.
* @param nbBlocksMoved Delta value giving the number of blocks moved
* since the last heartbeat message.
*/
void notifyHeartbeat(const std::string &unitName,
const uint64_t nbBlocksMoved);
const uint64_t nbBlocksMoved) override;
/**
* Sends a new set of parameters, to be logged by the mother process when the
* transfer session is over.
* @param params: a vector of log parameters
*/
virtual void addLogParams(const std::string &unitName,
const std::list<castor::log::Param> & params);
void addLogParams(const std::string &unitName,
const std::list<castor::log::Param> & params) override;
/**
* Sends a list of parameters to remove from the end of session logging.
*/
virtual void deleteLogParams(const std::string &unitName,
const std::list<std::string> & paramNames);
void deleteLogParams(const std::string &unitName,
const std::list<std::string> & paramNames) override;
/**
* Notifies the tapeserverd daemon that a label session has encountered the
* specified error.
*
* @param unitName The unit name of the tape drive.
* @param message The error message.
*/
void labelError(const std::string &unitName,
const std::string &message);
const std::string &message) override;
}; // class TapeserverProxyDummy
......
......@@ -45,13 +45,31 @@
// constructor
//------------------------------------------------------------------------------
castor::messages::TapeserverProxyZmq::TapeserverProxyZmq(log::Logger &log,
const unsigned short serverPort, void *const zmqContext) throw():
const unsigned short serverPort, void *const zmqContext,
const std::string &driveName) throw():
m_log(log),
m_driveName(driveName),
m_serverPort(serverPort),
m_serverSocket(zmqContext, ZMQ_REQ) {
connectZmqSocketToLocalhost(m_serverSocket, serverPort);
}
//------------------------------------------------------------------------------
// reportState
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::reportState(const cta::tape::session::SessionState state,
const cta::tape::session::SessionType type, const std::string& vid) {
if ((type == cta::tape::session::SessionType::Archive)
&& (state == cta::tape::session::SessionState::Mounting)) {
gotArchiveJobFromCTA(vid, m_driveName, 0);
}
}
//------------------------------------------------------------------------------
// reportHeartbeat
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) {}
//------------------------------------------------------------------------------
// gotArchiveJobFromCTA
//------------------------------------------------------------------------------
......@@ -275,120 +293,6 @@ castor::messages::Frame castor::messages::TapeserverProxyZmq::
}
}
//------------------------------------------------------------------------------
// tapeUnmountStarted
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeUnmountStarted(
const std::string &vid, const std::string &unitName) {
MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeUnmountStartedFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver of start of tape unmount: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// createTapeUnmountStartedFrame
//------------------------------------------------------------------------------
castor::messages::Frame castor::messages::TapeserverProxyZmq::
createTapeUnmountStartedFrame(const std::string &vid,
const std::string &unitName) {
try {
Frame frame;
frame.header = messages::protoTapePreFillHeader();
frame.header.set_msgtype(messages::MSG_TYPE_TAPEUNMOUNTSTARTED);
frame.header.set_bodysignature("PIPO");
TapeUnmountStarted body;
body.set_vid(vid);
body.set_unitname(unitName);
frame.serializeProtocolBufferIntoBody(body);
return frame;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to create TapeUnmountStarted frame: " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// tapeUnmounted
//------------------------------------------------------------------------------
void castor::messages::TapeserverProxyZmq::tapeUnmounted(
const std::string &vid, const std::string &unitName) {
MutexLocker lock(&m_mutex);
try {
const Frame rqst = createTapeUnmountedFrame(vid, unitName);
sendFrame(m_serverSocket, rqst);
ReturnValue reply;
recvTapeReplyOrEx(m_serverSocket, reply);
if(0 != reply.value()) {
// Should never get here
castor::exception::Exception ex;
ex.getMessage() << "Received an unexpected return value"
": expected=0 actual=" << reply.value();
throw ex;
}
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() <<
"Failed to notify tapeserver that tape is unmounted: " <<
"vid=" << vid << " unitName=" << unitName << ": " <<
ne.getMessage().str();
throw ex;
}
}
//------------------------------------------------------------------------------
// createTapeUnmountedFrame
//------------------------------------------------------------------------------
castor::messages::Frame castor::messages::TapeserverProxyZmq::
createTapeUnmountedFrame(const std::string &vid,
const std::string &unitName) {
try {
Frame frame;
frame.header = messages::protoTapePreFillHeader();
frame.header.set_msgtype(messages::MSG_TYPE_TAPEUNMOUNTED);
frame.header.set_bodysignature("PIPO");
TapeUnmounted body;
body.set_vid(vid);
body.set_unitname(unitName);
frame.serializeProtocolBufferIntoBody(body);
return frame;
} catch(castor::exception::Exception &ne) {
castor::exception::Exception ex;
ex.getMessage() << "Failed to create TapeUnmounted frame: " <<
ne.getMessage().str();
throw ex;
}
}
//-----------------------------------------------------------------------------
// notifyHeartbeat
//-----------------------------------------------------------------------------
......
......@@ -34,9 +34,8 @@ namespace messages {
* A concrete implementation of the interface to the internal network
* communications of the tapeserverd daemon.
*/
class TapeserverProxyZmq: public cta::daemon::TapedProxy {
class TapeserverProxyZmq: public cta::tape::daemon::TapedProxy {
public:
/**
* Constructor.
*
......@@ -46,98 +45,42 @@ public:
* @param zmqContext The ZMQ context.
*/
TapeserverProxyZmq(log::Logger &log, const unsigned short serverPort,
void *const zmqContext) throw();
void *const zmqContext, const std::string & driveName) throw();
void reportState(const cta::tape::session::SessionState state,
const cta::tape::session::SessionType type, const std::string& vid) override;
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* an archive job from CTA
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
* @return The number of files currently stored on the tape
*/
virtual uint32_t gotArchiveJobFromCTA(const std::string &vid,
void reportHeartbeat(uint64_t totalTapeBytesMoved, uint64_t totalDiskBytesMoved) override;
private:
uint32_t gotArchiveJobFromCTA(const std::string &vid,
const std::string &unitName, const uint32_t nbFiles);
/**
* Notifies the tapeserverd daemon that the mount-session child-process got
* a retrieve job from CTA
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
virtual void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName);
public:
void gotRetrieveJobFromCTA(const std::string &vid,
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape has been mounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeMountedForRecall(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape has been mounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeMountedForMigration(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override;
/**
* Notifies the tapeserverd daemon that the specified tape is unmounting.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeUnmountStarted(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override {}
/**
* Notifies the tapeserverd daemon that the specified tape has been unmounted.
*
* @param vid The tape to be mounted for recall.
* @param unitName The unit name of the tape drive.
*/
void tapeUnmounted(const std::string &vid,
const std::string &unitName);
const std::string &unitName) override {}
/**
* Notifies the tapeserverd daemon that the data-transfer session is still
* alive and gives an indication of how much data has been moved.
*
* @param unitName The unit name of the tape drive.
* @param nbBlocksMoved Delta value giving the number of blocks moved
* since the last heartbeat message.
*/
void notifyHeartbeat(const std::string &unitName,
const uint64_t nbBlocksMoved);
const uint64_t nbBlocksMoved) override;
/**
* Sends a new set of parameters, to be logged by the mother process when the
* transfer session is over.
* @param params: a vector of log parameters
*/
virtual void addLogParams(const std::string &unitName,
const std::list<castor::log::Param> & params);
const std::list<castor::log::Param> & params) override;
/**
* Sends a list of parameters to remove from the end of session logging.
*/
virtual void deleteLogParams(const std::string &unitName,
const std::list<std::string> & paramNames);
const std::list<std::string> & paramNames) override;
/**
* Notifies the tapeserverd daemon that a label session has encountered the
* specified error.
*
* @param unitName The unit name of the tape drive.
* @param message The error message.
*/
void labelError(const std::string &unitName, const std::string &message);
void labelError(const std::string &unitName, const std::string &message) override;
private:
......@@ -151,6 +94,11 @@ private:
* The object representing the API of the CASTOR logging system.
*/
log::Logger &m_log;
/**
* The name of the drive managed by this proxy.
*/
std::string m_driveName;
/**
* The name of the host on which the vdqmd daemon is running.
......@@ -207,26 +155,6 @@ private:
Frame createTapeMountedForMigrationFrame(const std::string &vid,
const std::string &unitName);
/**
* Creates a frame containing a TapeUnmountStarted message.
*
* @param vid The volume identifier of the tape.
* @param unitName The unit name of the tape drive.
* @return The frame.
*/
Frame createTapeUnmountStartedFrame(const std::string &vid,
const std::string &unitName);
/**
* Creates a frame containing a TapeUnmounted message.
*
* @param vid The volume identifier of the tape.
* @param unitName The unit name of the tape drive.
* @return The frame.
*/
Frame createTapeUnmountedFrame(const std::string &vid,
const std::string &unitName);
/**
* Creates a frame containing a Heartbeat message.
*
......
......@@ -52,7 +52,7 @@ castor::tape::tapeserver::daemon::DataTransferSession::DataTransferSession(
System::virtualWrapper & sysWrapper,
const DriveConfig & driveConfig,
castor::mediachanger::MediaChangerFacade & mc,
cta::daemon::TapedProxy & initialProcess,
cta::tape::daemon::TapedProxy & initialProcess,
castor::server::ProcessCap & capUtils,
const DataTransferConfig & castorConf,
cta::Scheduler & scheduler):
......@@ -268,15 +268,12 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
twst.setlastFseq(firstFseqFromClient-1);
//we retrieved the detail from the client in execute, so at this point
//we can report. We get in exchange the number of files on the tape.