Commit c6bd96e1 authored by Eric Cano's avatar Eric Cano
Browse files

Switched migration task inject to bulk mode.

parent a1ae4844
......@@ -50,7 +50,7 @@ namespace daemon {
//------------------------------------------------------------------------------
//injectBulkMigrations
//------------------------------------------------------------------------------
void MigrationTaskInjector::injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs){
void MigrationTaskInjector::injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs){
const uint64_t blockCapacity = m_memManager.blockCapacity();
for(auto it= jobs.begin();it!=jobs.end();++it){
......@@ -64,8 +64,12 @@ namespace daemon {
const uint64_t neededBlock = howManyBlocksNeeded(fileSize,blockCapacity);
std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, *it, m_memManager, m_errorFlag));
std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, *it, neededBlock, m_errorFlag));
// We give owner ship on the archive job to the tape write task (as last user).
// disk read task gets a bare pointer.
// TODO: could be changed as a shared_ptr.
auto archiveJobPtr=it->get();
std::unique_ptr<TapeWriteTask> twt(new TapeWriteTask(neededBlock, it->release(), m_memManager, m_errorFlag));
std::unique_ptr<DiskReadTask> drt(new DiskReadTask(*twt, archiveJobPtr, neededBlock, m_errorFlag));
m_tapeWriter.push(twt.release());
m_diskReader.push(drt.release());
......@@ -101,17 +105,9 @@ namespace daemon {
//synchronousInjection
//------------------------------------------------------------------------------
bool MigrationTaskInjector::synchronousInjection() {
std::vector<cta::ArchiveJob *> jobs;
std::list<std::unique_ptr<cta::ArchiveJob> > jobs;
try {
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::ArchiveJob> job=m_archiveMount.getNextJob(m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
jobs.push_back(job.release());
}
jobs = m_archiveMount.getNextJobBatch(m_maxFiles, m_maxBytes,m_lc);
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer scoped(m_lc);
scoped.add("transactionId", m_archiveMount.getMountTransactionId())
......@@ -162,18 +158,10 @@ namespace daemon {
throw castor::tape::tapeserver::daemon::ErrorFlag();
}
Request req = m_parent.m_queue.pop();
std::vector<cta::ArchiveJob *> jobs;
uint64_t files=0;
auto jobs = m_parent.m_archiveMount.getNextJobBatch(req.filesRequested, req.filesRequested, m_parent.m_lc);
uint64_t files=jobs.size();
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
std::unique_ptr<cta::ArchiveJob> job=m_parent.m_archiveMount.getNextJob(m_parent.m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.archiveFileID;
jobs.push_back(job.release());
}
for (auto & j:jobs) bytes+=j->archiveFile.fileSize;
if(jobs.empty()){
if (req.lastCall) {
m_parent.m_lc.log(cta::log::INFO,"No more file to migrate: triggering the end of session.");
......
......@@ -124,7 +124,7 @@ private:
* Create all the tape-read and write-disk tasks for set of files to retrieve
* @param jobs the list of FileToMigrateStructs we have to transform in a pair of task
*/
void injectBulkMigrations(const std::vector<cta::ArchiveJob *>& jobs);
void injectBulkMigrations(std::list<std::unique_ptr<cta::ArchiveJob>>& jobs);
/*Compute how many blocks are needed for a file of fileSize bytes*/
size_t howManyBlocksNeeded(size_t fileSize,size_t blockCapacity){
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment