Skip to content
Snippets Groups Projects
Commit eda359ee authored by Steven Murray's avatar Steven Murray
Browse files

writetp now supports the bulk protocol

Tests using writetp were no longer passing due to the new
"terminate session early" logic of the tapeserverd daemon.
The daemon was askng for a minimum batch of 250 files. The
writetp command only ever gave one file per batch of work.
parent cb309df0
Branches
Tags
No related merge requests found
......@@ -426,8 +426,7 @@ void castor::tape::tpcp::WriteTpCommand::performTransfer() {
// dispatchMsgHandler
//------------------------------------------------------------------------------
bool castor::tape::tpcp::WriteTpCommand::dispatchMsgHandler(
castor::IObject *const obj, castor::io::AbstractSocket &sock)
{
castor::IObject *const obj, castor::io::AbstractSocket &sock) {
switch(obj->type()) {
case OBJ_FilesToMigrateListRequest:
return handleFilesToMigrateListRequest(obj, sock);
......@@ -458,71 +457,73 @@ bool castor::tape::tpcp::WriteTpCommand::dispatchMsgHandler(
}
}
//------------------------------------------------------------------------------
// handleFilesMigrateListRequest
//------------------------------------------------------------------------------
bool castor::tape::tpcp::WriteTpCommand::handleFilesToMigrateListRequest(
castor::IObject *const obj, castor::io::AbstractSocket &sock) {
IObject *const obj, io::AbstractSocket &sock) {
tapegateway::FilesToMigrateListRequest *msg = NULL;
castMessage(obj, msg, sock);
Helper::displayRcvdMsgIfDebug(*msg, m_cmdLine.debugSet);
const bool anotherFile = m_filenameItor != m_filenames.end();
tapegateway::FilesToMigrateList fileList = createFilesToMigrateList(*msg,
sock);
if(!fileList.filesToMigrate().empty()) {
// Send the FilesToMigrateList message to the tape server
sock.sendObject(fileList);
Helper::displaySentMsgIfDebug(fileList, m_cmdLine.debugSet);
// Else no more files
} else {
// Create the NoMoreFiles message for the tape server
castor::tape::tapegateway::NoMoreFiles noMore;
noMore.setMountTransactionId(m_volReqId);
noMore.setAggregatorTransactionId(msg->aggregatorTransactionId());
// Send the NoMoreFiles message to the tape server
sock.sendObject(noMore);
Helper::displaySentMsgIfDebug(noMore, m_cmdLine.debugSet);
}
return true;
}
//------------------------------------------------------------------------------
// createFilesToMigrateList
//------------------------------------------------------------------------------
castor::tape::tapegateway::FilesToMigrateList
castor::tape::tpcp::WriteTpCommand::createFilesToMigrateList(
tapegateway::FilesToMigrateListRequest &rqst, io::AbstractSocket &sock) {
tapegateway::FilesToMigrateList fileList;
fileList.setMountTransactionId(m_volReqId);
fileList.setAggregatorTransactionId(rqst.aggregatorTransactionId());
if(anotherFile) {
uint64_t bytesToBeMigrated = 0;
// While there is more work and it can be added to the batch
while(m_filenameItor != m_filenames.end() &&
fileList.filesToMigrate().size() < rqst.maxFiles() &&
bytesToBeMigrated < rqst.maxBytes()) {
const std::string filename = *(m_filenameItor++);
// Determine the local file-name
const std::string::size_type localIdx = filename.find(':');
const std::string localFilename =
localIdx == std::string::npos || localIdx == filename.length() - 1 ?
filename : filename.substr(localIdx + 1);
// stat the disk file in order to get the file size
struct stat statBuf;
try {
localStat(localFilename.c_str(), statBuf);
} catch(castor::exception::Exception &ex) {
// Notify the tape server about the exception and rethrow
sendEndNotificationErrorReport(msg->aggregatorTransactionId(),
ex.code(), ex.getMessage().str(), sock);
throw ex;
}
std::auto_ptr<tapegateway::FileToMigrateStruct> file =
createFileToMigrateStruct(rqst, sock, filename);
// Create a FilesToMigrateList message for the tape server
std::auto_ptr<tapegateway::FileToMigrateStruct> file(
new tapegateway::FileToMigrateStruct());
file->setFileTransactionId(m_fileTransactionId);
file->setNshost("tpcp");
file->setFileid(0);
file->setFileSize(statBuf.st_size);
file->setLastKnownFilename(filename);
file->setLastModificationTime(statBuf.st_mtime);
file->setUmask(RTCOPYCONSERVATIVEUMASK);
file->setPath(filename);
file->setPositionCommandCode(tapegateway::TPPOSIT_FSEQ);
file->setFseq(m_nextTapeFseq++);
tapegateway::FilesToMigrateList fileList;
fileList.setMountTransactionId(m_volReqId);
fileList.setAggregatorTransactionId(msg->aggregatorTransactionId());
bytesToBeMigrated += file->fileSize();
fileList.addFilesToMigrate(file.release());
// Update the map of current file transfers and increment the file
// transaction ID
{
m_pendingFileTransfers[m_fileTransactionId] = filename;
m_fileTransactionId++;
}
// Send the FilesToMigrateList message to the tape server
sock.sendObject(fileList);
Helper::displaySentMsgIfDebug(fileList, m_cmdLine.debugSet);
m_pendingFileTransfers[m_fileTransactionId] = filename;
m_fileTransactionId++;
{
// Command-line user feedback
......@@ -534,31 +535,58 @@ bool castor::tape::tpcp::WriteTpCommand::handleFilesToMigrateListRequest(
" Migrating"
" \"" << filename << "\"" << std::endl;
}
}
// Else no more files
} else {
return fileList;
}
// Create the NoMoreFiles message for the tape server
castor::tape::tapegateway::NoMoreFiles noMore;
noMore.setMountTransactionId(m_volReqId);
noMore.setAggregatorTransactionId(msg->aggregatorTransactionId());
//------------------------------------------------------------------------------
// createFileToMigrateStruct
//------------------------------------------------------------------------------
std::auto_ptr<castor::tape::tapegateway::FileToMigrateStruct>
castor::tape::tpcp::WriteTpCommand::createFileToMigrateStruct(
tapegateway::FilesToMigrateListRequest &rqst, io::AbstractSocket &sock,
const std::string &filename) {
// Send the NoMoreFiles message to the tape server
sock.sendObject(noMore);
// Determine the local file-name
const std::string::size_type localIdx = filename.find(':');
const std::string localFilename =
localIdx == std::string::npos || localIdx == filename.length() - 1 ?
filename : filename.substr(localIdx + 1);
Helper::displaySentMsgIfDebug(noMore, m_cmdLine.debugSet);
// stat the disk file in order to get the file size
struct stat statBuf;
try {
localStat(localFilename.c_str(), statBuf);
} catch(castor::exception::Exception &ex) {
// Notify the tape server about the exception and rethrow
sendEndNotificationErrorReport(rqst.aggregatorTransactionId(),
ex.code(), ex.getMessage().str(), sock);
throw ex;
}
return true;
// Create and fill the FileToMigrateStruct
std::auto_ptr<tapegateway::FileToMigrateStruct> file(
new tapegateway::FileToMigrateStruct());
file->setFileTransactionId(m_fileTransactionId);
file->setNshost("tpcp");
file->setFileid(0);
file->setFileSize(statBuf.st_size);
file->setLastKnownFilename(filename);
file->setLastModificationTime(statBuf.st_mtime);
file->setUmask(RTCOPYCONSERVATIVEUMASK);
file->setPath(filename);
file->setPositionCommandCode(tapegateway::TPPOSIT_FSEQ);
file->setFseq(m_nextTapeFseq++);
return file;
}
//------------------------------------------------------------------------------
// handleFileMigrationReportList
//------------------------------------------------------------------------------
bool castor::tape::tpcp::WriteTpCommand::handleFileMigrationReportList(
castor::IObject *const obj, castor::io::AbstractSocket &sock)
{
castor::IObject *const obj, castor::io::AbstractSocket &sock) {
tapegateway::FileMigrationReportList *msg = NULL;
......
......@@ -24,8 +24,13 @@
#pragma once
#include "castor/tape/tapegateway/FileMigratedNotificationStruct.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapegateway/FilesToMigrateListRequest.hpp"
#include "castor/tape/tapegateway/FileToMigrateStruct.hpp"
#include "castor/tape/tpcp/TpcpCommand.hpp"
#include <memory>
namespace castor {
namespace tape {
namespace tpcp {
......@@ -155,8 +160,32 @@ private:
* @param sock The socket on which to reply to the tape server.
* @return True if there is more work to be done else false.
*/
bool handleFilesToMigrateListRequest(castor::IObject *const obj,
castor::io::AbstractSocket &sock);
bool handleFilesToMigrateListRequest(IObject *const obj,
io::AbstractSocket &sock);
/**
* Creates a FilesToMigrateList based on the remaining work to be done and on
* how much can be put into the current batch.
*
* @param rqst The request for more work from the tape server.
* @param sock The socket on which to reply to the tape server.
* @return The FilesToMigrateList mesaage.
*/
tapegateway::FilesToMigrateList createFilesToMigrateList(
tapegateway::FilesToMigrateListRequest &rqst, io::AbstractSocket &sock);
/**
* Creates a FileToMigrateStruct for the specified file to be migrated,
*
* @param rqst The request for more work from the tape server.
* @param sock The socket on which to reply to the tape server.
* @param filename The name of the file to be migrated.
* @return The FileToMigrateStruct.
*/
std::auto_ptr<tapegateway::FileToMigrateStruct> createFileToMigrateStruct(
tapegateway::FilesToMigrateListRequest &rqst, io::AbstractSocket &sock,
const std::string &filename);
/**
* FileMigrationReportList message handler.
......@@ -229,5 +258,3 @@ private:
} // namespace tpcp
} // namespace tape
} // namespace castor
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment