Skip to content
Snippets Groups Projects
Commit d36f1579 authored by Eric Cano's avatar Eric Cano
Browse files

Switched migration task inject to bulk mode.

parent 7048f4b5
No related branches found
No related tags found
No related merge requests found
......@@ -50,7 +50,7 @@ namespace daemon {
//------------------------------------------------------------------------------
//injectBulkMigrations
//------------------------------------------------------------------------------
void MigrationTaskInjector::injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs){
void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs){
const uint64_t blockCapacity = m_memManager.blockCapacity();
for(auto it= jobs.begin();it!=jobs.end();++it){
......@@ -64,8 +64,12 @@ namespace daemon {
const uint64_t neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, *it, m_memManager, m_errorFlag));
std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, *it, neededBlock, m_errorFlag));
// We give owner ship on the archive job to the tape write task (as last user).
// disk read task gets a bare pointer.
// TODO: could be changed as a shared_ptr.
auto archiveJobPtr=it->get();
std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, it->release(), m_memManager, m_errorFlag));
std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, archiveJobPtr, neededBlock, m_errorFlag));
m_tapeWriter.push(twt.release());
m_diskReader.push(drt.release());
......@@ -101,17 +105,9 @@ namespace daemon {
//synchronousInjection
//------------------------------------------------------------------------------
bool MigrationTaskInjector::synchronousInjection() {
std::vector<cta::ArchiveJob *> jobs;
std::list<std::unique_ptr<cta::ArchiveJob> > jobs;
try {
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob(m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
jobs.push_back(job.release());
}
jobs = m_archiveMount.getNextJobBatch(m_maxFiles, m_maxBytes,m_lc);
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer scoped(m_lc);
scoped.add("transactionId", m_archiveMount.getMountTransactionId())
......@@ -162,18 +158,10 @@ namespace daemon {
throw castor::tape::tapeserver::daemon::ErrorFlag();
}
Request req = m_parent.m_queue.pop();
std::vector<cta::ArchiveJob *> jobs;
uint64_t files=0;
auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.filesRequested, m_parent.m_lc);
uint64_t files=jobs.size();
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
std::unique_ptr<cta::ArchiveJob> job=m_parent.m_archiveMount.getNextJob(m_parent.m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.archiveFileID;
jobs.push_back(job.release());
}
for (auto & j:jobs) bytes+=j->archiveFile.fileSize;
if(jobs.empty()){
if (req.lastCall) {
m_parent.m_lc.log(cta::log::INFO,"No more file to migrate: triggering the end of session.");
......
......@@ -124,7 +124,7 @@ private:
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs the list of FileToMigrateStructs we have to transform in a pair of task
*/
void injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs);
void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs);
/*Compute how many blocks are needed for a file of fileSize bytes*/
size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment