Commit 9a7deaa7 authored by Steven Murray's avatar Steven Murray
Browse files

Developed further the "get more work" part of the protocol with respect to

migrations.  Fixed a bug with a mix up of sockets.
parent f92e1458
......@@ -34,16 +34,18 @@ namespace castor {
namespace tape {
namespace aggregator {
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t HOSTNAMEBUFLEN = 256;
const int LISTENBACKLOG = 2;
const int MAXINITMIGFILES = 2;
const size_t MSGBUFSIZ = 1024;
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const int RTCPDPINGTIMEOUT = 10;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
const size_t HDRBUFSIZ = 3 * sizeof(uint32_t);
const size_t HOSTNAMEBUFLEN = 256;
const int LISTENBACKLOG = 2;
const int MAXINITMIGFILES = 2;
const uint32_t MIGRATEUMASK = 022;
const size_t MSGBUFSIZ = 1024;
const uint32_t RECALLUMASK = 077;
const int RTCPDNETRWTIMEOUT = 5;
const int RTCPDCALLBACKTIMEOUT = 5;
const int RTCPDPINGTIMEOUT = 10;
const size_t SERVICENAMEBUFLEN = 256;
const size_t STRERRORBUFLEN = 256;
} // namespace aggregator
} // namespace tape
......
......@@ -25,6 +25,7 @@
#include "castor/tape/aggregator/AggregatorDlfMessageConstants.hpp"
#include "castor/tape/aggregator/Constants.hpp"
#include "castor/tape/aggregator/GatewayTxRx.hpp"
#include "castor/tape/aggregator/RtcpFileRqstErrMsgBody.hpp"
#include "castor/tape/aggregator/RtcpFileRqstMsgBody.hpp"
#include "castor/tape/aggregator/RtcpTapeRqstErrMsgBody.hpp"
......@@ -32,6 +33,7 @@
#include "castor/tape/aggregator/TapeDiskRqstHandler.hpp"
#include "castor/tape/aggregator/RtcpTxRx.hpp"
#include "castor/tape/aggregator/Utils.hpp"
#include "h/Ctape_constants.h"
#include "h/rtcp_constants.h"
#include <errno.h>
......@@ -100,100 +102,70 @@ bool castor::tape::aggregator::TapeDiskRqstHandler::rtcpFileReqHandler(
RtcpTxRx::receiveRtcpFileRqstBody(socketFd, RTCPDNETRWTIMEOUT, header, body);
switch(body.procStatus) {
/*
case RTCP_REQUEST_MORE_WORK:
{
// If migrating
if(mode == WRITE_ENABLE) {
char filePath[CA_MAXPATHLEN+1];
char nsHost[CA_MAXHOSTNAMELEN+1];
uint64_t fileId;
uint32_t tapeFseq;
uint64_t fileSize;
char lastKnownFileName[CA_MAXPATHLEN+1];
uint64_t lastModificationTime;
// If there is a file to migrate
if(GatewayTxRx::getFileToMigrateFromGateway(gatewayHost, gatewayPort,
volReqId, filePath, nsHost, fileId, rtcpFileRequest.tapeFseq,
fileSize, lastKnownFileName, lastModificationTime)) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId" , volReqId ),
castor::dlf::Param("gatewayHost" , gatewayHost ),
castor::dlf::Param("gatewayPort" , gatewayPort ),
castor::dlf::Param("filePath" , filePath ),
castor::dlf::Param("nsHost" , nsHost ),
castor::dlf::Param("fileId" , fileId ),
castor::dlf::Param("tapeFseq" , tapeFseq ),
castor::dlf::Param("fileSize" , fileSize ),
castor::dlf::Param("lastKnownFileName" , lastKnownFileName ),
castor::dlf::Param("lastModificationTime", lastModificationTime)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_FILE_TO_MIGRATE, params);
I AM HERE
// Else there is no file to migrate
} else {
volReqId, filePath, nsHost, fileId, rtcpFileRequest.tapeFseq,
fileSize, lastKnownFileName, lastModificationTime)) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", volReqId ),
castor::dlf::Param("Port" , volPort ),
castor::dlf::Param("HostName", volHost )};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_NO_MORE_FILES_TO_MIGRATE, params);
// Tell RTCPD there is no file by sending an empty file list
RtcpTxRx::tellRtcpdEndOfFileList(rtcpdCallbackSocketFd,
RTCPDNETRWTIMEOUT);
}
return;
}
// Give file information to RTCPD
try {
// Send file to migrate to RTCPD
// If migrating
if(mode == WRITE_ENABLE) {
char filePath[CA_MAXPATHLEN+1];
char nsHost[CA_MAXHOSTNAMELEN+1];
uint64_t fileId;
uint32_t tapeFseq;
uint64_t fileSize;
char lastKnownFileName[CA_MAXPATHLEN+1];
uint64_t lastModificationTime;
// If there is a file to migrate
if(GatewayTxRx::getFileToMigrateFromGateway(gatewayHost, gatewayPort,
volReqId, filePath, nsHost, fileId, tapeFseq, fileSize,
lastKnownFileName, lastModificationTime)) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId" , volReqId ),
castor::dlf::Param("gatewayHost" , gatewayHost ),
castor::dlf::Param("gatewayPort" , gatewayPort ),
castor::dlf::Param("filePath" , filePath ),
castor::dlf::Param("nsHost" , nsHost ),
castor::dlf::Param("fileId" , fileId ),
castor::dlf::Param("tapeFseq" , tapeFseq ),
castor::dlf::Param("fileSize" , fileSize ),
castor::dlf::Param("lastKnownFileName" , lastKnownFileName ),
castor::dlf::Param("lastModificationTime", lastModificationTime)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_FILE_TO_MIGRATE, params);
char tapePath[CA_MAXPATHLEN+1];
Utils::toHex(fileId, tapePath);
RtcpTxRx::giveFileToRtcpd(socketFd, RTCPDNETRWTIMEOUT,
volReqId,
"lxc2disk07:/tmp/murrayc3/test_04_02_09", body.tapePath, 18);
volReqId, filePath, tapePath, MIGRATEUMASK);
// Send offer of more work to RTCPD
RtcpTxRx::giveRequestForMoreWorkToRtcpd(socketFd, RTCPDNETRWTIMEOUT,
volReqId);
// Send end of file list to RTPCD
RtcpTxRx::signalNoMoreRequestsToRtcpd(socketFd, RTCPDNETRWTIMEOUT);
RtcpTxRx::tellRtcpdEndOfFileList(socketFd, RTCPDNETRWTIMEOUT);
// Else there is no file to migrate
} else {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", volReqId),
castor::dlf::Param("filePath","lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath)};
castor::dlf::Param("volReqId" , volReqId ),
castor::dlf::Param("gatewayHost", gatewayHost),
castor::dlf::Param("gatewayPort", gatewayPort)};
castor::dlf::dlf_writep(cuuid, DLF_LVL_SYSTEM,
AGGREGATOR_GAVE_FILE_INFO, params);
} catch(castor::exception::Exception &ex) {
castor::dlf::Param params[] = {
castor::dlf::Param("volReqId", volReqId),
castor::dlf::Param("filePath","lxc2disk07:/dev/null"),
castor::dlf::Param("tapePath", body.tapePath),
castor::dlf::Param("Message" , ex.getMessage().str()),
castor::dlf::Param("Code" , ex.code())};
CASTOR_DLF_WRITEPC(cuuid, DLF_LVL_ERROR,
AGGREGATOR_FAILED_TO_GIVE_FILE_INFO, params);
AGGREGATOR_NO_MORE_FILES_TO_MIGRATE, params);
// Tell RTCPD there is no file by sending an empty file list
RtcpTxRx::tellRtcpdEndOfFileList(socketFd, RTCPDNETRWTIMEOUT);
thereIsMoreWork = false;
}
// Acknowledge request for more work from RTCPD
RtcpAcknowledgeMsg ackMsg;
ackMsg.magic = RTCOPY_MAGIC;
ackMsg.reqType = RTCP_FILE_REQ;
ackMsg.status = 0;
RtcpTxRx::sendRtcpAcknowledge(socketFd, RTCPDNETRWTIMEOUT, ackMsg);
// Else recalling
} else {
// TBD
}
break;
*/
case RTCP_POSITIONED:
{
castor::dlf::Param params[] = {
......
......@@ -483,7 +483,7 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::coordinateRemoteCopy(
RtcpFileRqstErrMsgBody rtcpFileReply;
Utils::setBytes(rtcpFileReply, '\0');
// If it is a Migration
// If migrating
if(rtcpVolume.mode == WRITE_ENABLE) {
char filePath[CA_MAXPATHLEN+1];
......@@ -526,38 +526,30 @@ void castor::tape::aggregator::VdqmRequestHandlerThread::coordinateRemoteCopy(
return;
}
// Send volume to RTCPD
RtcpTxRx::giveVolumeToRtcpd(rtcpdCallbackSocketFd, RTCPDNETRWTIMEOUT,
RtcpTxRx::giveVolumeToRtcpd(rtcpdInitialSocketFd.get(), RTCPDNETRWTIMEOUT,
rtcpVolume);
// Send file to migrate to RTCPD
// Always use a umask of 022 when migrating
char tapePath[CA_MAXPATHLEN+1];
Utils::toHex(fileId, tapePath);
RtcpTxRx::giveFileToRtcpd(rtcpdCallbackSocketFd, RTCPDNETRWTIMEOUT,
volReqId, filePath, tapePath, 022);
RtcpTxRx::giveFileToRtcpd(rtcpdInitialSocketFd.get(), RTCPDNETRWTIMEOUT,
volReqId, filePath, tapePath, MIGRATEUMASK);
// Send offer of more work to RTCPD
RtcpTxRx::giveRequestForMoreWorkToRtcpd(rtcpdCallbackSocketFd,
RtcpTxRx::giveRequestForMoreWorkToRtcpd(rtcpdInitialSocketFd.get(),
RTCPDNETRWTIMEOUT, volReqId);
// Send end of file list to RTPCD
RtcpTxRx::tellRtcpdEndOfFileList(rtcpdCallbackSocketFd, RTCPDNETRWTIMEOUT);
RtcpTxRx::tellRtcpdEndOfFileList(rtcpdInitialSocketFd.get(), RTCPDNETRWTIMEOUT);
} else { // It is a Recall
// Send volume to RTCPD
RtcpTxRx::giveVolumeToRtcpd(rtcpdCallbackSocketFd, RTCPDNETRWTIMEOUT,
// Else recalling
} else {
RtcpTxRx::giveVolumeToRtcpd(rtcpdInitialSocketFd.get(), RTCPDNETRWTIMEOUT,
rtcpVolume);
// Send offer of more work to RTCPD
RtcpTxRx::giveRequestForMoreWorkToRtcpd(rtcpdCallbackSocketFd,
RtcpTxRx::giveRequestForMoreWorkToRtcpd(rtcpdInitialSocketFd.get(),
RTCPDNETRWTIMEOUT, volReqId);
// Send end of file list to RTPCD
RtcpTxRx::tellRtcpdEndOfFileList(rtcpdCallbackSocketFd, RTCPDNETRWTIMEOUT);
RtcpTxRx::tellRtcpdEndOfFileList(rtcpdInitialSocketFd.get(), RTCPDNETRWTIMEOUT);
}
// Process the RTCPD sockets
processRtcpdSockets(cuuid, volReqId, gatewayHost, gatewayPort,
rtcpVolume.mode, rtcpdCallbackSocketFd, rtcpdInitialSocketFd.get());
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment