From d36f15796d790fbc1cc39231ba506ca1f8ffed44 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Wed, 14 Jun 2017 17:11:48 +0200 Subject: [PATCH] Switched migration task inject to bulk mode. --- .../daemon/MigrationTaskInjector.cpp | 36 +++++++------------ .../daemon/MigrationTaskInjector.hpp | 2 +- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp index 922f25cbd4..38d155f28e 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.cpp @@ -50,7 +50,7 @@ namespace daemon { //------------------------------------------------------------------------------ //injectBulkMigrations //------------------------------------------------------------------------------ - void MigrationTaskInjector::injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs){ + void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs){ const uint64_t blockCapacity = m_memManager.blockCapacity(); for(auto it= jobs.begin();it!=jobs.end();++it){ @@ -64,8 +64,12 @@ namespace daemon { const uint64_t neededBlock = howManyBlocksNeeded(fileSize,blockCapacity); - std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, *it, m_memManager, m_errorFlag)); - std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, *it, neededBlock, m_errorFlag)); + // We give owner ship on the archive job to the tape write task (as last user). + // disk read task gets a bare pointer. + // TODO: could be changed as a shared_ptr. + auto archiveJobPtr=it->get(); + std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, it->release(), m_memManager, m_errorFlag)); + std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, archiveJobPtr, neededBlock, m_errorFlag)); m_tapeWriter.push(twt.release()); m_diskReader.push(drt.release()); @@ -101,17 +105,9 @@ namespace daemon { //synchronousInjection //------------------------------------------------------------------------------ bool MigrationTaskInjector::synchronousInjection() { - std::vector<cta::ArchiveJob *> jobs; + std::list<std::unique_ptr<cta::ArchiveJob> > jobs; try { - uint64_t files=0; - uint64_t bytes=0; - while(files<=m_maxFiles && bytes<=m_maxBytes) { - std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob(m_lc); - if(!job.get()) break; - files++; - bytes+=job->archiveFile.fileSize; - jobs.push_back(job.release()); - } + jobs = m_archiveMount.getNextJobBatch(m_maxFiles, m_maxBytes,m_lc); } catch (cta::exception::Exception & ex) { cta::log::ScopedParamContainer scoped(m_lc); scoped.add("transactionId", m_archiveMount.getMountTransactionId()) @@ -162,18 +158,10 @@ namespace daemon { throw castor::tape::tapeserver::daemon::ErrorFlag(); } Request req = m_parent.m_queue.pop(); - std::vector<cta::ArchiveJob *> jobs; - - uint64_t files=0; + auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.filesRequested, m_parent.m_lc); + uint64_t files=jobs.size(); uint64_t bytes=0; - while(files<=req.filesRequested && bytes<=req.bytesRequested) { - std::unique_ptr<cta::ArchiveJob> job=m_parent.m_archiveMount.getNextJob(m_parent.m_lc); - if(!job.get()) break; - files++; - bytes+=job->archiveFile.archiveFileID; - jobs.push_back(job.release()); - } - + for (auto & j:jobs) bytes+=j->archiveFile.fileSize; if(jobs.empty()){ if (req.lastCall) { m_parent.m_lc.log(cta::log::INFO,"No more file to migrate: triggering the end of session."); diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp index ddd2fb027e..0cf7808f47 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationTaskInjector.hpp @@ -124,7 +124,7 @@ private: * Create all the tape-read and write-disk tasks for set of files to retrieve * @param jobs the list of FileToMigrateStructs we have to transform in a pair of task */ - void injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs); + void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs); /*Compute how many blocks are needed for a file of fileSize bytes*/ size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){ -- GitLab