Commit 9b9847c6 authored by Steven Murray's avatar Steven Murray
Browse files

bug #82673: RFE: Tapebridged and rtcpd should support buffered tape-marks over multiple files

This commit modifies the tapebridged/rtcpd protocol that is internal to a
tape-server.  The new version of the protocol is now capable of supporting
buffered tape-marks, though the version of rtcpd in this commit still only
supports at best one synchronised tape-mark per file, i.e. it does not support
buffered tape-marks over more than one file.

The change with respect to the previous version of the tapebridged/rtcpd
protocol is simple.  When the rtcpd daemon knows that its client is the
tapebridged daemon it now sends an explicit TAPEBRIDGE_FLUSHEDTOTAPE message to
the tapebridged daemon when it can guarantee one or more files have been
flushed to tape.  The tapebridge daemon will now only send
FileMigratedNotification messages to the tapegatewayd daemon for the files that
have been explicitly flushed to tape.  The PendingMigrationsStore is used to
hold back FileMigratedNotification messages until the appropriate
TAPEBRIDGE_FLUSHEDTOTAPE message has been received.  The rtcpd daemon in this
commit always does at least one synchronised tape-mark per file and always
sends one TAPEBRIDGE_FLUSHEDTOTAPE message per file when its client is the
tapegatewayd daemon.
parent c96bfc06
......@@ -31,6 +31,7 @@
#include "castor/tape/tapebridge/ConfigParamAndSource.hpp"
#include "castor/tape/tapebridge/Constants.hpp"
#include "castor/tape/tapebridge/Counter.hpp"
#include "castor/tape/tapebridge/PendingMigrationsStore.hpp"
#include "castor/tape/legacymsg/CommonMarshal.hpp"
#include "castor/tape/legacymsg/RtcpMarshal.hpp"
#include "castor/tape/tapegateway/Volume.hpp"
......@@ -39,6 +40,7 @@
#include "h/Castor_limits.h"
#include "h/Cuuid.h"
#include <list>
#include <map>
#include <stdint.h>
......@@ -189,6 +191,13 @@ private:
*/
utils::IndexedContainer<uint64_t> m_pendingTransferIds;
/**
* The store of pending file-migrations. A pending file-migration is one
* that has either just been sent to the rtcpd daemon or has been both sent
* and written to tape but not yet flushed to tape.
*/
PendingMigrationsStore m_pendingMigrationsStore;
/**
* In-line helper function that returns a 64-bit rtcpd message body handler
* key to be used in the m_rtcpdHandler map.
......@@ -265,7 +274,8 @@ private:
ClientCallbackMap m_clientHandlers;
/**
* Processes the rtcpd and client sockets.
* Processes the rtcpd and client sockets in a loop until the end of the
* rtcpd session has been reached.
*/
void processSocks() throw(castor::exception::Exception);
......@@ -371,16 +381,67 @@ private:
* This function implements the common logic of the rtcpFileReqRtcpdCallback
* and rtcpFileErrReqRtcpdCallback functions.
*
* @param header The header of the request.
* @param body The body of the request.
* @param rtcpdSock The file descriptor of the socket from which both
* the header and the body of the message have
* already been read from.
* @param header The header of the request.
* @param body The body of the request.
* @param rtcpdSock The file descriptor of the socket from which both the
* header and the body of the message have already been read
* from.
*/
void processRtcpFileErrReq(const legacymsg::MessageHeader &header,
legacymsg::RtcpFileRqstErrMsgBody &body, const int rtcpdSock)
throw(castor::exception::Exception);
/**
* Processes the specified RTCP file request in the context of dumping a
* tape.
*
* @param header The header of the request.
* @param rtcpdSock The file descriptor of the socket from which both the
* header and the body of the message have already been read
* from.
*/
void processRtcpFileErrReqDump(const legacymsg::MessageHeader &header,
const int rtcpdSock) throw(castor::exception::Exception);
/**
* Processes the specified RTCP file request for more work.
*
* @param header The header of the request.
* @param body The body of the request.
* @param rtcpdSock The file descriptor of the socket from which both the
* header and the body of the message have already been read
* from.
*/
void processRtcpRequestMoreWork(const legacymsg::MessageHeader &header,
legacymsg::RtcpFileRqstErrMsgBody &body, const int rtcpdSock)
throw(castor::exception::Exception);
/**
* Processes the specified RTCP file positioned request.
*
* @param header The header of the request.
* @param body The body of the request.
* @param rtcpdSock The file descriptor of the socket from which both the
* header and the body of the message have already been read
* from.
*/
void processRtcpFilePositionedRequest(const legacymsg::MessageHeader &header,
legacymsg::RtcpFileRqstErrMsgBody &body, const int rtcpdSock)
throw(castor::exception::Exception);
/**
* Processes the specified RTCP file transfer finished request.
*
* @param header The header of the request.
* @param body The body of the request.
* @param rtcpdSock The file descriptor of the socket from which both the
* header and the body of the message have already been read
* from.
*/
void processRtcpFileFinishedRequest(const legacymsg::MessageHeader &header,
legacymsg::RtcpFileRqstErrMsgBody &body, const int rtcpdSock)
throw(castor::exception::Exception);
/**
* RTCP_TAPEREQ rtcpd message-body handler.
*
......@@ -440,6 +501,17 @@ private:
const int socketFd, bool &receivedENDOF_REQ)
throw(castor::exception::Exception);
/**
* Sends the specified file-migrated notifications to the client (the
* tapegatewayd daemon or the writetp command-line tool).
*
* This method sets the aggregatorTransactionId of each notification message
* accordingly before sending the message.
*/
void sendFlushedMigrationsToClient(
std::list<tapegateway::FileMigratedNotification> &fileMigratedNotifications)
throw (castor::exception::Exception);
/**
* GIVE_OUTP rtcpd message-body handler.
*
......
......@@ -97,12 +97,19 @@ castor::tape::tapegateway::Volume
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to get volume"
": Failed to down cast reply object to tapegateway::Volume");
}
checkTransactionIds("Volume",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("Volume",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to get volume"
": " << ex.getMessage().str());
}
LogHelper::logMsg(cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_GOT_VOLUME_FROM_CLIENT, *reply, connectDuration,
......@@ -121,6 +128,7 @@ castor::tape::tapegateway::Volume
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to get volume"
": Failed to down cast reply object to tapegateway::NoMoreFiles");
}
......@@ -133,9 +141,15 @@ castor::tape::tapegateway::Volume
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_GOT_NO_MORE_FILES_FROM_CLIENT, params);
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to get volume"
": " << ex.getMessage().str());
}
}
return NULL;
......@@ -147,6 +161,7 @@ castor::tape::tapegateway::Volume
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to get volume"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -178,13 +193,9 @@ int castor::tape::tapebridge::ClientTxRx::sendFileToMigrateRequest(
return sendMessage(clientHost, clientPort, CLIENTNETRWTIMEOUT, request,
connectDuration);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ce(ECANCELED);
ce.getMessage() <<
"Failed to send FileToMigrateRequest"
": " << ex.getMessage().str();
throw(ce);
TAPE_THROW_CODE(ex.code(),
": Failed to send FileToMigrateRequest"
": " << ex.getMessage().str());
}
}
......@@ -211,12 +222,19 @@ castor::tape::tapegateway::FileToMigrate
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive file to migrate reply and close connection"
": Failed to down cast reply object to tapegateway::FileToMigrate");
}
checkTransactionIds("FileToMigrate",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("FileToMigrate",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to receive file to migrate reply and close connection"
": " << ex.getMessage().str());
}
// Release the reply message from its smart pointer and return it
obj.release();
......@@ -232,12 +250,19 @@ castor::tape::tapegateway::FileToMigrate
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive file to migrate reply and close connection"
": Failed to down cast reply object to tapegateway::NoMoreFiles");
}
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to receive file to migrate reply and close connection"
": " << ex.getMessage().str());
}
}
return NULL;
......@@ -249,6 +274,7 @@ castor::tape::tapegateway::FileToMigrate
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to receive file to migrate reply and close connection"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -280,13 +306,9 @@ int castor::tape::tapebridge::ClientTxRx::sendFileToRecallRequest(
return sendMessage(clientHost, clientPort, CLIENTNETRWTIMEOUT, request,
connectDuration);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ce(ECANCELED);
ce.getMessage() <<
"Failed to send FileToRecallRequest"
": " << ex.getMessage().str();
throw(ce);
TAPE_THROW_CODE(ex.code(),
": Failed to send FileToRecallRequest"
": " << ex.getMessage().str());
}
}
......@@ -313,12 +335,19 @@ castor::tape::tapegateway::FileToRecall
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive file to recall reply and close connection"
": Failed to down cast reply object to tapegateway::FileToRecall");
}
checkTransactionIds("FileToRecall",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("FileToRecall",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to receive file to recall reply and close connection"
": " << ex.getMessage().str());
}
// Release the reply message from its smart pointer and return it
obj.release();
......@@ -334,12 +363,19 @@ castor::tape::tapegateway::FileToRecall
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive file to recall reply and close connection"
": Failed to down cast reply object to tapegateway::NoMoreFiles");
}
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("NoMoreFiles",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to receive file to recall reply and close connection"
": " << ex.getMessage().str());
}
}
return NULL;
......@@ -351,6 +387,7 @@ castor::tape::tapegateway::FileToRecall
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to receive file to recall reply and close connection"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -361,63 +398,6 @@ castor::tape::tapegateway::FileToRecall
}
//-----------------------------------------------------------------------------
// sendFileMigratedNotification
//-----------------------------------------------------------------------------
/**
 * Builds a tapegateway::FileMigratedNotification from the given arguments,
 * sends it to the client with sendMessage() and returns whatever
 * sendMessage() returns (the socket-descriptor of the connection).
 *
 * If sendMessage() throws a castor::exception::Exception, a new exception
 * with the code ECANCELED is thrown instead; its message is
 * "Failed to send FileMigratedNotification" followed by the message of the
 * original exception.
 */
