Commit d3e90821 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Putting in archive and retrieve job

parent d6acca9b
......@@ -21,9 +21,9 @@
namespace cta {
ArchiveFileInfo::ArchiveFileInfo(
const std::string& lastKnownPath, uint64_t fileId, uint64_t size,
const Checksum & checksum):
const Checksum & checksum, const time_t lastModificationTime):
lastKnownPath(lastKnownPath),
fileId(fileId),
size(size),
checksum(checksum) {}
checksum(checksum), lastModificationTime(lastModificationTime) {}
}
\ No newline at end of file
......@@ -28,10 +28,11 @@ namespace cta {
class ArchiveFileInfo {
public:
ArchiveFileInfo(const std::string & lastKnownPath, uint64_t fileId,
uint64_t size, const Checksum & checksum);
uint64_t size, const Checksum & checksum, const time_t lastModificationTime);
const std::string lastKnownPath; /**< The location of the file at NS lookup time */
const uint64_t fileId; /**< The file ID (to be used in the tape copy header, among other */
const uint64_t size; /**< The file's size */
const Checksum checksum;
const time_t lastModificationTime;
};
}
\ No newline at end of file
......@@ -67,6 +67,11 @@ struct TapeCopyInfo {
* The block identifier of the file.
*/
uint64_t blockId;
/**
* The hostname of the nameserver holding the file
*/
std::string nsHostName;
}; // struct TapeCopyLocation
......
......@@ -50,6 +50,11 @@ struct TapeCopyLocationAndStatus {
uint64_t size; /**< The tape copy's size */
uint64_t fileId; /**< The file ID as recorded in the tape copy's header */
Checksum checksum;/**< The tape copy's checksum */
/**
* The hostname of the nameserver holding the file
*/
std::string nsHostName;
}; // class TapeCopyLocationAndStatus
} // namespace cta
......@@ -31,10 +31,12 @@ cta::ArchiveJob::ArchiveJob(
const std::string &tapePoolName,
const ArchiveFileInfo & archiveFile,
const RemotePathAndStatus &remoteFile,
const TapeCopyLocationAndStatus &copyLocation,
const uint16_t copyNb):
tapePoolName(tapePoolName),
archiveFile(archiveFile),
remoteFile(remoteFile),
copyLocation(copyLocation),
copyNumber(copyNb) {}
//------------------------------------------------------------------------------
......
......@@ -61,6 +61,7 @@ private:
const std::string &tapePoolName,
const ArchiveFileInfo & archiveFile,
const RemotePathAndStatus &remoteFile,
const TapeCopyLocationAndStatus &copyLocation,
const uint16_t copyNb);
public:
......@@ -136,10 +137,11 @@ public:
* are recording this tape copy. */
const RemotePathAndStatus remoteFile;
/**< The size of the file to be archived in bytes. */
TapeCopyLocationAndStatus copyLocation;
const uint16_t copyNumber;
/**< The copy number for this tape copy */
CTA_GENERATE_EXCEPTION_CLASS(LocationNotSet);
const TapeCopyLocationAndStatus & getTapeCopyLocationAndStatus();
/**< Accessor to the tape location status */
}; // class ArchiveJob
......
......@@ -58,6 +58,11 @@ std::string cta::ArchiveMount::getDensity() const throw() {
std::string cta::ArchiveMount::getMountTransactionId() const throw(){
return "UNKNOWN_MOUNTTRANSACTIONID_FOR_ARCHIVE_MOUNT";
}
std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob(){
cta::exception::Exception ex("Not implemented");
throw ex;
}
//------------------------------------------------------------------------------
// complete
......
......@@ -36,7 +36,7 @@ cta::RetrieveJob::RetrieveJob(
const uint32_t copyNb,
const std::string &remoteFile,
const uint64_t castorNsFileId):
tapeCopyLocation(tapeCopyLocation) {
tapeCopyLocation(tapeCopyLocation), m_id(id) {
}
//------------------------------------------------------------------------------
......
......@@ -115,6 +115,8 @@ public:
TapeCopyInfo tapeCopyLocation; /**<The location of the source tape file. */
PositioningMethod positioningMethod; /**< The desired positioning method. */
RemotePath remoteFilePath; /** <The location of the destination file. */
std::string m_id;
uint64_t m_fileSize;
}; // struct RetrieveJob
} // namespace cta
......@@ -59,6 +59,12 @@ std::string cta::RetrieveMount::getMountTransactionId() const throw(){
return "UNKNOWN_MOUNTTRANSACTIONID_FOR_RETRIEVE_MOUNT";
}
std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
cta::exception::Exception ex("Not implemented");
throw ex;
}
//------------------------------------------------------------------------------
// complete
//------------------------------------------------------------------------------
......
......@@ -28,7 +28,7 @@ include_directories(${PROJECT_BINARY_DIR})
find_package( ZLIB REQUIRED )
add_library(castorTapeServerDaemon
add_library(castorTapeServerDaemon
AdminAcceptHandler.cpp
AdminConnectionHandler.cpp
../client/ClientProxy.cpp
......@@ -76,7 +76,7 @@ add_library(castorTapeServerDaemon
TpconfigLine.cpp
TpconfigLines.cpp)
target_link_libraries(castorTapeServerDaemon castormessages castortapereactor ctascheduler ctacommon ctanameserver ctaremotens ctaOStoreSchedulerDB protobuf CTAObjectStore)
target_link_libraries(castorTapeServerDaemon castormessages castortapereactor ctacommon ctanameserver ctaremotens ctaOStoreSchedulerDB protobuf CTAObjectStore ctascheduler)
add_dependencies(castorTapeServerDaemon castormessagesprotobuf)
add_library(castorTapeServerDaemonTestDummies
......@@ -87,6 +87,7 @@ add_library(castorTapeServerDaemonTestDummies
../../tpcp/TapeFseqRange.cpp)
add_executable(tapeserverd TapeDaemon.cpp)
target_link_libraries(tapeserverd castorTapeServerDaemon SCSI System Utils File TapeDrive castorcommon castorclient castorlegacymsg castorserver castortapegatewayprotocol castortapereactor ${LIBCAP_LIB} ${ZLIB_LIBRARIES} castormessages zmq)
# With the exception of shared-library plugins, the CASTOR rpms only install the
# /usr/lib64/libcastor*.so symbolic links for libraries used by end-user
......@@ -101,21 +102,3 @@ set_target_properties(castorserver PROPERTIES
add_library(castortapegatewayprotocol SHARED IMPORTED)
set_target_properties(castortapegatewayprotocol PROPERTIES
IMPORTED_LOCATION /usr/lib64/libcastortapegatewayprotocol.so.2.1)
target_link_libraries(tapeserverd
castorTapeServerDaemon
SCSI
System
Utils
File
TapeDrive
castorcommon
castorclient
castorlegacymsg
castorserver
castortapegatewayprotocol
castortapereactor
${LIBCAP_LIB}
${ZLIB_LIBRARIES}
castormessages
zmq)
......@@ -158,7 +158,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
m_castorConf.remoteFileProtocol,
m_castorConf.xrootPrivateKey,
m_castorConf.moverHandlerPort);
RecallTaskInjector rti(mm, trst, dwtp, m_clientProxy,
RecallTaskInjector rti(mm, trst, dwtp, *retrieveMount,
m_castorConf.bulkRequestRecallMaxFiles,
m_castorConf.bulkRequestRecallMaxBytes,lc);
// Workaround for bug CASTOR-4829: tapegateway: should request positioning by blockid for recalls instead of fseq
......@@ -255,7 +255,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
m_castorConf.remoteFileProtocol,
m_castorConf.xrootPrivateKey,
m_castorConf.moverHandlerPort);
MigrationTaskInjector mti(mm, drtp, twst, m_clientProxy,
MigrationTaskInjector mti(mm, drtp, twst, *archiveMount,
m_castorConf.bulkRequestMigrationMaxFiles,
m_castorConf.bulkRequestMigrationMaxBytes,lc);
drtp.setTaskInjector(&mti);
......
......@@ -35,9 +35,9 @@ namespace daemon {
// constructor
//------------------------------------------------------------------------------
DiskReadTask::DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,
cta::ArchiveJob *archiveJob,
size_t numberOfBlock,castor::server::AtomicFlag& errorFlag):
m_nextTask(destination),m_migratedFile(file),
m_nextTask(destination),m_archiveJob(archiveJob),
m_numberOfBlock(numberOfBlock),m_errorFlag(errorFlag)
{}
......@@ -52,7 +52,7 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
castor::utils::Timer localTime;
castor::utils::Timer totalTime(localTime);
size_t blockId=0;
size_t migratingFileSize=m_migratedFile->fileSize();
size_t migratingFileSize=m_archiveJob->archiveFile.size;
MemBlock* mb=NULL;
// This out-of-try-catch variables allows us to record the stage of the
// process we're in, and to count the error if it occurs.
......@@ -66,9 +66,9 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
checkMigrationFailing();
currentErrorToCount = "Error_diskOpenForRead";
std::unique_ptr<tape::diskFile::ReadFile> sourceFile(
fileFactory.createReadFile(m_migratedFile->path()));
fileFactory.createReadFile(m_archiveJob->archiveFile.lastKnownPath));
log::ScopedParamContainer URLcontext(lc);
URLcontext.add("path", m_migratedFile->path())
URLcontext.add("path", m_archiveJob->archiveFile.lastKnownPath)
.add("actualURL", sourceFile->URL());
currentErrorToCount = "Error_diskFileToReadSizeMismatch";
if(migratingFileSize != sourceFile->size()){
......@@ -79,7 +79,7 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
m_stats.openingTime+=localTime.secs(castor::utils::Timer::resetCounter);
LogContext::ScopedParam sp(lc, Param("filePath",m_migratedFile->path()));
LogContext::ScopedParam sp(lc, Param("filePath",m_archiveJob->archiveFile.lastKnownPath));
lc.log(LOG_INFO,"Opened disk file for read");
while(migratingFileSize>0){
......@@ -90,7 +90,7 @@ void DiskReadTask::execute(log::LogContext& lc, diskFile::DiskFileFactory & file
m_stats.waitFreeMemoryTime+=localTime.secs(castor::utils::Timer::resetCounter);
//set metadata and read the data
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileid = m_archiveJob->copyLocation.fileId;
mb->m_fileBlock = blockId++;
currentErrorToCount = "Error_diskRead";
......@@ -180,7 +180,7 @@ void DiskReadTask::circulateAllBlocks(size_t fromBlockId, MemBlock * mb){
mb = m_nextTask.getFreeBlock();
++blockId;
}
mb->m_fileid = m_migratedFile->fileid();
mb->m_fileid = m_archiveJob->copyLocation.fileId;
mb->markAsCancelled();
m_nextTask.pushDataBlock(mb);
mb=NULL;
......@@ -208,8 +208,8 @@ void DiskReadTask::logWithStat(int level,const std::string& msg,log::LogContext&
m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0)
.add("openRWCloseToTransferTimeRatio",
m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0)
.add("FILEID",m_migratedFile->fileid())
.add("path",m_migratedFile->path());
.add("FILEID",m_archiveJob->copyLocation.fileId)
.add("path",m_archiveJob->archiveFile.lastKnownPath);
lc.log(level,msg);
}
......
......@@ -46,7 +46,7 @@ public:
* @param numberOfBlock number of memory block we need read the whole file
*/
DiskReadTask(DataConsumer & destination,
tape::tapegateway::FileToMigrateStruct* file,size_t numberOfBlock,
cta::ArchiveJob *archiveJob,size_t numberOfBlock,
castor::server::AtomicFlag& errorFlag);
void execute(log::LogContext& lc, diskFile::DiskFileFactory & fileFactory,
......@@ -96,7 +96,7 @@ private:
/**
* All we need to know about the file we are migrating
*/
std::unique_ptr<tape::tapegateway::FileToMigrateStruct> m_migratedFile;
std::unique_ptr<cta::ArchiveJob> m_archiveJob;
/**
* The number of memory block we will need to read the whole file
......
......@@ -36,8 +36,8 @@ namespace daemon {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
DiskWriteTask::DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm):
m_recallingFile(file),m_memManager(mm){
DiskWriteTask::DiskWriteTask(cta::RetrieveJob *retrieveJob, RecallMemoryManager& mm):
m_retrieveJob(retrieveJob),m_memManager(mm){
}
......@@ -52,10 +52,10 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
castor::utils::Timer totalTime(localTime);
castor::utils::Timer transferTime(localTime);
log::ScopedParamContainer URLcontext(lc);
URLcontext.add("NSFILEID",m_recallingFile->fileid())
.add("path", m_recallingFile->path())
.add("fileTransactionId",m_recallingFile->fileTransactionId())
.add("fSeq",m_recallingFile->fseq());
URLcontext.add("NSFILEID",m_retrieveJob->tapeCopyLocation.fileId)
.add("path", m_retrieveJob->tapeCopyLocation.filePath)
.add("fileTransactionId",m_retrieveJob->m_id)
.add("fSeq",m_retrieveJob->tapeCopyLocation.fseq);
// This out-of-try-catch variables allows us to record the stage of the
// process we're in, and to count the error if it occurs.
// We will not record errors for an empty string. This will allow us to
......@@ -90,7 +90,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
// Synchronise the counter with the open time counter.
currentErrorToCount = "Error_diskOpenForWrite";
transferTime = localTime;
writeFile.reset(fileFactory.createWriteFile(m_recallingFile->path()));
writeFile.reset(fileFactory.createWriteFile(m_retrieveJob->tapeCopyLocation.filePath));
URLcontext.add("actualURL", writeFile->URL());
lc.log(LOG_INFO, "Opened disk file for writing");
m_stats.openingTime+=localTime.secs(castor::utils::Timer::resetCounter);
......@@ -120,7 +120,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
currentErrorToCount = "";
}
} //end of while(1)
reporter.reportCompletedJob(*m_recallingFile,checksum,m_stats.dataVolume);
reporter.reportCompletedJob(std::move(m_retrieveJob),checksum,m_stats.dataVolume);
m_stats.waitReportingTime+=localTime.secs(castor::utils::Timer::resetCounter);
m_stats.transferTime = transferTime.secs();
m_stats.totalTime = totalTime.secs();
......@@ -153,7 +153,7 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
.add("errorCode", e.code());
logWithStat(LOG_ERR, "File writing to disk failed.", lc);
lc.logBacktrace(LOG_ERR, e.backtrace());
reporter.reportFailedJob(*m_recallingFile,e.getMessageValue(),e.code());
reporter.reportFailedJob(std::move(m_retrieveJob),e.getMessageValue(),e.code());
//got an exception, return false
......@@ -200,7 +200,7 @@ void DiskWriteTask::releaseAllBlock(){
//------------------------------------------------------------------------------
void DiskWriteTask::checkErrors(MemBlock* mb,int blockId,castor::log::LogContext& lc){
using namespace castor::log;
if(m_recallingFile->fileid() != static_cast<unsigned int>(mb->m_fileid)
if(m_retrieveJob->tapeCopyLocation.fileId != static_cast<unsigned int>(mb->m_fileid)
|| blockId != mb->m_fileBlock || mb->isFailed() ){
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(lc, Param("received_NSFILEID", mb->m_fileid)),
......@@ -253,8 +253,8 @@ void DiskWriteTask::logWithStat(int level,const std::string& msg,log::LogContext
m_stats.transferTime?1.0*m_stats.dataVolume/1000/1000/m_stats.transferTime:0)
.add("openRWCloseToTransferTimeRatio",
m_stats.transferTime?(m_stats.openingTime+m_stats.readWriteTime+m_stats.closingTime)/m_stats.transferTime:0.0)
.add("FILEID",m_recallingFile->fileid())
.add("path",m_recallingFile->path());
.add("FILEID",m_retrieveJob->tapeCopyLocation.fileId)
.add("path",m_retrieveJob->tapeCopyLocation.filePath);
lc.log(level,msg);
}
}}}}
......
......@@ -58,7 +58,8 @@ public:
* @param file: All we need to know about the file we are recalling
* @param mm: memory manager of the session
*/
DiskWriteTask(tape::tapegateway::FileToRecallStruct* file,RecallMemoryManager& mm);
DiskWriteTask(cta::RetrieveJob *retrieveJob, RecallMemoryManager& mm);
/**
* Main routine: takes each memory block in the fifo and writes it to disk
* @return true if the file has been successfully written false otherwise.
......@@ -115,10 +116,11 @@ private:
* The fifo containing the memory blocks holding data to be written to disk
*/
castor::server::BlockingQueue<MemBlock *> m_fifo;
/**
* All we need to know about the file we are currently recalling
*/
std::unique_ptr<tape::tapegateway::FileToRecallStruct> m_recallingFile;
std::unique_ptr<cta::RetrieveJob> m_retrieveJob;
/**
* Reference to the Memory Manager in use
......
......@@ -62,18 +62,18 @@ MigrationReportPacker::~MigrationReportPacker(){
//reportCompletedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportCompletedJob(
const tapegateway::FileToMigrateStruct& migratedFile,u_int32_t checksum,
std::unique_ptr<cta::ArchiveJob> successfulArchiveJob,u_int32_t checksum,
u_int32_t blockId) {
std::unique_ptr<Report> rep(new ReportSuccessful(migratedFile,checksum,blockId));
std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob),checksum,blockId));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void MigrationReportPacker::reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,
void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,
const std::string& msg,int error_code){
std::unique_ptr<Report> rep(new ReportError(migratedFile,msg,error_code));
std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob),msg,error_code));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
......
......@@ -29,6 +29,7 @@
#include "castor/tape/tapeserver/daemon/ReportPackerInterface.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/ArchiveJob.hpp"
#include <list>
#include <memory>
......@@ -56,7 +57,7 @@ public:
* of the file. This is 0 (instead of 1) for the first file on the tape (aka
* fseq = 1).
*/
void reportCompletedJob(const tapegateway::FileToMigrateStruct& migratedFile,
void reportCompletedJob(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob,
u_int32_t checksum, u_int32_t blockId);
/**
......@@ -66,7 +67,7 @@ public:
* @param msg the error message to the failure
* @param error_code the error code related to the failure
*/
void reportFailedJob(const tapegateway::FileToMigrateStruct& migratedFile,const std::string& msg,int error_code);
void reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob,const std::string& msg,int error_code);
/**
* Create into the MigrationReportPacker a report for the signaling a flusing on tape
......@@ -104,7 +105,6 @@ private:
virtual void execute(MigrationReportPacker& packer)=0;
};
class ReportSuccessful : public Report {
const FileStruct m_migratedFile;
const unsigned long m_checksum;
const uint32_t m_blockId;
......@@ -113,9 +113,8 @@ private:
*/
std::unique_ptr<cta::ArchiveJob> m_successfulArchiveJob;
public:
ReportSuccessful(const FileStruct& file,unsigned long checksum,
u_int32_t blockId):
m_migratedFile(file),m_checksum(checksum),m_blockId(blockId){}
ReportSuccessful(std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, unsigned long checksum, u_int32_t blockId):
m_checksum(checksum),m_blockId(blockId), m_successfulArchiveJob(std::move(successfulArchiveJob)) {}
virtual void execute(MigrationReportPacker& reportPacker);
};
class ReportFlush : public Report {
......@@ -146,7 +145,6 @@ private:
void execute(MigrationReportPacker& reportPacker);
};
class ReportError : public Report {
const FileStruct m_migratedFile;
const std::string m_error_msg;
const int m_error_code;
......@@ -155,8 +153,8 @@ private:
*/
std::unique_ptr<cta::ArchiveJob> m_failedArchiveJob;
public:
ReportError(const FileStruct& file,std::string msg,int error_code):
m_migratedFile(file),m_error_msg(msg),m_error_code(error_code){}
ReportError(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, std::string msg,int error_code):
m_error_msg(msg), m_error_code(error_code), m_failedArchiveJob(std::move(failedArchiveJob)){}
virtual void execute(MigrationReportPacker& reportPacker);
};
......
......@@ -24,22 +24,6 @@
#include "castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp"
#include "castor/tape/tapegateway/FilesToMigrateList.hpp"
#include "castor/tape/tapeserver/daemon/ErrorFlag.hpp"
//#include "log.h"
namespace{
/*
* function to set a NULL the owning FilesToMigrateList of a FileToMigrateStruct
* Indeed, a clone of a structure will only do a shallow copy (sic).
* Otherwise at the second destruction the object will try to remove itself
* from the owning list and then boom !
* @param ptr a pointer to an object to change
* @return the parameter ptr
*/
castor::tape::tapegateway::FileToMigrateStruct* removeOwningList(castor::tape::tapegateway::FileToMigrateStruct* ptr){
ptr->setFilesToMigrateList(0);
return ptr;
}
}
using castor::log::LogContext;
using castor::log::Param;
......@@ -54,10 +38,10 @@ namespace daemon {
//------------------------------------------------------------------------------
MigrationTaskInjector::MigrationTaskInjector(MigrationMemoryManager & mm,
DiskReadThreadPool & diskReader,
TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,client::ClientInterface& client,
TapeSingleThreadInterface<TapeWriteTask> & tapeWriter,cta::ArchiveMount &archiveMount,
uint64_t maxFiles, uint64_t byteSizeThreshold,castor::log::LogContext lc):
m_thread(*this),m_memManager(mm),m_tapeWriter(tapeWriter),
m_diskReader(diskReader),m_client(client),m_lc(lc),
m_diskReader(diskReader),m_archiveMount(archiveMount),m_lc(lc),
m_maxFiles(maxFiles), m_maxBytes(byteSizeThreshold)
{
......@@ -67,29 +51,23 @@ namespace daemon {
//------------------------------------------------------------------------------
//injectBulkMigrations
//------------------------------------------------------------------------------
void MigrationTaskInjector::injectBulkMigrations(
const std::vector<tapegateway::FileToMigrateStruct*>& jobs){
void MigrationTaskInjector::injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs){
const u_signed64 blockCapacity = m_memManager.blockCapacity();
for(std::vector<tapegateway::FileToMigrateStruct*>::const_iterator it= jobs.begin();it!=jobs.end();++it){
const u_signed64 fileSize = (*it)->fileSize();
for(auto it= jobs.begin();it!=jobs.end();++it){
const u_signed64 fileSize = (*it)->archiveFile.size;
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", (*it)->nshost())),
LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->fileid())),
LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->fseq())),
LogContext::ScopedParam(m_lc, Param("path", (*it)->path()))
LogContext::ScopedParam(m_lc, Param("NSHOSTNAME", (*it)->copyLocation.nsHostName)),
LogContext::ScopedParam(m_lc, Param("NSFILEID", (*it)->copyLocation.fileId)),
LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->copyLocation.fSeq)),
LogContext::ScopedParam(m_lc, Param("path", (*it)->archiveFile.lastKnownPath))
};
tape::utils::suppresUnusedVariable(sp);
tape::utils::suppresUnusedVariable(sp);
const u_signed64 neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
std::unique_ptr<TapeWriteTask> twt(
new TapeWriteTask(neededBlock,removeOwningList((*it)->clone()),m_memManager,m_errorFlag)
);
std::unique_ptr<DiskReadTask> drt(
new DiskReadTask(*twt,removeOwningList((*it)->clone()),neededBlock,m_errorFlag)
);
std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, *it, m_memManager, m_errorFlag));
std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, *it, neededBlock, m_errorFlag));
m_tapeWriter.push(twt.release());
m_diskReader.push(drt.release());
......@@ -125,15 +103,20 @@ namespace daemon {
//synchronousInjection
//------------------------------------------------------------------------------
bool MigrationTaskInjector::synchronousInjection() {
client::ClientProxy::RequestReport reqReport;
std::unique_ptr<tapegateway::FilesToMigrateList>
filesToMigrateList;
std::vector<cta::ArchiveJob *> jobs;
try {
filesToMigrateList.reset(m_client.getFilesToMigrate(m_maxFiles,
m_maxBytes,reqReport));
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob();
if(!job.get()) break;
jobs.push_back(job.release());
files++;
bytes+=job->archiveFile.size;
}
} catch (castor::exception::Exception & ex) {
castor::log::ScopedParamContainer scoped(m_lc);
scoped.add("transactionId", reqReport.transactionId)
scoped.add("transactionId", m_archiveMount.getMountTransactionId())
.add("byteSizeThreshold",m_maxBytes)
.add("maxFiles", m_maxFiles)
.add("message", ex.getMessageValue());
......@@ -141,16 +124,13 @@ namespace daemon {
return false;
}
castor::log::ScopedParamContainer scoped(m_lc);
scoped.add("sendRecvDuration", reqReport.sendRecvDuration)
.add("byteSizeThreshold",m_maxBytes)
.add("transactionId", reqReport.transactionId)