Commit 697ea45a authored by Eric Cano

Implemented first version of sharded archive queues.

Updated locking of ObjectOps in order to allow locking of sub-objects.
Made cta-objectstore-dump-object lockfree.
parent 97450998
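
For orientation, the sketch below illustrates the bookkeeping this commit introduces: the ArchiveQueue payload keeps one pointer per ArchiveQueueShard with cached job and byte counts, and the queue-level summary must equal the sum of those pointers (this is what checkMapsAndShardsCoherency() verifies before each commit). The plain structs are illustrative stand-ins only, not the commit's actual types; the real fields live in the protobuf payload (archivequeuesshards, shardjobscount, shardbytescount, archivejobscount, archivejobstotalsize).

#include <cstdint>
#include <string>
#include <vector>

// Illustrative stand-ins for the protobuf messages used by the real code.
struct ShardPointer {        // one entry per ArchiveQueueShard object
  std::string address;       // object store address of the shard
  uint64_t shardJobsCount;   // cached number of jobs held by the shard
  uint64_t shardBytesCount;  // cached total size of the jobs held by the shard
};

struct QueueSummary {        // queue-level counters kept in the ArchiveQueue payload
  uint64_t archiveJobsCount;
  uint64_t archiveJobsTotalSize;
};

// The invariant enforced before every commit: if the summary no longer matches
// the sum of the shard pointers, the queue is rebuilt from the shards themselves.
bool coherent(const QueueSummary & summary, const std::vector<ShardPointer> & shards) {
  uint64_t jobs = 0, bytes = 0;
  for (const auto & sp : shards) {
    jobs += sp.shardJobsCount;
    bytes += sp.shardBytesCount;
  }
  return jobs == summary.archiveJobsCount && bytes == summary.archiveJobsTotalSize;
}
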
......@@ -22,6 +22,8 @@
#include "EntryLogSerDeser.hpp"
#include "RootEntry.hpp"
#include "ValueCountMap.hpp"
#include "ArchiveQueueShard.hpp"
#include "AgentReference.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
......@@ -57,6 +59,7 @@ void ArchiveQueue::initialize(const std::string& name) {
m_payload.set_tapepool(name);
// set the archive jobs counter to zero
m_payload.set_archivejobstotalsize(0);
m_payload.set_archivejobscount(0);
m_payload.set_oldestjobcreationtime(0);
// set the initial summary map rebuild count to zero
m_payload.set_mapsrebuildcount(0);
......@@ -65,32 +68,136 @@ void ArchiveQueue::initialize(const std::string& name) {
}
void ArchiveQueue::commit() {
// Before calling ObjectOps::commit, check that we have coherent queue summaries
if (!checkMapsAndShardsCoherency()) {
rebuild();
m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1);
}
ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit();
}
bool ArchiveQueue::checkMapsAndShardsCoherency() {
checkPayloadReadable();
uint64_t bytesFromShardPointers = 0;
uint64_t jobsExpectedFromShardsPointers = 0;
// Add up shard summaries
for (auto & aqs: m_payload.archivequeuesshards()) {
bytesFromShardPointers += aqs.shardbytescount();
jobsExpectedFromShardsPointers += aqs.shardjobscount();
}
// The sum of shards should be equal to the summary
if (m_payload.archivejobstotalsize() != bytesFromShardPointers ||
m_payload.archivejobscount() != jobsExpectedFromShardsPointers)
return false;
// Check that we have coherent queue summaries
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
if (maxDriveAllowedMap.total() != m_payload.archivejobscount() ||
priorityMap.total() != m_payload.archivejobscount() ||
minArchiveRequestAgeMap.total() != m_payload.archivejobscount())
return false;
return true;
}
void ArchiveQueue::rebuild() {
checkPayloadWritable();
// Something is off with the queue. We will hence rebuild it. The rebuild of the
// queue will consist of:
// 1) Attempting to read all shards in parallel. Absent shards are possible, and will
// mean we have dangling pointers.
// 2) Rebuilding the summaries from the shards.
// As a side note, we do not go as far as validating the pointers to jobs within the
// shards, as this is already handled as accesses go.
std::list<ArchiveQueueShard> shards;
std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers;
// Get the summaries structures ready
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.clear();
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.clear();
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
if (maxDriveAllowedMap.total() != (uint64_t)m_payload.archivejobs_size() ||
priorityMap.total() != (uint64_t)m_payload.archivejobs_size() ||
minArchiveRequestAgeMap.total() != (uint64_t)m_payload.archivejobs_size()) {
// The maps counts are off: recompute them.
maxDriveAllowedMap.clear();
priorityMap.clear();
minArchiveRequestAgeMap.clear();
for (size_t i=0; i<(size_t)m_payload.archivejobs_size(); i++) {
maxDriveAllowedMap.incCount(m_payload.archivejobs(i).maxdrivesallowed());
priorityMap.incCount(m_payload.archivejobs(i).priority());
minArchiveRequestAgeMap.incCount(m_payload.archivejobs(i).priority());
minArchiveRequestAgeMap.clear();
for (auto & sa: m_payload.archivequeuesshards()) {
shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore));
shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch());
}
auto s = shards.begin();
auto sf = shardsFetchers.begin();
uint64_t totalJobs=0;
uint64_t totalBytes=0;
time_t oldestJobCreationTime=std::numeric_limits<time_t>::max();
while (s != shards.end()) {
// Each shard could be gone
try {
(*sf)->wait();
} catch (Backend::NoSuchObject & ex) {
// Remove the shard from the list
auto aqs = m_payload.mutable_archivequeuesshards()->begin();
while (aqs != m_payload.mutable_archivequeuesshards()->end()) {
if (aqs->address() == s->getAddressIfSet()) {
aqs = m_payload.mutable_archivequeuesshards()->erase(aqs);
} else {
aqs++;
}
}
goto nextShard;
}
m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1);
{
// The shard is still around, let's compute its summaries.
uint64_t jobs = 0;
uint64_t size = 0;
for (auto & j: s->dumpJobs()) {
jobs++;
size += j.size;
priorityMap.incCount(j.priority);
minArchiveRequestAgeMap.incCount(j.minArchiveRequestAge);
maxDriveAllowedMap.incCount(j.maxDrivesAllowed);
if (j.startTime < oldestJobCreationTime) oldestJobCreationTime = j.startTime;
}
// Add the summary to total.
totalJobs+=jobs;
totalBytes+=size;
// And store the value in the shard pointers.
auto maqs = m_payload.mutable_archivequeuesshards();
for (auto & aqs: *maqs) {
if (aqs.address() == s->getAddressIfSet()) {
aqs.set_shardjobscount(jobs);
aqs.set_shardbytescount(size);
goto shardUpdated;
}
}
{
// We had to update a shard and did not find it. This is an error.
throw exception::Exception(std::string ("In ArchiveQueue::rebuild(): failed to record summary for shard " + s->getAddressIfSet()));
}
shardUpdated:;
// We still need to check if the shard itself is coherent (we have an opportunity to
// match its summary with the jobs total we just recomputed).
if (size != s->getJobsSummary().bytes) {
ArchiveQueueShard aqs(s->getAddressIfSet(), m_objectStore);
m_exclusiveLock->includeSubObject(aqs);
aqs.fetch();
aqs.rebuild();
aqs.commit();
}
}
nextShard:;
s++;
sf++;
}
ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit();
m_payload.set_archivejobscount(totalJobs);
m_payload.set_archivejobstotalsize(totalBytes);
m_payload.set_oldestjobcreationtime(oldestJobCreationTime);
// We went through all the shards, updated the summaries, and removed references to
// gone shards. Done.
}
bool ArchiveQueue::isEmpty() {
checkPayloadReadable();
// Check we have no archive jobs pending
if (m_payload.archivejobs_size())
if (m_payload.archivequeuesshards_size())
return false;
// If we made it to here, it seems the pool is indeed empty.
return true;
......@@ -142,43 +249,125 @@ std::string ArchiveQueue::getTapePool() {
return m_payload.tapepool();
}
void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) {
void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc) {
checkPayloadWritable();
for (auto & jta: jobsToAdd) {
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed);
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.incCount(jta.policy.archivePriority);
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge);
if (m_payload.archivejobs_size()) {
if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(jta.startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize);
// Before adding the jobs, we have to decide how to lay them out in the shards.
// We are here in FIFO mode, so the algorithm is just 1) complete the current last
// shard, if it did not reach the maximum size
// 2) create new shard(s) as needed.
//
// First implementation is shard by shard. A better, parallel one could be implemented,
// but the performance gain should be marginal as most of the time we will be dealing
// with a single shard.
auto nextJob = jobsToAdd.begin();
while (nextJob != jobsToAdd.end()) {
// If we're here, there is at least one job to add.
// Let's find a shard for it/them. It can be either the last (incomplete) shard or
// a new shard to create. In all cases, we will max out the shard, jobs list permitting.
// If we do fill up the shard, we'll go through another round here.
// Is there a last shard, and is it not full?
ArchiveQueueShard aqs(m_objectStore);
serializers::ArchiveQueueShardPointer * aqsp = nullptr;
bool newShard=false;
uint64_t shardCount = m_payload.archivequeuesshards_size();
if (shardCount && m_payload.archivequeuesshards(shardCount - 1).shardjobscount() < c_maxShardSize) {
auto & shardPointer=m_payload.archivequeuesshards(shardCount - 1);
aqs.setAddress(shardPointer.address());
// include-locking does not check existence of the object in the object store.
// we will find out on fetch. If we fail, we have to rebuild.
m_exclusiveLock->includeSubObject(aqs);
try {
aqs.fetch();
} catch (Backend::NoSuchObject & ex) {
log::ScopedParamContainer params (lc);
params.add("archiveQueueObject", getAddressIfSet())
.add("shardNumber", shardCount - 1)
.add("shardObject", shardPointer.address());
lc.log(log::ERR, "In ArchiveQueue::addJobsAndCommit(): shard not present. Rebuilding queue.");
rebuild();
commit();
continue;
}
// Validate that the shard is as expected from the pointer. If not we need to
// rebuild the queue and restart the shard selection.
auto shardSummary = aqs.getJobsSummary();
if (shardPointer.shardbytescount() != shardSummary.bytes ||
shardPointer.shardjobscount() != shardSummary.jobs) {
log::ScopedParamContainer params(lc);
params.add("archiveQueueObject", getAddressIfSet())
.add("shardNumber", shardCount - 1)
.add("shardObject", shardPointer.address())
.add("shardReportedBytes", shardSummary.bytes)
.add("shardReportedJobs", shardSummary.jobs)
.add("expectedBytes", shardPointer.shardbytescount())
.add("expectedJobs", shardPointer.shardjobscount());
lc.log(log::ERR, "In ArchiveQueue::addJobsAndCommit(): mismatch found. Rebuilding the queue.");
rebuild();
commit();
continue;
}
// The shard looks good. We will now proceed with the addition of individual jobs.
aqsp = m_payload.mutable_archivequeuesshards(shardCount - 1);
} else {
m_payload.set_archivejobstotalsize(jta.fileSize);
m_payload.set_oldestjobcreationtime(jta.startTime);
// We need a new shard. Just add it (in memory).
newShard = true;
aqsp = m_payload.mutable_archivequeuesshards()->Add();
// Create the shard in memory.
std::stringstream shardName;
shardName << "ArchiveQueueShard-" << m_payload.tapepool();
aqs.setAddress(agentReference.nextId(shardName.str()));
aqs.initialize(getAddressIfSet());
// Reference the shard in the pointer, and initialize the counters.
aqsp->set_address(aqs.getAddressIfSet());
aqsp->set_shardbytescount(0);
aqsp->set_shardjobscount(0);
}
auto * j = m_payload.add_archivejobs();
j->set_address(jta.archiveRequestAddress);
j->set_size(jta.fileSize);
j->set_fileid(jta.archiveFileId);
j->set_copynb(jta.job.copyNb);
j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed);
j->set_priority(jta.policy.archivePriority);
j->set_minarchiverequestage(jta.policy.archiveMinRequestAge);
}
commit();
// We can now add the individual jobs, commit the main queue and then insert or commit the shard.
{
// As the queue could be rebuilt on each shard round, we get access to the
// value maps here
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
while (nextJob != jobsToAdd.end() && aqsp->shardjobscount() < c_maxShardSize) {
maxDriveAllowedMap.incCount(nextJob->policy.maxDrivesAllowed);
priorityMap.incCount(nextJob->policy.archivePriority);
minArchiveRequestAgeMap.incCount(nextJob->policy.archiveMinRequestAge);
if (m_payload.archivejobscount()) {
if ((uint64_t)nextJob->startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(nextJob->startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + nextJob->fileSize);
} else {
m_payload.set_archivejobstotalsize(nextJob->fileSize);
m_payload.set_oldestjobcreationtime(nextJob->startTime);
}
// Add the job to shard, update pointer counts.
aqsp->set_shardjobscount(aqs.addJob(*nextJob));
aqsp->set_shardbytescount(aqsp->shardbytescount() + nextJob->fileSize);
// And move to the next job
nextJob++;
}
}
// We will now commit this shard (and the queue) before moving to the next.
// Commit in the right order:
// 1) commit the queue so the shard is referenced in all cases (creation).
commit();
// Now get the shard on storage. Could be either insert or commit.
if (newShard)
aqs.insert();
else
aqs.commit();
} // end of loop over all objects.
}
auto ArchiveQueue::getJobsSummary() -> JobsSummary {
checkPayloadReadable();
JobsSummary ret;
ret.files = m_payload.archivejobs_size();
ret.jobs = m_payload.archivejobscount();
ret.bytes = m_payload.archivejobstotalsize();
ret.oldestJobStartTime = m_payload.oldestjobcreationtime();
if (ret.files) {
if (ret.jobs) {
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ret.maxDrivesAllowed = maxDriveAllowedMap.maxValue();
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
......@@ -193,93 +382,142 @@ auto ArchiveQueue::getJobsSummary() -> JobsSummary {
return ret;
}
ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd>& jobsToAdd) {
ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd>& jobsToAdd,
AgentReference & agentReference, log::LogContext & lc) {
checkPayloadWritable();
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
// First get all the shards of the queue to understand which jobs to add.
std::list<ArchiveQueueShard> shards;
std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers;
for (auto & sa: m_payload.archivequeuesshards()) {
shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore));
shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch());
}
std::list<std::list<JobDump>> shardsDumps;
auto s = shards.begin();
auto sf = shardsFetchers.begin();
while (s!= shards.end()) {
try {
(*sf)->wait();
} catch (Backend::NoSuchObject & ex) {
goto nextShard;
}
shardsDumps.emplace_back(std::list<JobDump>());
for (auto & j: s->dumpJobs()) {
shardsDumps.back().emplace_back(JobDump({j.size, j.address, j.copyNb}));
}
nextShard:
s++;
sf++;
}
// Now filter the jobs to add
AdditionSummary ret;
std::list<JobToAdd> jobsToReallyAdd;
for (auto & jta: jobsToAdd) {
auto & jl=m_payload.archivejobs();
for (auto j=jl.begin(); j!= jl.end(); j++) {
if (j->address() == jta.archiveRequestAddress)
goto skipInsertion;
}
{
// Keep track of the mounting criteria
maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed);
priorityMap.incCount(jta.policy.archivePriority);
minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge);
if (m_payload.archivejobs_size()) {
if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(jta.startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize);
} else {
m_payload.set_archivejobstotalsize(jta.fileSize);
m_payload.set_oldestjobcreationtime(jta.startTime);
for (auto & sd: shardsDumps) {
for (auto & sjd: sd) {
if (sjd.address == jta.archiveRequestAddress)
goto found;
}
auto * j = m_payload.add_archivejobs();
j->set_address(jta.archiveRequestAddress);
j->set_size(jta.fileSize);
j->set_fileid(jta.archiveFileId);
j->set_copynb(jta.job.copyNb);
j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed);
j->set_priority(jta.policy.archivePriority);
j->set_minarchiverequestage(jta.policy.archiveMinRequestAge);
// Keep track of this addition.
ret.files++;
ret.bytes+=jta.fileSize;
}
skipInsertion:;
jobsToReallyAdd.emplace_back(jta);
ret.bytes += jta.fileSize;
ret.files++;
found:;
}
if (ret.files) commit();
// We can now proceed with the standard addition.
addJobsAndCommit(jobsToReallyAdd, agentReference, lc);
return ret;
}
void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToRemove) {
void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemove) {
checkPayloadWritable();
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
auto * jl=m_payload.mutable_archivejobs();
bool jobRemoved=false;
for (auto &rrt: requestsToRemove) {
bool found = false;
do {
found = false;
// Push the found entry all the way to the end.
for (size_t i=0; i<(size_t)jl->size(); i++) {
if (jl->Get(i).address() == rrt) {
found = jobRemoved = true;
maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed());
priorityMap.decCount(jl->Get(i).priority());
minArchiveRequestAgeMap.decCount(jl->Get(i).minarchiverequestage());
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - jl->Get(i).size());
while (i+1 < (size_t)jl->size()) {
jl->SwapElements(i, i+1);
i++;
}
break;
}
// Make a working copy of the jobs to remove. We will progressively trim this local list.
auto localJobsToRemove = jobsToRemove;
// The jobs are expected to be removed from the front shards first.
// Remove jobs until there are no more jobs or no more shards.
auto shardPointer = m_payload.mutable_archivequeuesshards()->begin();
while (localJobsToRemove.size() && shardPointer != m_payload.mutable_archivequeuesshards()->end()) {
// Get hold of the shard
ArchiveQueueShard aqs(shardPointer->address(), m_objectStore);
m_exclusiveLock->includeSubObject(aqs);
aqs.fetch();
// Remove jobs from shard
auto removalResult = aqs.removeJobs(localJobsToRemove);
// If the shard is drained, remove, otherwise commit. We update the pointer afterwards.
if (removalResult.jobsAfter) {
aqs.commit();
} else {
aqs.remove();
}
// We still need to update the tracking queue side.
// Update stats and remove the jobs from the todo list.
for (auto & j: removalResult.removedJobs) {
maxDriveAllowedMap.decCount(j.maxDrivesAllowed);
priorityMap.decCount(j.priority);
minArchiveRequestAgeMap.decCount(j.minArchiveRequestAge);
}
// If the shard is still around, we shall update its pointer's stats too.
if (removalResult.jobsAfter) {
m_payload.set_archivejobscount(m_payload.archivejobscount() - removalResult.jobsRemoved);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - removalResult.bytesRemoved);
// Also update the shard pointer's stats. In case of mismatch, we will trigger a rebuild.
shardPointer->set_shardbytescount(shardPointer->shardbytescount() - removalResult.bytesRemoved);
shardPointer->set_shardjobscount(shardPointer->shardjobscount() - removalResult.jobsRemoved);
if (shardPointer->shardbytescount() != removalResult.bytesAfter
|| shardPointer->shardjobscount() != removalResult.jobsAfter)
rebuild();
// We will commit when exiting anyway...
shardPointer++;
} else {
// Shard's gone, so should the pointer. Push it to the end of the queue and
// trim it.
auto shardPointerToRemove = shardPointer;
// Update the pointer for the next iteration.
shardPointer++;
while ((shardPointerToRemove + 1) != m_payload.mutable_archivequeuesshards()->end()) {
// Swap wants a pointer as a parameter, which the iterator is not, so we convert it to pointer
// with &*
shardPointerToRemove->Swap(&*++shardPointerToRemove);
}
// and remove it
if (found)
jl->RemoveLast();
} while (found);
m_payload.mutable_archivequeuesshards()->RemoveLast();
}
// And commit the queue (once per shard should not hurt performance).
commit();
}
if (jobRemoved) commit();
}
auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
checkPayloadReadable();
// Go read the shards in parallel...
std::list<JobDump> ret;
auto & jl=m_payload.archivejobs();
for (auto j=jl.begin(); j!=jl.end(); j++) {
ret.push_back(JobDump());
JobDump & jd = ret.back();
jd.address = j->address();
jd.size = j->size();
jd.copyNb = j->copynb();
std::list<ArchiveQueueShard> shards;
std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers;
for (auto & sa: m_payload.archivequeuesshards()) {
shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore));
shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch());
}
auto s = shards.begin();
auto sf = shardsFetchers.begin();
while (s != shards.end()) {
try {
(*sf)->wait();
} catch (Backend::NoSuchObject & ex) {
// We are possibly in read only mode, so we cannot rebuild.
// Just skip this shard.
goto nextShard;
}
for (auto & j: s->dumpJobs()) {
ret.emplace_back(JobDump{j.size, j.address, j.copyNb});
}
nextShard:
s++; sf++;
}
return ret;
}
......@@ -287,17 +525,25 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
auto ArchiveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip) -> CandidateJobList {
checkPayloadReadable();
CandidateJobList ret;
ret.remainingBytesAfterCandidates = m_payload.archivejobstotalsize();
ret.remainingFilesAfterCandidates = m_payload.archivejobs_size();
for (auto & j: m_payload.archivejobs()) {
if (!archiveRequestsToSkip.count(j.address())) {
ret.candidates.push_back({j.size(), j.address(), (uint16_t)j.copynb()});
ret.candidateBytes += j.size();
ret.candidateFiles ++;
for (auto & aqsp: m_payload.archivequeuesshards()) {
// We need to go through all shard pointers unconditionally to count what is left (see else part)
if (ret.candidateBytes < maxBytes && ret.candidateFiles < maxFiles) {
// Fetch the shard
ArchiveQueueShard aqs(aqsp.address(), m_objectStore);
aqs.fetchNoLock();
auto shardCandidates = aqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, archiveRequestsToSkip);
ret.candidateBytes += shardCandidates.candidateBytes;
ret.candidateFiles += shardCandidates.candidateFiles;
// We overwrite the remaining values each time as the previous
// shards have exhausted their candidate lists.
ret.remainingBytesAfterCandidates = shardCandidates.remainingBytesAfterCandidates;
ret.remainingFilesAfterCandidates = shardCandidates.remainingFilesAfterCandidates;
ret.candidates.splice(ret.candidates.end(), shardCandidates.candidates);
} else {
// We are done with finding candidates. We just need to count what is left in the non-visited shards.
ret.remainingBytesAfterCandidates += aqsp.shardbytescount();
ret.remainingFilesAfterCandidates += aqsp.shardjobscount();
}
ret.remainingBytesAfterCandidates -= j.size();
ret.remainingFilesAfterCandidates--;
if (ret.candidateBytes >= maxBytes || ret.candidateFiles >= maxFiles) break;
}
return ret;
}
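
As a usage illustration (not part of this commit), a caller selecting jobs for a mount might drive getCandidateList() as sketched below; the limits, the skip set and the surrounding variable names are hypothetical, while the member names (candidates, remainingBytesAfterCandidates, remainingFilesAfterCandidates, address, size, copyNb) come from the code above.

// Hypothetical caller-side sketch: pick up to ~10 GB or 500 files of candidates,
// skipping archive requests that are already being worked on elsewhere.
std::set<std::string> requestsToSkip = { /* addresses of requests already owned */ };
auto candidates = archiveQueue.getCandidateList(10UL * 1000 * 1000 * 1000, 500, requestsToSkip);
for (auto & job : candidates.candidates) {
  // job.address points to the ArchiveRequest, job.size is the file size,
  // job.copyNb is the copy number to archive.
}
// candidates.remainingBytesAfterCandidates and candidates.remainingFilesAfterCandidates
// estimate how much work is still queued beyond this selection (summed from the
// untouched shard pointers, as per the else branch above).
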
......
......@@ -51,7 +51,14 @@ public:
// Commit with sanity checks (override from ObjectOps).
void commit();
private:
// Validates all summaries are in accordance with each other.
bool checkMapsAndShardsCoherency();
// Rebuild from shards if something goes wrong.
void rebuild();
public:
// Set/get tape pool
void setTapePool(const std::string & name);
std::string getTapePool();
......@@ -65,17 +72,25 @@ public:
const cta::common::dataStructures::MountPolicy policy;
time_t startTime;
};
void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd);
/** Add the jobs to the queue.
* The lock will be used to mark the shards as locked (the lock is the same for
* the main object and the shard; there is no shared access).
* As we potentially have to create new shard(s), we need access to the agent
* reference (to generate a non-colliding object name).
* We will also log the shard creation (hence the context)
*/
void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc);
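
For illustration, a caller-side sketch of the new signature; the surrounding names (queueAddress, backend, agentReference, lc) are assumptions rather than part of this commit, and the queue is still expected to be exclusively locked and fetched by the caller as before.

// Hypothetical usage sketch of the new interface.
ArchiveQueue aq(queueAddress, backend);
ScopedExclusiveLock aqLock(aq);   // the same lock now also covers the shards touched
aq.fetch();
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
// ... fill jobsToAdd (request address, file id, size, mount policy, start time) ...
// New in this commit: the agent reference is used to name any shard that must be
// created, and the log context is used to report missing or incoherent shards.
aq.addJobsAndCommit(jobsToAdd, agentReference, lc);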