int castor::tape::tapebridge::ClientTxRx::sendFileMigratedNotification(
  const uint32_t      mountTransactionId,
  const uint64_t      aggregatorTransactionId,
  const char          *clientHost,
  const unsigned short clientPort,
  time_t              &connectDuration,
  const uint64_t      fileTransactionId,
  const char          (&nsHost)[CA_MAXHOSTNAMELEN+1],
  const uint64_t      fileId,
  const int32_t       tapeFileSeq,
  const unsigned char (&blockId)[4],
  const int32_t       positionCommandCode,
  const char          (&checksumAlgorithm)[CA_MAXCKSUMNAMELEN+1],
  const uint32_t      checksum,
  const uint64_t      fileSize,
  const uint64_t      compressedFileSize)
  throw(castor::exception::Exception) {

  tapegateway::FileMigratedNotification msg;

  // Transaction identifiers
  msg.setMountTransactionId(mountTransactionId);
  msg.setAggregatorTransactionId(aggregatorTransactionId);
  msg.setFileTransactionId(fileTransactionId);

  // Identity of the file
  msg.setNshost(nsHost);
  msg.setFileid(fileId);

  // The block ID is passed on as its 4 individual elements, index 0 first
  // (the original author describes this as little-endian order)
  msg.setBlockId0(blockId[0]);
  msg.setBlockId1(blockId[1]);
  msg.setBlockId2(blockId[2]);
  msg.setBlockId3(blockId[3]);

  // Position of the file on tape
  msg.setFseq(tapeFileSeq);
  msg.setPositionCommandCode(
    static_cast<tapegateway::PositionCommandCode>(positionCommandCode));

  // Checksum and sizes
  msg.setChecksumName(checksumAlgorithm);
  msg.setChecksum(checksum);
  msg.setFileSize(fileSize);
  msg.setCompressedFileSize(compressedFileSize);

  // Hand the message to sendMessage(); on success the caller receives the
  // socket-descriptor of the connection
  try {
    return sendMessage(clientHost, clientPort, CLIENTNETRWTIMEOUT, msg,
      connectDuration);
  } catch(castor::exception::Exception &sendFailure) {
    // Re-report the failure as ECANCELED, keeping the original text
    castor::exception::Exception wrapped(ECANCELED);

    wrapped.getMessage() <<
      "Failed to send FileMigratedNotification"
      ": " << sendFailure.getMessage().str();

    throw(wrapped);
  }
}
//-----------------------------------------------------------------------------
// sendFileRecalledNotification
//-----------------------------------------------------------------------------
......@@ -458,13 +438,9 @@ int castor::tape::tapebridge::ClientTxRx::sendFileRecalledNotification(
return sendMessage(clientHost, clientPort, CLIENTNETRWTIMEOUT,
notification, connectDuration);
} catch(castor::exception::Exception &ex) {
castor::exception::Exception ce(ECANCELED);
ce.getMessage() <<
TAPE_THROW_CODE(ex.code(),
"Failed to send FileRecalledNotification"
": " << ex.getMessage().str();
throw(ce);
": " << ex.getMessage().str());
}
}
......@@ -554,6 +530,7 @@ castor::tape::tapegateway::DumpParameters
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to get dump parameters"
": Failed to down cast reply object to tapegateway::DumpParameters");
}
......@@ -561,9 +538,15 @@ castor::tape::tapegateway::DumpParameters
TAPEBRIDGE_GOT_DUMP_PARAMETERS_FROM_CLIENT, *reply, connectDuration,
sendRecvDuration);
checkTransactionIds("DumpParameters",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("DumpParameters",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to get dump parameters"
": " << ex.getMessage().str());
}
// Release the reply message from its smart pointer and return it
obj.release();
......@@ -578,6 +561,7 @@ castor::tape::tapegateway::DumpParameters
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to get dump parameters"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -822,6 +806,7 @@ int castor::tape::tapebridge::ClientTxRx::sendMessage(
connectDuration = time(NULL) - connectStartTime;
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to send message"
": Failed to connect to client"
": clientHost=" << clientHost <<
": clientPort=" << clientPort <<
......@@ -834,6 +819,7 @@ int castor::tape::tapebridge::ClientTxRx::sendMessage(
sock.sendObject(message);
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to send message"
": Failed to send request to client"
": clientHost=" << clientHost <<
": clientPort=" << clientPort <<
......@@ -867,13 +853,17 @@ castor::IObject *castor::tape::tapebridge::ClientTxRx::receiveReplyAndClose(
if(obj.get() == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive reply and close connection"
": ClientSocket::readObject() returned null");
}
} catch(castor::exception::Exception &ex) {
std::stringstream oss;
oss << ": Failed to read reply from client: " << ex.getMessage().str();
oss <<
": Failed to receive reply and close connection"
": Failed to read reply from client"
": " << ex.getMessage().str();
if(ex.code() == SETIMEDOUT) {
oss << ": Timed out after " << sock.timeout() << " seconds";
......@@ -912,6 +902,7 @@ castor::IObject
connectDuration = time(NULL) - connectStartTime;
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to send request and receive reply"
": Failed to connect to client to send " << requestTypeName <<
": " << ex.getMessage().str());
}
......@@ -922,6 +913,7 @@ castor::IObject
sock.sendObject(request);
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to send request and receive reply"
": Failed to send " << requestTypeName << " to client"
": " << ex.getMessage().str());
}
......@@ -934,6 +926,7 @@ castor::IObject
if(obj.get() == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to send request and receive reply"
": ClientSocket::readObject() returned null");
}
sendRecvDuration = time(NULL) - sendAndReceiveStartTime;
......@@ -941,8 +934,10 @@ castor::IObject
std::stringstream oss;
oss << ": Failed to read " << requestTypeName << " reply from client: "
<< ex.getMessage().str();
oss <<
": Failed to send request and receive reply"
": Failed to read " << requestTypeName << " reply from client"
": " << ex.getMessage().str();
if(ex.code() == SETIMEDOUT) {
oss << ": Timed out after " << sock.timeout() << " seconds";
......@@ -979,13 +974,20 @@ void castor::tape::tapebridge::ClientTxRx::receiveNotificationReplyAndClose(
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to receive notification reply and close connection"
": Failed to down cast reply object to "
"tapegateway::NotificationAcknowledge");
}
checkTransactionIds("NotificationAcknowledge",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
try {
checkTransactionIds("NotificationAcknowledge",
mountTransactionId , reply->mountTransactionId(),
aggregatorTransactionId, reply->aggregatorTransactionId());
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(ex.code(),
": Failed to receive notification reply and close connection"
": " << ex.getMessage().str());
}
}
break;
......@@ -997,6 +999,7 @@ void castor::tape::tapebridge::ClientTxRx::receiveNotificationReplyAndClose(
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to receive notification reply and close connection"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -1034,6 +1037,7 @@ void castor::tape::tapebridge::ClientTxRx::notifyClient(
if(reply == NULL) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to notify client"
": Failed to down cast reply object to "
"tapegateway::NotificationAcknowledge");
}
......@@ -1052,6 +1056,7 @@ void castor::tape::tapebridge::ClientTxRx::notifyClient(
default:
{
TAPE_THROW_CODE(EBADMSG,
": Failed to notify client"
": Unknown reply type "
": Reply type = " << obj->type());
}
......@@ -1100,19 +1105,19 @@ void castor::tape::tapebridge::ClientTxRx::checkTransactionIds(
if(expectedMountTransactionId != actualMountTransactionId) {
nbErrors++;
errorMessage <<
": " << messageTypeName <<
" message contains a mount transaction ID mismatch"
": expectedMountTransactionId=" << expectedMountTransactionId <<
": actualMountTransactionId=" << actualMountTransactionId;
": Mount transaction ID mismatch"
": messageTypeName=" << messageTypeName <<
" expectedMountTransactionId=" << expectedMountTransactionId <<
" actualMountTransactionId=" << actualMountTransactionId;
}
if(expectedTapeBridgeTransactionId != actualTapeBridgeTransactionId) {
nbErrors++;
errorMessage <<
": " << messageTypeName <<
" message contains an tapebridge transaction ID mismatch"
": expectedTapeBridgeTransactionId=" << expectedTapeBridgeTransactionId <<
": actualTapeBridgeTransactionId=" << actualTapeBridgeTransactionId;
": Tape-bridge transaction ID mismatch"
": messageTypeName=" << messageTypeName <<
" expectedTapeBridgeTransactionId=" << expectedTapeBridgeTransactionId <<
" actualTapeBridgeTransactionId=" << actualTapeBridgeTransactionId;
}
if(nbErrors > 0) {
TAPE_THROW_CODE(EBADMSG,
......
......@@ -167,51 +167,6 @@ public:
const int clientSock)
throw(castor::exception::Exception);
/**
* Sends a FileMigratedNotification to the client and returns the
* socket-descriptor of the client connection from which the reply will be
* read later.
*
* @param mountTransactionId The mount transaction ID to be sent to the
* client.
* @param aggregatorTransactionId The tapebridge transaction ID to be sent to
* the client.
* @param clientHost The client host name.
* @param clientPort The client port number.
* @param connectDuration Out parameter: The number of seconds it took to
* connect to the client.
* @param fileTransactionId The file transaction ID.
* @param nsHost The name server host.
* @param fileId The CASTOR file ID.
* @param tapeFileSeq The tape file sequence number.
* @param blockId The tape block ID.
* @param positionCommandCode The position method used by the drive.
* @param checksumAlgorithm The name of the checksum algorithm.
* @param checksum The file checksum.
* @param fileSize The size of the file without compression.
* @param compressedFileSize The size of the on-tape compressed file.
* @return The socket-descriptor of the tape-gateway
* connection from which the reply will be read
* later.
*/
static int sendFileMigratedNotification(
const uint32_t mountTransactionId,
const uint64_t aggregatorTransactionId,
const char *clientHost,
const unsigned short clientPort,