Commit 14a4ee90 authored by Steven Murray's avatar Steven Murray
Browse files

bug #85949: RFE: Add bulk messages to tapegatewayd/tapebridged protocol

Merged v2_1_11Version-TapeBulkProtocols branch into trunk.
parent 855e3eda
......@@ -35,15 +35,25 @@
#include "castor/tape/tapebridge/Constants.hpp"
#include "castor/tape/tapebridge/ClientTxRx.hpp"
#include "castor/tape/tapebridge/DlfMessageConstants.hpp"
#include "castor/tape/tapebridge/FailedToCopyTapeFile.hpp"
#include "castor/tape/tapebridge/FailedToMigrateFileToTape.hpp"
#include "castor/tape/tapebridge/FailedToRecallFileFromTape.hpp"
#include "castor/tape/tapebridge/LegacyTxRx.hpp"
#include "castor/tape/tapebridge/RequestToMigrateFile.hpp"
#include "castor/tape/tapebridge/RtcpTxRx.hpp"
#include "castor/tape/tapebridge/TapeFlushConfigParams.hpp"
#include "castor/tape/net/net.hpp"
#include "castor/tape/tapegateway/EndNotification.hpp"
#include "castor/tape/tapegateway/EndNotificationErrorReport.hpp"
#include "castor/tape/tapegateway/FileMigratedNotification.hpp"
#include "castor/tape/tapegateway/FileToMigrate.hpp"
#include "castor/tape/tapegateway/FileToRecall.hpp"
#include "castor/tape/tapegateway/FileMigratedNotificationStruct.hpp"
#include "castor/tape/tapegateway/FileMigrationReportList.hpp"
#include "castor/tape/tapegateway/FileRecalledNotificationStruct.hpp"
#include "castor/tape/tapegateway/FileRecallReportList.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapegateway/FilesToMigrateListRequest.hpp"
#include "castor/tape/tapegateway/FilesToRecallList.hpp"
#include "castor/tape/tapegateway/FilesToRecallListRequest.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tapegateway/FileToRecallStruct.hpp"
#include "castor/tape/tapegateway/NoMoreFiles.hpp"
#include "castor/tape/tapegateway/NotificationAcknowledge.hpp"
#include "castor/tape/utils/SmartFd.hpp"
......@@ -79,14 +89,15 @@ castor::tape::tapebridge::BridgeProtocolEngine::BridgeProtocolEngine(
m_cuuid(cuuid),
m_jobRequest(jobRequest),
m_volume(volume),
m_nextDestinationFseq(nbFilesOnDestinationTape + 1),
m_nextDestinationTapeFSeq(nbFilesOnDestinationTape + 1),
m_stoppingGracefully(stoppingGracefully),
m_tapebridgeTransactionCounter(tapebridgeTransactionCounter),
m_nbReceivedENDOF_REQs(0),
m_pendingTransferIds(MAXPENDINGTRANSFERS),
m_pendingMigrationsStore(
tapeFlushConfigParams.getMaxBytesBeforeFlush().value,
tapeFlushConfigParams.getMaxFilesBeforeFlush().value) {
tapeFlushConfigParams.getMaxFilesBeforeFlush().value),
m_sessionWithRtcpdIsFinished(false) {
// Store the listen socket and initial rtcpd connection in the socket
// catalogue
......@@ -111,10 +122,10 @@ castor::tape::tapebridge::BridgeProtocolEngine::BridgeProtocolEngine(
&BridgeProtocolEngine::giveOutpRtcpdCallback;
// Build the map of client message handlers
m_clientHandlers[OBJ_FileToMigrate] =
&BridgeProtocolEngine::fileToMigrateClientCallback;
m_clientHandlers[OBJ_FileToRecall] =
&BridgeProtocolEngine::fileToRecallClientCallback;
m_clientHandlers[OBJ_FilesToMigrateList] =
&BridgeProtocolEngine::filesToMigrateListClientCallback;
m_clientHandlers[OBJ_FilesToRecallList] =
&BridgeProtocolEngine::filesToRecallListClientCallback;
m_clientHandlers[OBJ_NoMoreFiles] =
&BridgeProtocolEngine::noMoreFilesClientCallback;
m_clientHandlers[OBJ_EndNotificationErrorReport] =
......@@ -211,8 +222,8 @@ void castor::tape::tapebridge::BridgeProtocolEngine::processSocks()
timeval timeout;
// Select loop
bool continueRtcopySession = true;
while(continueRtcopySession) {
bool continueProcessingSocks = true;
while(continueProcessingSocks) {
// Throw an exception if the daemon is stopping gracefully
if(m_stoppingGracefully()) {
castor::exception::Exception ex(ECANCELED);
......@@ -235,8 +246,10 @@ void castor::tape::tapebridge::BridgeProtocolEngine::processSocks()
nbOneSecondTimeouts++;
// Ping rtcpd if the RTCPDPINGTIMEOUT has been reached
if(nbOneSecondTimeouts % RTCPDPINGTIMEOUT == 0) {
// Ping rtcpd if the session with rtcpd is not finished and the
// RTCPDPINGTIMEOUT has been reached
if(!m_sessionWithRtcpdIsFinished &&
(nbOneSecondTimeouts % RTCPDPINGTIMEOUT == 0)) {
RtcpTxRx::pingRtcpd(m_cuuid, m_jobRequest.volReqId,
m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT);
}
......@@ -299,21 +312,27 @@ void castor::tape::tapebridge::BridgeProtocolEngine::processSocks()
default: // One or more select file descriptors require attention
continueRtcopySession = processAPendingSocket(readFdSet);
processAPendingSocket(readFdSet);
} // switch(selectRc)
// Throw an exception if a timeout has occured
m_sockCatalogue.checkForTimeout();
} // while(continueRtcopySession)
// Determine whether or not continue processing sockets
continueProcessingSocks = !(
m_sessionWithRtcpdIsFinished &&
m_flushedBatches.empty() &&
!m_sockCatalogue.clientMigrationReportSockIsSet());
} // while(continueProcessingSocks)
}
//------------------------------------------------------------------------------
// processAPendingSocket
//------------------------------------------------------------------------------
bool castor::tape::tapebridge::BridgeProtocolEngine::processAPendingSocket(
void castor::tape::tapebridge::BridgeProtocolEngine::processAPendingSocket(
fd_set &readFdSet) throw(castor::exception::Exception) {
BridgeSocketCatalogue::SocketType sockType = BridgeSocketCatalogue::LISTEN;
......@@ -325,13 +344,20 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::processAPendingSocket(
switch(sockType) {
case BridgeSocketCatalogue::LISTEN:
return processPendingListenSocket();
processPendingListenSocket();
break;
case BridgeSocketCatalogue::INITIAL_RTCPD:
return processPendingInitialRtcpdSocket(pendingSock);
processPendingInitialRtcpdSocket(pendingSock);
break;
case BridgeSocketCatalogue::RTCPD_DISK_TAPE_IO_CONTROL:
return processPendingRtcpdDiskTapeIOControlSocket(pendingSock);
processPendingRtcpdDiskTapeIOControlSocket(pendingSock);
break;
case BridgeSocketCatalogue::CLIENT:
return processPendingClientSocket(pendingSock);
processPendingClientSocket(pendingSock);
break;
case BridgeSocketCatalogue::CLIENT_MIGRATION_REPORT:
processPendingClientMigrationReportSocket(pendingSock);
break;
default:
TAPE_THROW_EX(exception::Internal,
"Unknown socket type"
......@@ -343,25 +369,22 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::processAPendingSocket(
//------------------------------------------------------------------------------
// processPendingListenSocket
//------------------------------------------------------------------------------
bool castor::tape::tapebridge::BridgeProtocolEngine::
void castor::tape::tapebridge::BridgeProtocolEngine::
processPendingListenSocket() throw(castor::exception::Exception) {
int acceptedConnection = 0;
const int acceptedConnection = acceptRtcpdConnection();
// Accept the connection
m_sockCatalogue.addRtcpdDiskTapeIOControlConn(
acceptedConnection = acceptRtcpdConnection());
m_sockCatalogue.addRtcpdDiskTapeIOControlConn(acceptedConnection);
// Throw an exception if connection is not from localhost
checkPeerIsLocalhost(acceptedConnection);
return true; // Continue the RTCOPY session
}
//------------------------------------------------------------------------------
// processPendingInitialRtcpdSocket
//------------------------------------------------------------------------------
bool castor::tape::tapebridge::BridgeProtocolEngine::
void castor::tape::tapebridge::BridgeProtocolEngine::
processPendingInitialRtcpdSocket(const int pendingSock)
throw(castor::exception::Exception) {
......@@ -373,31 +396,33 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
": value=" << pendingSock);
}
// Not expecting any data from or a close of this connection. Determine
// which and throw an exception with the appropriate error message.
bool connClosed = false;
char dummyBuf[1];
net::readBytesFromCloseable(connClosed, pendingSock, RTCPDNETRWTIMEOUT,
sizeof(dummyBuf), dummyBuf);
exception::Exception ce(ECANCELED);
if(connClosed) {
ce.getMessage() <<
"Initial rtcpd connection un-expectedly closed";
} else {
ce.getMessage() <<
"Received un-expected data from the initial rtcpd connection";
// Determine whether rtcpd closed the connection or whether it sent some data
bool rtcpdClosedConnection = false;
try {
char dummyBuf[1];
net::readBytesFromCloseable(rtcpdClosedConnection, pendingSock,
RTCPDNETRWTIMEOUT, sizeof(dummyBuf), dummyBuf);
} catch(castor::exception::Exception &ex) {
TAPE_THROW_EX(castor::exception::Internal,
": Failed to determine why the socket of the initial rtcpd connection"
" has become pending"
": " << ex.getMessage().str());
}
throw(ce);
// Impossible to reach this line
return true;
if(rtcpdClosedConnection) {
TAPE_THROW_CODE(ECANCELED,
": Initial rtcpd connection un-expectedly closed");
} else {
TAPE_THROW_CODE(ECANCELED,
": Received un-expected data from the initial rtcpd connection");
}
}
//------------------------------------------------------------------------------
// processPendingRtcpdDiskTapeIOControlSocket
//------------------------------------------------------------------------------
bool castor::tape::tapebridge::BridgeProtocolEngine::
void castor::tape::tapebridge::BridgeProtocolEngine::
processPendingRtcpdDiskTapeIOControlSocket(const int pendingSock)
throw(castor::exception::Exception) {
......@@ -420,8 +445,7 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
m_jobRequest.volReqId, pendingSock, RTCPDNETRWTIMEOUT, header);
// If the peer closed its side of the connection, then close this side
// of the connection and return true in order to continue the RTCOPY
// session
// of the connection and return in order to continue the RTCOPY session
if(peerClosed) {
close(m_sockCatalogue.releaseRtcpdDiskTapeIOControlConn(pendingSock));
......@@ -439,7 +463,7 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_CLOSED_RTCPD_DISK_TAPE_CONNECTION_DUE_TO_PEER, params);
return true; // Continue the RTCOPY session
return; // Continue the RTCOPY session
}
}
......@@ -448,10 +472,10 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
bool receivedENDOF_REQ = false;
processRtcpdRequest(header, pendingSock, receivedENDOF_REQ);
// If the message processed was not an ENDOF_REQ, then return true in
// order to continue the RTCOPY session
// If the message processed was not an ENDOF_REQ, then return in order to
// continue the RTCOPY session
if(!receivedENDOF_REQ) {
return true; // Continue the RTCOPY session
return; // Continue the RTCOPY session
}
}
......@@ -478,63 +502,19 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
close(m_sockCatalogue.releaseRtcpdDiskTapeIOControlConn(pendingSock));
// If only the initial callback connection is open, then send an
// RTCP_ENDOF_REQ message to rtcpd and close the connection
if(m_sockCatalogue.getNbDiskTapeIOControlConns() == 0) {
// Send an RTCP_ENDOF_REQ message to rtcpd via the initial callback
// connection
legacymsg::MessageHeader endofReqMsg;
endofReqMsg.magic = RTCOPY_MAGIC;
endofReqMsg.reqType = RTCP_ENDOF_REQ;
endofReqMsg.lenOrStatus = 0;
LegacyTxRx::sendMsgHeader(m_cuuid, m_jobRequest.volReqId,
m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT,
endofReqMsg);
{
castor::dlf::Param params[] = {
castor::dlf::Param("mountTransactionId" , m_jobRequest.volReqId ),
castor::dlf::Param("volReqId" , m_jobRequest.volReqId ),
castor::dlf::Param("TPVID" , m_volume.vid() ),
castor::dlf::Param("driveUnit" , m_jobRequest.driveUnit ),
castor::dlf::Param("dgn" , m_jobRequest.dgn ),
castor::dlf::Param("clientHost" , m_jobRequest.clientHost),
castor::dlf::Param("clientPort" , m_jobRequest.clientPort),
castor::dlf::Param("clientType",
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("initialSocketFd" ,
m_sockCatalogue.getInitialRtcpdConn()),
castor::dlf::Param("nbReceivedENDOF_REQs", m_nbReceivedENDOF_REQs)};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_SEND_END_OF_SESSION_TO_RTCPD, params);
}
// Receive the acknowledge of the RTCP_ENDOF_REQ message
legacymsg::MessageHeader ackMsg;
try {
LegacyTxRx::receiveMsgHeader(m_cuuid, m_jobRequest.volReqId,
m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT, ackMsg);
} catch(castor::exception::Exception &ex) {
TAPE_THROW_CODE(EPROTO,
": Failed to receive acknowledge of RTCP_ENDOF_REQ from rtcpd: "
<< ex.getMessage().str());
}
// The initial callback connection will be closed shortly by the
// VdqmRequestHandler
return false; // End the RTCOPY session
// If only the initial callback connection is open, then record the fact that
// the session with the rtcpd daemon is over and the rtcpd daemon is awaiting
// the final RTCP_ENDOF_REQ message from the tapebridged daemon
if(0 == m_sockCatalogue.getNbDiskTapeIOControlConns()) {
m_sessionWithRtcpdIsFinished = true;
}
return true; // Continue the RTCOPY session
}
//------------------------------------------------------------------------------
// processPendingClientSocket
//------------------------------------------------------------------------------
bool castor::tape::tapebridge::BridgeProtocolEngine::
void castor::tape::tapebridge::BridgeProtocolEngine::
processPendingClientSocket(const int pendingSock)
throw(castor::exception::Exception) {
......@@ -551,7 +531,7 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
int rtcpdSock = 0;
uint32_t rtcpdReqMagic = 0;
uint32_t rtcpdReqType = 0;
char *rtcpdReqTapePath = NULL;
const char *rtcpdReqTapePath = NULL;
uint64_t tapebridgeTransId = 0;
struct timeval clientReqTimeStamp = {0, 0};
m_sockCatalogue.getRtcpdConn(pendingSock, rtcpdSock, rtcpdReqMagic,
......@@ -595,8 +575,65 @@ bool castor::tape::tapebridge::BridgeProtocolEngine::
": message type=" << utils::objectTypeToString(obj->type()) <<
": " << ex.getMessage().str());
}
}
return true; // Continue the RTCOPY session
//------------------------------------------------------------------------------
// processPendingClientMigrationReportSocket
//------------------------------------------------------------------------------
void castor::tape::tapebridge::BridgeProtocolEngine::
processPendingClientMigrationReportSocket(const int pendingSock)
throw(castor::exception::Exception) {
// Check function arguments
if(pendingSock < 0) {
TAPE_THROW_EX(castor::exception::InvalidArgument,
": Invalid method argument"
": pendingSock is an invalid socket descriptor"
": value=" << pendingSock);
}
// Release the socket-descriptor from the catalogue
uint64_t tapebridgeTransId = 0;
utils::SmartFd catalogueSock(
m_sockCatalogue.releaseClientMigrationReportSock(tapebridgeTransId));
// Check for a mismatch between the pending and catalogue socket-decriptors
if(pendingSock != catalogueSock.get()) {
TAPE_THROW_EX(castor::exception::Internal,
": Client migration-report socket-descriptor mismatch"
": pendingSock=" << pendingSock <<
": catalogueSock=" << catalogueSock.get());
}
// Receive reply from client and close the connection
const int closedClientSock = catalogueSock.release();
ClientTxRx::receiveNotificationReplyAndClose(m_jobRequest.volReqId,
tapebridgeTransId, closedClientSock);
{
castor::dlf::Param params[] = {
castor::dlf::Param("tapebridgeTransId" , tapebridgeTransId ),
castor::dlf::Param("mountTransActionId", m_jobRequest.volReqId ),
castor::dlf::Param("volReqId" , m_jobRequest.volReqId ),
castor::dlf::Param("TPVID" , m_volume.vid() ),
castor::dlf::Param("driveUnit" , m_jobRequest.driveUnit ),
castor::dlf::Param("dgn" , m_jobRequest.dgn ),
castor::dlf::Param("clientHost" , m_jobRequest.clientHost),
castor::dlf::Param("clientPort" , m_jobRequest.clientPort),
castor::dlf::Param("clientType",
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("clientSock" , closedClientSock )};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_RECEIVED_ACK_OF_NOTIFICATION, params);
}
// If there is a pending batch of flushed files to be reported to the client,
// then send the batch to the client
if(!m_flushedBatches.empty()) {
const FileWrittenNotificationList migrations = m_flushedBatches.front();
m_flushedBatches.pop_front();
sendFlushedMigrationsToClient(migrations);
}
}
......@@ -634,9 +671,11 @@ void castor::tape::tapebridge::BridgeProtocolEngine::runMigrationSession()
time_t connectDuration = 0;
const uint64_t tapebridgeTransId =
m_tapebridgeTransactionCounter.next();
utils::SmartFd clientSock(ClientTxRx::sendFileToMigrateRequest(
const uint64_t maxFiles = 1;
const uint64_t maxBytes = 1;
utils::SmartFd clientSock(ClientTxRx::sendFilesToMigrateListRequest(
m_jobRequest.volReqId, tapebridgeTransId, m_jobRequest.clientHost,
m_jobRequest.clientPort, connectDuration));
m_jobRequest.clientPort, maxFiles, maxBytes, connectDuration));
{
castor::dlf::Param params[] = {
castor::dlf::Param("tapebridgeTransId" , tapebridgeTransId ),
......@@ -647,26 +686,28 @@ void castor::tape::tapebridge::BridgeProtocolEngine::runMigrationSession()
castor::dlf::Param("dgn" , m_jobRequest.dgn ),
castor::dlf::Param("clientHost" , m_jobRequest.clientHost),
castor::dlf::Param("clientPort" , m_jobRequest.clientPort),
castor::dlf::Param("maxFiles" , maxFiles ),
castor::dlf::Param("maxBytes" , maxBytes ),
castor::dlf::Param("clientType" ,
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("clientSock" , clientSock.get() ),
castor::dlf::Param("connectDuration" , connectDuration )};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_SENT_FILETOMIGRATEREQUEST, params);
TAPEBRIDGE_SENT_FILESTOMIGRATELISTREQUEST, params);
}
// Receive the reply
int closedClientSock = 0;
std::auto_ptr<tapegateway::FileToMigrate> fileFromClient(
ClientTxRx::receiveFileToMigrateReplyAndClose(m_jobRequest.volReqId,
tapebridgeTransId, closedClientSock = clientSock.release()));
const int closedClientSock = clientSock.release();
std::auto_ptr<tapegateway::FilesToMigrateList> filesFromClient(
ClientTxRx::receiveFilesToMigrateListRequestReplyAndClose(
m_jobRequest.volReqId, tapebridgeTransId, closedClientSock));
if(fileFromClient.get() != NULL) {
if(filesFromClient.get() != NULL) {
castor::dlf::Param params[] = {
castor::dlf::Param("tapebridgeTransId",
fileFromClient->aggregatorTransactionId()),
filesFromClient->aggregatorTransactionId()),
castor::dlf::Param("mountTransactionId",
fileFromClient->mountTransactionId()),
filesFromClient->mountTransactionId()),
castor::dlf::Param("volReqId" , m_jobRequest.volReqId ),
castor::dlf::Param("TPVID" , m_volume.vid() ),
castor::dlf::Param("driveUnit" , m_jobRequest.driveUnit ),
......@@ -676,11 +717,10 @@ void castor::tape::tapebridge::BridgeProtocolEngine::runMigrationSession()
castor::dlf::Param("clientType",
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("clientSock", closedClientSock ),
castor::dlf::Param("fileTransactionId",
fileFromClient->fileTransactionId() ),
castor::dlf::Param("path" , fileFromClient->path() )};
castor::dlf::Param("nbFiles" ,
filesFromClient->filesToMigrate().size())};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_RECEIVED_FILETOMIGRATE, params);
TAPEBRIDGE_RECEIVED_FILESTOMIGRATELIST, params);
} else {
castor::dlf::Param params[] = {
castor::dlf::Param("mountTransactionId", m_jobRequest.volReqId ),
......@@ -697,61 +737,46 @@ void castor::tape::tapebridge::BridgeProtocolEngine::runMigrationSession()
TAPEBRIDGE_RECEIVED_NOMOREFILES, params);
}
// If there is no file to migrate
if(fileFromClient.get() == NULL) {
const uint64_t tapebridgeTransId =
m_tapebridgeTransactionCounter.next();
// Notify client of end of session and return
try {
ClientTxRx::notifyEndOfSession(m_cuuid, m_jobRequest.volReqId,
tapebridgeTransId, m_jobRequest.clientHost,
m_jobRequest.clientPort);
} catch(castor::exception::Exception &ex) {
// Don't rethrow, just log the exception
castor::dlf::Param params[] = {
castor::dlf::Param("tapebridgeTransId" , tapebridgeTransId ),
castor::dlf::Param("mountTransactionId", m_jobRequest.volReqId ),
castor::dlf::Param("volReqId" , m_jobRequest.volReqId ),
castor::dlf::Param("TPVID" , m_volume.vid() ),
castor::dlf::Param("driveUnit" , m_jobRequest.driveUnit ),
castor::dlf::Param("dgn" , m_jobRequest.dgn ),
castor::dlf::Param("clientHost" , m_jobRequest.clientHost),
castor::dlf::Param("clientPort" , m_jobRequest.clientPort),
castor::dlf::Param("clientType",
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("Message" , ex.getMessage().str() ),
castor::dlf::Param("Code" , ex.code() )};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_ERROR,
TAPEBRIDGE_FAILED_TO_NOTIFY_CLIENT_END_OF_SESSION, params);
}
// If there is no file to migrate, then notify the client of the end of
// session and return
if(filesFromClient.get() == NULL) {
notifyClientEndOfSession();
return;
}
// If the client is the tape gateway
// For the very first file to migrate the FilesToMigrateList message
// should only contain one file
if(1 != filesFromClient->filesToMigrate().size()) {
TAPE_THROW_CODE(ENOTSUP,
": The FilesToMigrateList message for the first file to migrate should"
" only contain one file"
": nbFiles=" << filesFromClient->filesToMigrate().size());
}
const tapegateway::FileToMigrateStruct *const firstFileToMigrate =
filesFromClient->filesToMigrate()[0];
// If the client is the tape-gateway then cross-check and update the next
// expected tape-file sequence-number
if(tapegateway::TAPE_GATEWAY == m_volume.clientType()) {
// If the tape file sequence number from the client is invalid
if((uint32_t)fileFromClient->fseq() != m_nextDestinationFseq) {
if(firstFileToMigrate->fseq() != m_nextDestinationTapeFSeq) {
castor::exception::Exception ex(ECANCELED);
ex.getMessage() <<
"Invalid tape file sequence number from client"
": expected=" << (m_nextDestinationFseq) <<
": actual=" << fileFromClient->fseq();
": expected=" << (m_nextDestinationTapeFSeq) <<
": actual=" << firstFileToMigrate->fseq();
throw(ex);
}
// Update the next expected tape file sequence number
m_nextDestinationFseq++;
m_nextDestinationTapeFSeq++;
}
// Remember the file transaction ID and get its unique index to be passed
// to rtcpd through the "rtcpFileRequest.disk_fseq" message field
const uint32_t diskFseq =
m_pendingTransferIds.insert(fileFromClient->fileTransactionId());
const uint32_t diskFSeq = m_pendingTransferIds.insert(
firstFileToMigrate->fileTransactionId());
// Give volume to rtcpd
legacymsg::RtcpTapeRqstErrMsgBody rtcpVolume;
......@@ -770,23 +795,58 @@ void castor::tape::tapebridge::BridgeProtocolEngine::runMigrationSession()
RtcpTxRx::giveVolumeToRtcpd(m_cuuid, m_jobRequest.volReqId,
m_sockCatalogue.getInitialRtcpdConn(), RTCPDNETRWTIMEOUT, rtcpVolume);
// Add the first file to the pending migrations store
m_pendingMigrationsStore.add(*(fileFromClient.get()));
// Add the first file to migrate to the pending migrations store
{
RequestToMigrateFile request;
request.fileTransactionId = firstFileToMigrate->fileTransactionId();
request.nsHost = firstFileToMigrate->nshost();
request.nsFileId = firstFileToMigrate->fileid();
request.positionCommandCode =
firstFileToMigrate->positionCommandCode();
request.tapeFSeq = firstFileToMigrate->fseq();
request.fileSize = firstFileToMigrate->fileSize();
request.lastKnownFilename = firstFileToMigrate->lastKnownFilename();
request.lastModificationTime =
firstFileToMigrate->lastModificationTime();
request.path = firstFileToMigrate->path();
request.umask = firstFileToMigrate->umask();
m_pendingMigrationsStore.receivedRequestToMigrateFile(request);
}
{
castor::dlf::Param params[] = {
castor::dlf::Param("fseq", fileFromClient->fseq())};
castor::dlf::Param("tapebridgeTransId" ,
filesFromClient->aggregatorTransactionId()),
castor::dlf::Param("mountTransactionId",
filesFromClient->mountTransactionId()),
castor::dlf::Param("volReqId", m_jobRequest.volReqId),
castor::dlf::Param("TPVID", m_volume.vid()),
castor::dlf::Param("driveUnit", m_jobRequest.driveUnit),
castor::dlf::Param("dgn", m_jobRequest.dgn),
castor::dlf::Param("clientHost", m_jobRequest.clientHost),
castor::dlf::Param("clientPort", m_jobRequest.clientPort),
castor::dlf::Param("clientType",
utils::volumeClientTypeToString(m_volume.clientType())),
castor::dlf::Param("clientSock", closedClientSock),
castor::dlf::Param("fileTransactionId",
firstFileToMigrate->fileTransactionId()),
castor::dlf::Param("NSHOSTNAME", firstFileToMigrate->nshost()),
castor::dlf::Param("NSFILEID", firstFileToMigrate->fileid()),
castor::dlf::Param("tapeFSeq", firstFileToMigrate->fseq())};
castor::dlf::dlf_writep(m_cuuid, DLF_LVL_SYSTEM,
TAPEBRIDGE_ADDED_PENDING_MIGRATION_TO_STORE, params);