Commit d733b4c7 authored by Eric Cano

Reworked OStoreDB::ArchiveMount::getNextJobBatch().

This new version locks the queues for less time.
Fixed a bug where the wrong lock (shared and not exclusive) was taken when removing empty queues
from the root entry.
Improved retrying over multiple iterations (we can now retry on a new queue if one gets constructed
in the meantime).
parent 9e329479
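For orientation, here is a condensed sketch of the retry loop this commit introduces. It uses only identifiers that appear in the diff below (objectstore::RootEntry, objectstore::ArchiveQueue, the scoped locks, removeArchiveQueueAndCommit()); job popping and most error handling are trimmed, so read it as an illustration of the locking pattern, not as the actual implementation:

while (true) {
  // Look up the queue address under a short-lived shared lock on the root entry.
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  std::string aqAddress;
  for (auto & aqp : re.dumpArchiveQueues())
    if (aqp.tapePool == mountInfo.tapePool) aqAddress = aqp.address;
  if (!aqAddress.size()) break; // No queue for this tape pool: nothing to do.
  objectstore::ArchiveQueue aq(aqAddress, m_objectStore);
  objectstore::ScopedExclusiveLock aqlock;
  try {
    aqlock.lock(aq);
    rel.release(); // Hold the root entry lock only as long as strictly needed.
    aq.fetch();
  } catch (cta::exception::Exception &) {
    // The queue vanished between lookup and lock: de-reference it under an
    // exclusive (not shared) root entry lock (the bug fixed by this commit),
    // then loop again in case a new queue got constructed in the meantime.
    // (removeArchiveQueueAndCommit() can throw RootEntry::ArchiveQueueNotEmpty
    // if a fresh, non-empty queue took the slot; that handling is elided here.)
    if (rel.isLocked()) rel.release();
    ScopedExclusiveLock rexl(re);
    re.fetch();
    re.removeArchiveQueueAndCommit(mountInfo.tapePool);
    continue;
  }
  // ... pop a batch of jobs from aq, release aqlock, and go for another round
  // until the request is fulfilled or the queue is exhausted ...
}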
@@ -1582,13 +1582,13 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext& logContext) {
// Find the next files to archive
// Get the archive queue
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
// First, check we should not forcibly go down. In that case, we just find nothing to do.
// Get drive register
{
// Get the archive queue
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
ScopedSharedLock drl(dr);
dr.fetch();
@@ -1598,62 +1598,69 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >();
}
}
auto aql = re.dumpArchiveQueues();
rel.release();
std::string aqAddress;
for (auto & aqp : aql) {
// Now, we should repeatedly fetch jobs from the queue until we have fulfilled the request or there is nothing
// left to get from the queue.
// Requests are left as-is on errors. We will keep a list of them to avoid re-accessing them in the same batch.
std::set<std::string> archiveRequestsToSkip;
// Prepare the returned jobs that we might accumulate in several rounds.
std::list<std::unique_ptr<OStoreDB::ArchiveJob>> privateRet;
uint64_t currentBytes=0;
uint64_t currentFiles=0;
size_t iterationCount=0;
while (true) {
iterationCount++;
uint64_t beforeBytes=currentBytes;
uint64_t beforeFiles=currentFiles;
// Try and get access to a queue.
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
std::string aqAddress;
auto aql = re.dumpArchiveQueues();
for (auto & aqp : aql) {
if (aqp.tapePool == mountInfo.tapePool)
aqAddress = aqp.address;
}
// The archive queue is gone, there are no more jobs
if (!aqAddress.size())
return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >();
// Try and open the archive queue. It could be gone by now.
try {
}
if (!aqAddress.size()) break;
// Try and lock the archive queue. Any failure from here on means the end of getting jobs.
objectstore::ArchiveQueue aq(aqAddress, m_objectStore);
objectstore::ScopedExclusiveLock aqlock;
try {
aqlock.lock(aq);
aq.fetch();
} catch (cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the meantime, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
// TODO: We still conclude that the queue is empty on this unlikely event.
// (cont'd): A better approach would be to restart the process of this function
// from scratch.
rel.lock(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
// TODO: improve: if we fail here we could retry to fetch a job.
aqlock.lock(aq);
rel.release();
aq.fetch();
} catch (cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the meantime, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
if (rel.isLocked()) rel.release();
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
// TODO: improve: if we fail here we could retry to fetch a job.
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-reference missing queue from root entry");
}
continue;
}
// We now have the queue.
{
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-reference missing queue from root entry");
.add("queueSize", aq.dumpJobs().size());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): archive queue found.");
}
return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >();
}
// Pop jobs by batch until we find a sufficient number actually belonging to the queue.
// Any job that does not really belong is an uncommitted pop, which we will
// re-do here.
std::list<std::unique_ptr<OStoreDB::ArchiveJob>> privateRet;
uint64_t currentBytes=0;
uint64_t currentFiles=0;
{
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("queueSize", aq.dumpJobs().size());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): archive queue found.");
}
while (aq.dumpJobs().size()) {
// We should build the list of jobs we intend to grab. We will attempt to
// dequeue them in one go, updating jobs in parallel. If some jobs turn out
// to not be there really, we will have to do several passes.
@@ -1665,10 +1672,15 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
while (candidateDumps.size() && currentBytes < bytesRequested && currentFiles < filesRequested) {
auto job=candidateDumps.front();
candidateDumps.pop_front();
currentFiles++;
currentBytes+=job.size;
candidateJobs.emplace_back(new OStoreDB::ArchiveJob(job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this));
candidateJobs.back()->tapeFile.copyNb = job.copyNb;
// If we saw an archive request we could not pop nor clean up (really bad case), we
// will disregard it for the rest of this getNextJobBatch call. We will reconsider
// it in the next call.
if (!archiveRequestsToSkip.count(job.address)) {
currentFiles++;
currentBytes+=job.size;
candidateJobs.emplace_back(new OStoreDB::ArchiveJob(job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this));
candidateJobs.back()->tapeFile.copyNb = job.copyNb;
}
}
{
log::ScopedParamContainer params(logContext);
@@ -1776,6 +1788,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
params.add("message", e.getMessageValue());
logContext.log(log::ERR, "In ArchiveMount::getNextJobBatch(): unexpected error. Leaving the job queued.");
jobsToForget.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
archiveRequestsToSkip.insert((*j)->m_archiveRequest.getAddressIfSet());
}
// This job is not for us.
jobsToForget.emplace_back((*j)->m_archiveRequest.getAddressIfSet());
@@ -1798,61 +1811,83 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
privateRet.emplace_back(std::move(*vj));
vj=validatedJobs.erase(vj);
}
// We could be done now.
if (currentBytes >= bytesRequested || currentFiles >= filesRequested)
break;
// If this round was not fruitful, just break.
if (!jobsInThisRound)
break;
}
// We either ran out of jobs or fulfilled the requirements. Time to build up the reply.
// Before this, we can release the queue and delete it if we emptied it.
auto remainingJobs=aq.dumpJobs().size();
aqlock.release();
// If the queue is empty, we can get rid of it.
if (!remainingJobs) {
try {
// The queue should be removed as it is empty.
ScopedExclusiveLock reel(re);
re.fetch();
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): deleted empty queue");
} catch (cta::exception::Exception &ex) {
// Before going for another round, we can release the queue and delete it if we emptied it.
auto remainingJobs=aq.dumpJobs().size();
aqlock.release();
// If the queue is empty, we can get rid of it.
if (!remainingJobs) {
try {
// The queue should be removed as it is empty.
ScopedExclusiveLock rexl(re);
re.fetch();
re.removeArchiveQueueAndCommit(mountInfo.tapePool);
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): deleted empty queue");
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not delete a presumably empty queue");
}
}
// We can now summarize this round
{
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("queueObject", aq.getAddressIfSet())
.add("Message", ex.getMessageValue());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not delete a presumably empty queue");
.add("filesAdded", currentFiles - beforeFiles)
.add("bytesAdded", currentBytes - beforeBytes)
.add("filesBefore", beforeFiles)
.add("bytesBefore", beforeBytes)
.add("filesAfter", currentFiles)
.add("bytesAfter", currentBytes)
.add("iterationCount", iterationCount);
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): did one round of jobs retrieval.");
}
// We could be done now.
if (currentBytes >= bytesRequested || currentFiles >= filesRequested)
break;
// If we had exhausted the queue while selecting the jobs, we stop here, else we can go for another
// round.
if (!candidateDumps.size())
break;
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params (logContext);
params.add("exceptionMessage", ex.getMessageValue());
logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (CTA exception) getting more jobs. Backtrace follows.");
logContext.logBacktrace(log::ERR, ex.backtrace());
break;
} catch (std::exception & e) {
log::ScopedParamContainer params (logContext);
params.add("exceptionWhat", e.what());
logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (std exception) getting more jobs.");
break;
} catch (...) {
logContext.log(log::ERR, "In OStoreDB::ArchiveMount::getNextJobBatch(): error (unknown exception) getting more jobs.");
break;
}
// Log the outcome.
uint64_t nFiles=privateRet.size();
uint64_t nBytes=0;
for (auto & j: privateRet) {
nBytes+=j->archiveFile.fileSize;
}
{
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("files", nFiles)
.add("bytes", nBytes)
.add("queueObject", aq.getAddressIfSet());
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete.");
}
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: privateRet) ret.emplace_back(std::move(j));
return ret;
} catch (cta::exception::Exception & ex) {
}
// We either ran out of jobs or fulfilled the requirements. Time to build up the reply.
// Log the outcome.
uint64_t nFiles=privateRet.size();
uint64_t nBytes=0;
for (auto & j: privateRet) {
nBytes+=j->archiveFile.fileSize;
}
{
log::ScopedParamContainer params(logContext);
params.add("tapepool", mountInfo.tapePool)
.add("message", ex.getMessageValue());
logContext.log(log::ERR, "In ArchiveMount::getNextJobBatch(): got exception. Returning empty");
return std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> >();
.add("files", nFiles)
.add("bytes", nBytes);
logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete.");
}
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: privateRet) ret.emplace_back(std::move(j));
return ret;
}
//------------------------------------------------------------------------------