diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 051eaec752f1046ee17a84e9ab79cf58dfe111c6..a3190d136c65d54bc420b621505c372cae8318e9 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -22,6 +22,8 @@ #include "EntryLogSerDeser.hpp" #include "RootEntry.hpp" #include "ValueCountMap.hpp" +#include "ArchiveQueueShard.hpp" +#include "AgentReference.hpp" #include <google/protobuf/util/json_util.h> namespace cta { namespace objectstore { @@ -57,6 +59,7 @@ void ArchiveQueue::initialize(const std::string& name) { m_payload.set_tapepool(name); // set the archive jobs counter to zero m_payload.set_archivejobstotalsize(0); + m_payload.set_archivejobscount(0); m_payload.set_oldestjobcreationtime(0); // set the initial summary map rebuild count to zero m_payload.set_mapsrebuildcount(0); @@ -65,32 +68,136 @@ void ArchiveQueue::initialize(const std::string& name) { } void ArchiveQueue::commit() { - // Before calling ObjectOps::commit, check that we have coherent queue summaries + if (!checkMapsAndShardsCoherency()) { + rebuild(); + m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1); + } + ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit(); +} + +bool ArchiveQueue::checkMapsAndShardsCoherency() { + checkPayloadReadable(); + uint64_t bytesFromShardPointers = 0; + uint64_t jobsExpectedFromShardsPointers = 0; + // Add up shard summaries + for (auto & aqs: m_payload.archivequeuesshards()) { + bytesFromShardPointers += aqs.shardbytescount(); + jobsExpectedFromShardsPointers += aqs.shardjobscount(); + } + // The sum of shards should be equal to the summary + if (m_payload.archivejobstotalsize() != bytesFromShardPointers || + m_payload.archivejobscount() != jobsExpectedFromShardsPointers) + return false; + // Check that we have coherent queue summaries + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + if (maxDriveAllowedMap.total() != m_payload.archivejobstotalsize() || + priorityMap.total() != m_payload.archivejobstotalsize() || + minArchiveRequestAgeMap.total() != m_payload.archivejobstotalsize()) + return false; + return true; +} + +void ArchiveQueue::rebuild() { + checkPayloadWritable(); + // Something is off with the queue. We will hence rebuild it. The rebuild of the + // queue will consist in: + // 1) Attempting to read all shards in parallel. Absent shards are possible, and will + // mean we have dangling pointers. + // 2) Rebuild the summaries from the shards. + // As a side note, we do not go as far as validating the pointers to jobs within th + // shards, as this is already handled as access goes. + std::list<ArchiveQueueShard> shards; + std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + + // Get the summaries structures ready ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.clear(); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.clear(); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - if (maxDriveAllowedMap.total() != (uint64_t)m_payload.archivejobs_size() || - priorityMap.total() != (uint64_t)m_payload.archivejobs_size() || - minArchiveRequestAgeMap.total() != (uint64_t)m_payload.archivejobs_size()) { - // The maps counts are off: recompute them. - maxDriveAllowedMap.clear(); - priorityMap.clear(); - minArchiveRequestAgeMap.clear(); - for (size_t i=0; i<(size_t)m_payload.archivejobs_size(); i++) { - maxDriveAllowedMap.incCount(m_payload.archivejobs(i).maxdrivesallowed()); - priorityMap.incCount(m_payload.archivejobs(i).priority()); - minArchiveRequestAgeMap.incCount(m_payload.archivejobs(i).priority()); + minArchiveRequestAgeMap.clear(); + for (auto & sa: m_payload.archivequeuesshards()) { + shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + uint64_t totalJobs=0; + uint64_t totalBytes=0; + time_t oldestJobCreationTime=std::numeric_limits<time_t>::max(); + while (s != shards.end()) { + // Each shard could be gone + try { + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + // Remove the shard from the list + auto aqs = m_payload.mutable_archivequeuesshards()->begin(); + while (aqs != m_payload.mutable_archivequeuesshards()->end()) { + if (aqs->address() == s->getAddressIfSet()) { + aqs = m_payload.mutable_archivequeuesshards()->erase(aqs); + } else { + aqs++; + } + } + goto nextShard; } - m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1); + { + // The shard is still around, let's compute its summaries. + uint64_t jobs = 0; + uint64_t size = 0; + for (auto & j: s->dumpJobs()) { + jobs++; + size += j.size; + priorityMap.incCount(j.priority); + minArchiveRequestAgeMap.incCount(j.minArchiveRequestAge); + maxDriveAllowedMap.incCount(j.maxDrivesAllowed); + if (j.startTime < oldestJobCreationTime) oldestJobCreationTime = j.startTime; + } + // Add the summary to total. + totalJobs+=jobs; + totalBytes+=size; + // And store the value in the shard pointers. + auto maqs = m_payload.mutable_archivequeuesshards(); + for (auto & aqs: *maqs) { + if (aqs.address() == s->getAddressIfSet()) { + aqs.set_shardjobscount(jobs); + aqs.set_shardbytescount(size); + goto shardUpdated; + } + } + { + // We had to update a shard and did not find it. This is an error. + throw exception::Exception(std::string ("In ArchiveQueue::rebuild(): failed to record summary for shard " + s->getAddressIfSet())); + } + shardUpdated:; + // We still need to check if the shard itself is coherent (we have an opportunity to + // match its summary with the jobs total we just recomputed. + if (size != s->getJobsSummary().bytes) { + ArchiveQueueShard aqs(s->getAddressIfSet(), m_objectStore); + m_exclusiveLock->includeSubObject(aqs); + aqs.fetch(); + aqs.rebuild(); + aqs.commit(); + } + } + nextShard:; + s++; + sf++; } - ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit(); + m_payload.set_archivejobscount(totalJobs); + m_payload.set_archivejobstotalsize(totalBytes); + m_payload.set_oldestjobcreationtime(oldestJobCreationTime); + // We went through all the shard, re-updated the summaries, removed references to + // gone shards. Done. } bool ArchiveQueue::isEmpty() { checkPayloadReadable(); // Check we have no archive jobs pending - if (m_payload.archivejobs_size()) + if (m_payload.archivequeuesshards_size()) return false; // If we made it to here, it seems the pool is indeed empty. return true; @@ -142,43 +249,125 @@ std::string ArchiveQueue::getTapePool() { return m_payload.tapepool(); } -void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) { +void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc) { checkPayloadWritable(); - for (auto & jta: jobsToAdd) { - // Keep track of the mounting criteria - ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); - ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - priorityMap.incCount(jta.policy.archivePriority); - ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge); - if (m_payload.archivejobs_size()) { - if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime()) - m_payload.set_oldestjobcreationtime(jta.startTime); - m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize); + // Before adding the jobs, we have to decide how to lay them out in the shards. + // We are here in FIFO mode, so the algorithm is just 1) complete the current last + // shard, if it did not reach the maximum size + // 2) create new shard(s) as needed. + // + // First implementation is shard by shard. A batter, parallel one could be implemented, + // but the performance gain should be marginal as most of the time we will be dealing + // with a single shard. + + auto nextJob = jobsToAdd.begin(); + while (nextJob != jobsToAdd.end()) { + // If we're here, the is at least a job to add. + // Let's find a shard for it/them. It can be either the last (incomplete) shard or + // a new shard to create. In all case, we will max out the shard, jobs list permitting. + // If we do fill up the shard, we'll go through another round here. + // Is there a last shard, and is it not full? + ArchiveQueueShard aqs(m_objectStore); + serializers::ArchiveQueueShardPointer * aqsp = nullptr; + bool newShard=false; + uint64_t shardCount = m_payload.archivequeuesshards_size(); + if (shardCount && m_payload.archivequeuesshards(shardCount - 1).shardjobscount() < c_maxShardSize) { + auto & shardPointer=m_payload.archivequeuesshards(shardCount - 1); + aqs.setAddress(shardPointer.address()); + // include-locking does not check existence of the object in the object store. + // we will find out on fetch. If we fail, we have to rebuild. + m_exclusiveLock->includeSubObject(aqs); + try { + aqs.fetch(); + } catch (Backend::NoSuchObject & ex) { + log::ScopedParamContainer params (lc); + params.add("archiveQueueObject", getAddressIfSet()) + .add("shardNumber", shardCount - 1) + .add("shardObject", shardPointer.address()); + lc.log(log::ERR, "In ArchiveQueue::addJobsAndCommit(): shard not present. Rebuilding queue."); + rebuild(); + commit(); + continue; + } + // Validate that the shard is as expected from the pointer. If not we need to + // rebuild the queue and restart the shard selection. + auto shardSummary = aqs.getJobsSummary(); + if (shardPointer.shardbytescount() != shardSummary.bytes || + shardPointer.shardjobscount() != shardSummary.jobs) { + log::ScopedParamContainer params(lc); + params.add("archiveQueueObject", getAddressIfSet()) + .add("shardNumber", shardCount - 1) + .add("shardObject", shardPointer.address()) + .add("shardReportedBytes", shardSummary.bytes) + .add("shardReportedJobs", shardSummary.jobs) + .add("expectedBytes", shardPointer.shardbytescount()) + .add("expectedJobs", shardPointer.shardjobscount()); + lc.log(log::ERR, "In ArchiveQueue::addJobsAndCommit(): mismatch found. Rebuilding the queue."); + rebuild(); + commit(); + continue; + } + // The shard looks good. We will now proceed with the addition of individual jobs. + aqsp = m_payload.mutable_archivequeuesshards(shardCount - 1); } else { - m_payload.set_archivejobstotalsize(jta.fileSize); - m_payload.set_oldestjobcreationtime(jta.startTime); + // We need a new shard. Just add it (in memory). + newShard = true; + aqsp = m_payload.mutable_archivequeuesshards()->Add(); + // Create the shard in memory. + std::stringstream shardName; + shardName << "ArchiveQueueShard-" << m_payload.tapepool(); + aqs.setAddress(agentReference.nextId(shardName.str())); + aqs.initialize(getAddressIfSet()); + // Reference the shard in the pointer, and initialized counters. + aqsp->set_address(aqs.getAddressIfSet()); + aqsp->set_shardbytescount(0); + aqsp->set_shardjobscount(0); } - auto * j = m_payload.add_archivejobs(); - j->set_address(jta.archiveRequestAddress); - j->set_size(jta.fileSize); - j->set_fileid(jta.archiveFileId); - j->set_copynb(jta.job.copyNb); - j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); - j->set_priority(jta.policy.archivePriority); - j->set_minarchiverequestage(jta.policy.archiveMinRequestAge); - } - commit(); + // We can now add the individual jobs, commit the main queue and then insert or commit the shard. + { + // As the queue could be rebuilt on each shard round, we get access to the + // value maps here + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + while (nextJob != jobsToAdd.end() && aqsp->shardjobscount() < c_maxShardSize) { + maxDriveAllowedMap.incCount(nextJob->policy.maxDrivesAllowed); + priorityMap.incCount(nextJob->policy.archivePriority); + minArchiveRequestAgeMap.incCount(nextJob->policy.archiveMinRequestAge); + if (m_payload.archivejobscount()) { + if ((uint64_t)nextJob->startTime < m_payload.oldestjobcreationtime()) + m_payload.set_oldestjobcreationtime(nextJob->startTime); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + nextJob->fileSize); + } else { + m_payload.set_archivejobstotalsize(nextJob->fileSize); + m_payload.set_oldestjobcreationtime(nextJob->startTime); + } + // Add the job to shard, update pointer counts. + aqsp->set_shardjobscount(aqs.addJob(*nextJob)); + aqsp->set_shardbytescount(aqsp->shardbytescount() + nextJob->fileSize); + // And move to the next job + nextJob++; + } + } + // We will new commit this shard (and the queue) before moving to the next. + // Commit in the right order: + // 1) commit the queue so the shard is referenced in all cases (creation). + commit(); + // Now get the shard on storage. Could be either insert or commit. + if (newShard) + aqs.insert(); + else + aqs.commit(); + } // end of loop over all objects. } auto ArchiveQueue::getJobsSummary() -> JobsSummary { checkPayloadReadable(); JobsSummary ret; - ret.files = m_payload.archivejobs_size(); + ret.jobs = m_payload.archivejobstotalsize(); ret.bytes = m_payload.archivejobstotalsize(); ret.oldestJobStartTime = m_payload.oldestjobcreationtime(); - if (ret.files) { + if (ret.jobs) { ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ret.maxDrivesAllowed = maxDriveAllowedMap.maxValue(); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); @@ -193,93 +382,142 @@ auto ArchiveQueue::getJobsSummary() -> JobsSummary { return ret; } -ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd>& jobsToAdd) { +ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd>& jobsToAdd, + AgentReference & agentReference, log::LogContext & lc) { checkPayloadWritable(); - ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + // First get all the shards of the queue to understand which jobs to add. + std::list<ArchiveQueueShard> shards; + std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + + for (auto & sa: m_payload.archivequeuesshards()) { + shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + std::list<std::list<JobDump>> shardsDumps; + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + + while (s!= shards.end()) { + try { + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + goto nextShard; + } + shardsDumps.emplace_back(std::list<JobDump>()); + for (auto & j: s->dumpJobs()) { + shardsDumps.back().emplace_back(JobDump({j.size, j.address, j.copyNb})); + } + nextShard: + s++; + sf++; + } + + // Now filter the jobs to add AdditionSummary ret; + std::list<JobToAdd> jobsToReallyAdd; for (auto & jta: jobsToAdd) { - auto & jl=m_payload.archivejobs(); - for (auto j=jl.begin(); j!= jl.end(); j++) { - if (j->address() == jta.archiveRequestAddress) - goto skipInsertion; - } - { - // Keep track of the mounting criteria - maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); - priorityMap.incCount(jta.policy.archivePriority); - minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge); - if (m_payload.archivejobs_size()) { - if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime()) - m_payload.set_oldestjobcreationtime(jta.startTime); - m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize); - } else { - m_payload.set_archivejobstotalsize(jta.fileSize); - m_payload.set_oldestjobcreationtime(jta.startTime); + for (auto & sd: shardsDumps) { + for (auto & sjd: sd) { + if (sjd.address == jta.archiveRequestAddress) + goto found; } - auto * j = m_payload.add_archivejobs(); - j->set_address(jta.archiveRequestAddress); - j->set_size(jta.fileSize); - j->set_fileid(jta.archiveFileId); - j->set_copynb(jta.job.copyNb); - j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); - j->set_priority(jta.policy.archivePriority); - j->set_minarchiverequestage(jta.policy.archiveMinRequestAge); - // Keep track of this addition. - ret.files++; - ret.bytes+=jta.fileSize; } - skipInsertion:; + jobsToReallyAdd.emplace_back(jta); + ret.bytes += jta.fileSize; + ret.files++; + found:; } - if (ret.files) commit(); + + // We can now proceed with the standard addition. + addJobsAndCommit(jobsToReallyAdd, agentReference, lc); return ret; } -void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToRemove) { +void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemove) { checkPayloadWritable(); ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - auto * jl=m_payload.mutable_archivejobs(); - bool jobRemoved=false; - for (auto &rrt: requestsToRemove) { - bool found = false; - do { - found = false; - // Push the found entry all the way to the end. - for (size_t i=0; i<(size_t)jl->size(); i++) { - if (jl->Get(i).address() == rrt) { - found = jobRemoved = true; - maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed()); - priorityMap.decCount(jl->Get(i).priority()); - minArchiveRequestAgeMap.decCount(jl->Get(i).minarchiverequestage()); - m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - jl->Get(i).size()); - while (i+1 < (size_t)jl->size()) { - jl->SwapElements(i, i+1); - i++; - } - break; - } + // Make a working copy of the jobs to remove. We will progressively trim this local list. + auto localJobsToRemove = jobsToRemove; + // The jobs are expected to be removed from the front shards first. + // Remove jobs until there are no more jobs or no more shards. + auto shardPointer = m_payload.mutable_archivequeuesshards()->begin(); + while (localJobsToRemove.size() && shardPointer != m_payload.mutable_archivequeuesshards()->end()) { + // Get hold of the shard + ArchiveQueueShard aqs(shardPointer->address(), m_objectStore); + m_exclusiveLock->includeSubObject(aqs); + aqs.fetch(); + // Remove jobs from shard + auto removalResult = aqs.removeJobs(localJobsToRemove); + // If the shard is drained, remove, otherwise commit. We update the pointer afterwards. + if (removalResult.jobsAfter) { + aqs.commit(); + } else { + aqs.remove(); + } + // We still need to update the tracking queue side. + // Update stats and remove the jobs from the todo list. + for (auto & j: removalResult.removedJobs) { + maxDriveAllowedMap.decCount(j.maxDrivesAllowed); + priorityMap.decCount(j.priority); + minArchiveRequestAgeMap.decCount(j.minArchiveRequestAge); + } + // If the shard is still around, we shall update its pointer's stats too. + if (removalResult.jobsAfter) { + m_payload.set_archivejobscount(m_payload.archivejobscount() - removalResult.jobsRemoved); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - removalResult.bytesRemoved); + // Also update the shard pointers's stats. In case of mismatch, we will trigger a rebuild. + shardPointer->set_shardbytescount(shardPointer->shardbytescount() - removalResult.bytesRemoved); + shardPointer->set_shardjobscount(shardPointer->shardjobscount() - removalResult.jobsRemoved); + if (shardPointer->shardbytescount() != removalResult.bytesRemoved + || shardPointer->shardjobscount() != removalResult.jobsAfter) + rebuild(); + // We will commit when exiting anyway... + shardPointer++; + } else { + // Shard's gone, so should the pointer. Push it to the end of the queue and + // trim it. + auto shardPointerToRemove = shardPointer; + // Update the pointer for the next iteration. + shardPointer++; + while ((shardPointerToRemove + 1) != m_payload.mutable_archivequeuesshards()->end()) { + // Swap wants a pointer as a parameter, which the iterator is not, so we convert it to pointer + // with &* + shardPointerToRemove->Swap(&*++shardPointerToRemove); } - // and remove it - if (found) - jl->RemoveLast(); - } while (found); + m_payload.mutable_archivequeuesshards()->RemoveLast(); + } + // And commit the queue (once per shard should not hurt performance). + commit(); } - if (jobRemoved) commit(); } auto ArchiveQueue::dumpJobs() -> std::list<JobDump> { checkPayloadReadable(); + // Go read the shards in parallel... std::list<JobDump> ret; - auto & jl=m_payload.archivejobs(); - for (auto j=jl.begin(); j!=jl.end(); j++) { - ret.push_back(JobDump()); - JobDump & jd = ret.back(); - jd.address = j->address(); - jd.size = j->size(); - jd.copyNb = j->copynb(); + std::list<ArchiveQueueShard> shards; + std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + for (auto & sa: m_payload.archivequeuesshards()) { + shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + while (s != shards.end()) { + try { + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + // We are possibly in read only mode, so we cannot rebuild. + // Just skip this shard. + goto nextShard; + } + for (auto & j: s->dumpJobs()) { + ret.emplace_back(JobDump{j.size, j.address, j.copyNb}); + } + nextShard: + s++; sf++; } return ret; } @@ -287,17 +525,25 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> { auto ArchiveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip) -> CandidateJobList { checkPayloadReadable(); CandidateJobList ret; - ret.remainingBytesAfterCandidates = m_payload.archivejobstotalsize(); - ret.remainingFilesAfterCandidates = m_payload.archivejobs_size(); - for (auto & j: m_payload.archivejobs()) { - if (!archiveRequestsToSkip.count(j.address())) { - ret.candidates.push_back({j.size(), j.address(), (uint16_t)j.copynb()}); - ret.candidateBytes += j.size(); - ret.candidateFiles ++; + for (auto & aqsp: m_payload.archivequeuesshards()) { + // We need to go through all shard poiters unconditionnaly to count what is left (see else part) + if (ret.candidateBytes < maxBytes && ret.candidateFiles < maxFiles) { + // Fetch the shard + ArchiveQueueShard aqs(aqsp.address(), m_objectStore); + aqs.fetchNoLock(); + auto shardCandidates = aqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, archiveRequestsToSkip); + ret.candidateBytes += shardCandidates.candidateBytes; + ret.candidateFiles += shardCandidates.candidateFiles; + // We overwrite the remaining values each time as the previous + // shards have exhaustied their candidate lists. + ret.remainingBytesAfterCandidates = shardCandidates.remainingBytesAfterCandidates; + ret.remainingFilesAfterCandidates = shardCandidates.remainingFilesAfterCandidates; + ret.candidates.splice(ret.candidates.end(), shardCandidates.candidates); + } else { + // We are done with finding candidates. We just need to count what is left in the non-visited shards. + ret.remainingBytesAfterCandidates += aqsp.shardbytescount(); + ret.remainingFilesAfterCandidates += aqsp.shardjobscount(); } - ret.remainingBytesAfterCandidates -= j.size(); - ret.remainingFilesAfterCandidates--; - if (ret.candidateBytes >= maxBytes || ret.candidateFiles >= maxFiles) break; } return ret; } diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index fe0abb3e0ab0d859f48a464c83d8a918e77034ee..188aaa47cbcfb04b1a3379f7b3aa8a913680cccd 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -51,7 +51,14 @@ public: // Commit with sanity checks (override from ObjectOps void commit(); +private: + // Validates all summaries are in accordance with each other. + bool checkMapsAndShardsCoherency(); + // Rebuild from shards if something goes wrong. + void rebuild(); + +public: // Set/get tape pool void setTapePool(const std::string & name); std::string getTapePool(); @@ -65,17 +72,25 @@ public: const cta::common::dataStructures::MountPolicy policy; time_t startTime; }; - void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd); + /** Add the jobs to the queue. + * The lock will be used to mark the shards as locked (the lock is the same for + * the main object and the shard, the is no shared access. + * As we potentially have to create new shard(s), we need access to the agent + * reference (to generate a non-colliding object name). + * We will also log the shard creation (hence the context) + */ + void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc); /// This version will check for existence of the job in the queue before // returns the count and sizes of actually added jobs (if any). struct AdditionSummary { uint64_t files = 0; uint64_t bytes = 0; }; - AdditionSummary addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd); + AdditionSummary addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd, + AgentReference & agentReference, log::LogContext & lc); struct JobsSummary { - uint64_t files; + uint64_t jobs; uint64_t bytes; time_t oldestJobStartTime; uint64_t priority; @@ -84,7 +99,7 @@ public: }; JobsSummary getJobsSummary(); - void removeJobsAndCommit(const std::list<std::string> & requestsToRemove); + void removeJobsAndCommit(const std::list<std::string> & jobsToRemove); struct JobDump { uint64_t size; std::string address; @@ -111,6 +126,13 @@ public: cta::catalogue::Catalogue & catalogue) override; std::string dump(); + + // The shard size. From experience, 100k is where we start to see performance difference, + // but nothing prevents us from using a smaller size. + // The performance will be roughly flat until the queue size reaches the square of this limit + // (meaning the queue object updates start to take too much time). + // with this current value of 25k, the performance should be roughly flat until 25k^2=625M. + static const uint64_t c_maxShardSize = 25000; }; }} diff --git a/objectstore/ArchiveQueueShard.cpp b/objectstore/ArchiveQueueShard.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d79873629fd7be0de12418d80225e595129dad05 --- /dev/null +++ b/objectstore/ArchiveQueueShard.cpp @@ -0,0 +1,148 @@ + +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ArchiveQueueShard.hpp" + + +namespace cta { namespace objectstore { + +ArchiveQueueShard::ArchiveQueueShard(Backend& os): + ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t>(os) { } + +ArchiveQueueShard::ArchiveQueueShard(const std::string& address, Backend& os): + ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t>(os, address) { } + +void ArchiveQueueShard::rebuild() { + checkPayloadWritable(); + uint64_t totalSize=0; + for (auto j: m_payload.archivejobs()) { + totalSize += j.size(); + } + m_payload.set_archivejobstotalsize(totalSize); +} + +void ArchiveQueueShard::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) { + throw exception::Exception("In ArchiveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object."); +} + +ArchiveQueue::CandidateJobList ArchiveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip) { + checkPayloadReadable(); + ArchiveQueue::CandidateJobList ret; + ret.remainingBytesAfterCandidates = m_payload.archivejobstotalsize(); + ret.remainingFilesAfterCandidates = m_payload.archivejobs_size(); + for (auto & j: m_payload.archivejobs()) { + if (!archiveRequestsToSkip.count(j.address())) { + ret.candidates.push_back({j.size(), j.address(), (uint16_t)j.copynb()}); + ret.candidateBytes += j.size(); + ret.candidateFiles ++; + } + ret.remainingBytesAfterCandidates -= j.size(); + ret.remainingFilesAfterCandidates--; + if (ret.candidateBytes >= maxBytes || ret.candidateFiles >= maxFiles) break; + } + return ret; +} + +auto ArchiveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) -> RemovalResult { + checkPayloadWritable(); + RemovalResult ret; + uint64_t totalSize = m_payload.archivejobstotalsize(); + auto * jl=m_payload.mutable_archivejobs(); + for (auto &rrt: jobsToRemove) { + bool found = false; + do { + found = false; + // Push the found entry all the way to the end. + for (size_t i=0; i<(size_t)jl->size(); i++) { + if (jl->Get(i).address() == rrt) { + found = true; + const auto & j = jl->Get(i); + ret.removedJobs.emplace_back(JobInfo()); + ret.removedJobs.back().address = j.address(); + ret.removedJobs.back().copyNb = j.copynb(); + ret.removedJobs.back().maxDrivesAllowed = j.maxdrivesallowed(); + ret.removedJobs.back().minArchiveRequestAge = j.minarchiverequestage(); + ret.removedJobs.back().priority = j.priority(); + ret.removedJobs.back().size = j.size(); + ret.removedJobs.back().startTime = j.starttime(); + ret.bytesRemoved += j.size(); + totalSize -= j.size(); + ret.jobsRemoved++; + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - j.size()); + while (i+1 < (size_t)jl->size()) { + jl->SwapElements(i, i+1); + i++; + } + break; + } + } + // and remove it + if (found) + jl->RemoveLast(); + } while (found); + } + ret.bytesAfter = totalSize; + ret.jobsAfter = m_payload.archivejobs_size(); + return ret; +} + +void ArchiveQueueShard::initialize(const std::string& owner) { + ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t>::initialize(); + setOwner(owner); + setBackupOwner(owner); + m_payload.set_archivejobstotalsize(0); + m_payloadInterpreted=true; +} + +auto ArchiveQueueShard::dumpJobs() -> std::list<JobInfo> { + checkPayloadReadable(); + std::list<JobInfo> ret; + for (auto &j: m_payload.archivejobs()) { + ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(), + j.minarchiverequestage(), j.maxdrivesallowed(), (time_t)j.starttime()}); + } + return ret; +} + +auto ArchiveQueueShard::getJobsSummary() -> JobsSummary { + checkPayloadReadable(); + JobsSummary ret; + ret.bytes = m_payload.archivejobstotalsize(); + return ret; +} + +uint64_t ArchiveQueueShard::addJob(ArchiveQueue::JobToAdd& jobToAdd) { + checkPayloadWritable(); + auto * j = m_payload.mutable_archivejobs()->Add(); + j->set_address(jobToAdd.archiveRequestAddress); + j->set_size(jobToAdd.fileSize); + j->set_fileid(jobToAdd.archiveFileId); + j->set_copynb(jobToAdd.job.copyNb); + j->set_maxdrivesallowed(jobToAdd.policy.maxDrivesAllowed); + j->set_priority(jobToAdd.policy.archivePriority); + j->set_minarchiverequestage(jobToAdd.policy.archiveMinRequestAge); + j->set_starttime(jobToAdd.startTime); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize()+jobToAdd.fileSize); + return m_payload.archivejobs_size(); +} + + + + +}} \ No newline at end of file diff --git a/objectstore/ArchiveQueueShard.hpp b/objectstore/ArchiveQueueShard.hpp new file mode 100644 index 0000000000000000000000000000000000000000..03a36c008f9ca6050bd458c162c724a38b2f8555 --- /dev/null +++ b/objectstore/ArchiveQueueShard.hpp @@ -0,0 +1,83 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "ArchiveQueue.hpp" + +namespace cta { namespace objectstore { + +class ArchiveQueueShard: public ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t> { +public: + // Constructor with undefined address + ArchiveQueueShard(Backend & os); + + // Constructor + ArchiveQueueShard(const std::string & address, Backend & os); + + // Forbid/hide base initializer + void initialize() = delete; + + // Initializer + void initialize(const std::string & owner); + + void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) override; + + struct JobInfo { + uint64_t size; + std::string address; + uint16_t copyNb; + uint64_t priority; + uint64_t minArchiveRequestAge; + uint64_t maxDrivesAllowed; + time_t startTime; + }; + std::list<JobInfo> dumpJobs(); + + struct JobsSummary { + uint64_t jobs; + uint64_t bytes; + }; + JobsSummary getJobsSummary(); + + /** + * adds job, returns new size + */ + uint64_t addJob(ArchiveQueue::JobToAdd & jobToAdd); + + + struct RemovalResult { + uint64_t jobsRemoved = 0; + uint64_t jobsAfter = 0; + uint64_t bytesRemoved = 0; + uint64_t bytesAfter = 0; + std::list<JobInfo> removedJobs; + }; + /** + * Removes jobs from shard (and from the to remove list). Returns list of removed jobs. + */ + RemovalResult removeJobs(const std::list<std::string> & jobsToRemove); + + ArchiveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip); + + /** Re compute summaries in case they do not match the array content. */ + void rebuild(); + +}; + +}} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 81e52e869b2a3eeee815de8ff44f3c29fb902556..63a58bd5294345d3f5924ecd9a6be340f6647547 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -318,7 +318,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer std::list<ArchiveQueue::JobToAdd> jta; jta.push_back({jd, getAddressIfSet(), getArchiveFile().archiveFileID, getArchiveFile().fileSize, getMountPolicy(), getEntryLog().time}); - aq.addJobsIfNecessaryAndCommit(jta); + aq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); auto queueUpdateTime = t.secs(utils::Timer::resetCounter); j->set_owner(aq.getAddressIfSet()); j->set_status(serializers::AJS_PendingMount); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index 734424ff3606e9555ec7f090249fc12898c31d1e..aaed1b155875e5677670dee1cfd6b7846f4b140e 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -51,6 +51,7 @@ SET_SOURCE_FILES_PROPERTIES(${CTAProtoDependants} include_directories (${PROTOBUF3_INCLUDE_DIRS}) add_library (ctaobjectstore SHARED ${CTAProtoSources} + ObjectOps.cpp RootEntry.cpp Agent.cpp AgentHeartbeatThread.cpp @@ -58,6 +59,7 @@ add_library (ctaobjectstore SHARED AgentRegister.cpp AgentWatchdog.cpp ArchiveQueue.cpp + ArchiveQueueShard.cpp RetrieveQueue.cpp ArchiveRequest.cpp RetrieveRequest.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index b0f094336f94a71e0957c45860feb8ba87f64a24..55ef6cd19f82b9d548a6c24e5d0f93bef06ee359 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -380,7 +380,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, m_ourAgentReference, tapepool.first, lc); queueLockFetchTime = t.secs(utils::Timer::resetCounter); auto jobsSummary=aq.getJobsSummary(); - filesBefore=jobsSummary.files; + filesBefore=jobsSummary.jobs; bytesBefore=jobsSummary.bytes; // We have the queue. We will loop on the requests, add them to the queue. We will launch their updates // after committing the queue. @@ -394,7 +394,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon } } } - auto addedJobs = aq.addJobsIfNecessaryAndCommit(jtal); + auto addedJobs = aq.addJobsIfNecessaryAndCommit(jtal, m_ourAgentReference, lc); queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, // we will NOT remove the object from agent's ownership. This variable is declared a bit ahead so @@ -492,7 +492,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon .add("bytesDequeuedAfterErrors", bytesDequeued) .add("filesBefore", filesBefore) .add("bytesBefore", bytesBefore) - .add("filesAfter", jobsSummary.files) + .add("filesAfter", jobsSummary.jobs) .add("bytesAfter", jobsSummary.bytes) .add("queueLockFetchTime", queueLockFetchTime) .add("queueProcessAndCommitTime", queueProcessAndCommitTime) diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index b9388a37f3667fe45ff0be566df7db3d1cb8da25..092df08f8e6fb77b76d9fafef87e696631e57162 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -387,7 +387,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.maxDrivesAllowed = 1; std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000U+pass, policy, time(NULL)}); - aq.addJobsAndCommit(jta); + aq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 4) { pass++; continue; } // TODO: partially migrated or selected @@ -406,7 +406,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { policy.maxDrivesAllowed = 1; std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta; jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)}); - aq.addJobsAndCommit(jta); + aq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } // - Still marked a not owned but referenced in the agent @@ -439,8 +439,8 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { auto d1=aq1.dumpJobs(); // We expect all jobs with sizes 1002-1005 inclusive to be connected to // their respective tape pools. - ASSERT_EQ(5, aq0.getJobsSummary().files); - ASSERT_EQ(5, aq1.getJobsSummary().files); + ASSERT_EQ(5, aq0.getJobsSummary().jobs); + ASSERT_EQ(5, aq1.getJobsSummary().jobs); } // Unregister gc's agent cta::objectstore::ScopedExclusiveLock gcal(gcAgent); diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index 3edfca13d6ea5d6039117c95352b6d35721bd895..5eddf92380716316ccb26d085bf8326f3512fe9c 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -148,18 +148,17 @@ namespace { using cta::objectstore::GenericObject; using cta::objectstore::ScopedExclusiveLock; template <class C> - std::string dumpWithType(GenericObject * gop, ScopedSharedLock& lock) { + std::string dumpWithType(GenericObject * gop) { C typedObject(*gop); - lock.transfer(typedObject); + ScopedLock::transfer(*gop, typedObject); std::string ret = typedObject.dump(); // Release the lock now as if we let the caller do, it will point // to the then-removed typedObject. - lock.release(); return ret; } } -std::string GenericObject::dump(ScopedSharedLock& lock) { +std::string GenericObject::dump() { checkHeaderReadable(); google::protobuf::util::JsonPrintOptions options; options.add_whitespace = true; @@ -169,31 +168,31 @@ std::string GenericObject::dump(ScopedSharedLock& lock) { google::protobuf::util::MessageToJsonString(m_header, &headerDump, options); switch(m_header.type()) { case serializers::RootEntry_t: - bodyDump = dumpWithType<RootEntry>(this, lock); + bodyDump = dumpWithType<RootEntry>(this); break; case serializers::AgentRegister_t: - bodyDump = dumpWithType<AgentRegister>(this, lock); + bodyDump = dumpWithType<AgentRegister>(this); break; case serializers::Agent_t: - bodyDump = dumpWithType<Agent>(this, lock); + bodyDump = dumpWithType<Agent>(this); break; case serializers::DriveRegister_t: - bodyDump = dumpWithType<DriveRegister>(this, lock); + bodyDump = dumpWithType<DriveRegister>(this); break; case serializers::ArchiveQueue_t: - bodyDump = dumpWithType<cta::objectstore::ArchiveQueue>(this, lock); + bodyDump = dumpWithType<cta::objectstore::ArchiveQueue>(this); break; case serializers::RetrieveQueue_t: - bodyDump = dumpWithType<cta::objectstore::RetrieveQueue>(this, lock); + bodyDump = dumpWithType<cta::objectstore::RetrieveQueue>(this); break; case serializers::ArchiveRequest_t: - bodyDump = dumpWithType<ArchiveRequest>(this, lock); + bodyDump = dumpWithType<ArchiveRequest>(this); break; case serializers::RetrieveRequest_t: - bodyDump = dumpWithType<RetrieveRequest>(this, lock); + bodyDump = dumpWithType<RetrieveRequest>(this); break; case serializers::SchedulerGlobalLock_t: - bodyDump = dumpWithType<SchedulerGlobalLock>(this, lock); + bodyDump = dumpWithType<SchedulerGlobalLock>(this); break; default: std::stringstream err; diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index 45e6320ce1edbbc5bf6fe1f0c6e68f836c4e8fa6..40bdc95fbce2a7c9d28bbfba735364c93d9e9b17 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -78,7 +78,7 @@ public: * * @param lock reference to the generic object's lock */ - std::string dump(ScopedSharedLock & lock); + std::string dump(); CTA_GENERATE_EXCEPTION_CLASS(UnsupportedType); diff --git a/objectstore/ObjectOps.cpp b/objectstore/ObjectOps.cpp new file mode 100644 index 0000000000000000000000000000000000000000..33ebad952f99721b90ac4cb0cd924833d7d9eb84 --- /dev/null +++ b/objectstore/ObjectOps.cpp @@ -0,0 +1,27 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ObjectOps.hpp" + +namespace cta { namespace objectstore { + +ObjectOpsBase::~ObjectOpsBase() { + if (m_lockForSubObject) m_lockForSubObject->dereferenceSubObject(*this); +} + +}} \ No newline at end of file diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index a88ea5f7a6724603b418efb90d3b760c35aab457..1bd01c3ac038923dec303c2ad4b084ff8b9608e9 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -30,6 +30,8 @@ namespace cta { namespace objectstore { class AgentReference; +class ScopedLock; +class ScopedExclusiveLock; class ObjectOpsBase { friend class ScopedLock; @@ -42,6 +44,8 @@ protected: m_headerInterpreted(false), m_payloadInterpreted(false), m_existingObject(false), m_locksCount(0), m_locksForWriteCount(0) {} + + virtual ~ObjectOpsBase(); public: CTA_GENERATE_EXCEPTION_CLASS(AddressNotSet); CTA_GENERATE_EXCEPTION_CLASS(NotLocked); @@ -82,6 +86,8 @@ protected: void checkWritable() { if (m_existingObject && !m_locksForWriteCount) throw NotLocked("In ObjectOps::checkWritable: object not locked for write"); + if (m_existingObject && !m_exclusiveLock && !m_lockForSubObject) + throw exception::Exception("In ObjectOps::checkWritable: missing reference to exclusive lock"); } void checkReadable() { @@ -163,6 +169,12 @@ protected: int m_locksCount; int m_locksForWriteCount; bool m_noLock = false; + // When locked exclusively, we will keep a reference to the lock, + // so we can propagate it to sub objects with minimal passing through. + ScopedExclusiveLock * m_exclusiveLock = nullptr; + // When being locked as a sub object, we will keep a reference to the lock + // we are provided with. Likewise, the lock will update ourselves when released. + ScopedLock * m_lockForSubObject = nullptr; }; class ScopedLock { @@ -176,6 +188,39 @@ public: return m_locked; } + /** + * Virtual function (implemented differently in shared and exclusive locks), + * marking the object as locked. + * @param objectOps pointer to the ObjectOpsBase. + */ + virtual void setObjectLocked(ObjectOpsBase * objectOps) = 0; + + /** + * Virtual function (implemented differently in shared and exclusive locks), + * marking the object as unlocked. + * @param objectOps pointer to the ObjectOpsBase. + */ + virtual void setObjectUnlocked(ObjectOpsBase * objectOps) = 0; + + /** + * Expand the scope of the current lock to a sub object, which will also be covered + * by this lock. This will allow the sub object to benefit from the same protection + * from lack of proper locking. This feature is to be used with sharded objects. + */ + void includeSubObject(ObjectOpsBase & subObject) { + // To propagate a lock, we should have one to begin with. + checkLocked(); + ObjectOpsBase * oob = & subObject; + // Validate the sub object is defined. + checkObjectAndAddressSet(oob); + // Propagate the lock to the sub object (this is lock type dependant). + setObjectLocked(oob); + // Reference ourselves to the sub object so it can declare it destruction to us. + oob->m_lockForSubObject = this; + // Add a reference to the object. + m_subObjectsOps.push_back(oob); + } + /** Move the locked object reference to a new one. This is done when the locked * object is a GenericObject and the caller instantiated a derived object from * it. The lock follows the move. @@ -183,20 +228,38 @@ public: * use case). * New object's locks are moved from the old one (referenced in the lock) */ - void transfer(ObjectOpsBase & newObject) { - decltype(m_objectOps) oldObj(m_objectOps); - m_objectOps = & newObject; + static void transfer(ObjectOpsBase & oldObject, ObjectOpsBase & newObject) { // Transfer the locks from old to new object - m_objectOps->m_locksCount = oldObj->m_locksCount; - m_objectOps->m_locksForWriteCount = oldObj->m_locksForWriteCount; + newObject.m_locksCount = oldObject.m_locksCount; + newObject.m_locksForWriteCount = oldObject.m_locksForWriteCount; + newObject.m_exclusiveLock = oldObject.m_exclusiveLock; + newObject.m_lockForSubObject = oldObject.m_lockForSubObject; + newObject.m_noLock = oldObject.m_noLock; // The old object is not considered locked anymore and should be // discarded. A previous call the the new object's constructor should - oldObj->m_locksCount = 0; - oldObj->m_locksForWriteCount = 0; + oldObject.m_locksCount = 0; + oldObject.m_locksForWriteCount = 0; + oldObject.m_exclusiveLock = nullptr; + oldObject.m_lockForSubObject = nullptr; + oldObject.m_noLock=false; + } + + /** + * + * @param subObject + */ + + /** + * Dereference a sub object at destruction time + * @param subObject + */ + void dereferenceSubObject(ObjectOpsBase & subObject) { + m_subObjectsOps.remove(&subObject); } virtual ~ScopedLock() { - releaseIfNeeded(); + // Each child class will have to call releaseIfNeeded() in their own destructor + // as it relies on pure virtual members of this base class. } CTA_GENERATE_EXCEPTION_CLASS(AlreadyLocked); @@ -207,6 +270,7 @@ protected: ScopedLock(): m_objectOps(NULL), m_locked(false) {} std::unique_ptr<Backend::ScopedLock> m_lock; ObjectOpsBase * m_objectOps; + std::list <ObjectOpsBase *> m_subObjectsOps; bool m_locked; void checkNotLocked() { if (m_locked) @@ -216,20 +280,27 @@ protected: if (!m_locked) throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock"); } - void checkObjectAndAddressSet() { - if (!m_objectOps) { + void checkObjectAndAddressSet(ObjectOpsBase * oob = nullptr) { + // By default we deal with the main object. + if (!oob) oob = m_objectOps; + if (!oob) { throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock a NULL object"); - } else if (!m_objectOps->m_nameSet || m_objectOps->m_name.empty()) { + } else if (!oob->m_nameSet || oob->m_name.empty()) { throw MissingAddress("In ScopedLock::checkAddressSet: trying to lock an object without address"); } } virtual void releaseIfNeeded() { if(!m_locked) return; m_lock.reset(NULL); - m_objectOps->m_locksCount--; m_locked = false; + setObjectUnlocked(m_objectOps); // Releasing a lock voids the object content in memory as stored object can now change. - m_objectOps->m_payloadInterpreted=false; + m_objectOps->m_payloadInterpreted = false; + // Apply the same to sub objects + for (auto & oob: m_subObjectsOps) { + setObjectUnlocked(oob); + oob->m_payloadInterpreted = false; + } } }; @@ -239,14 +310,28 @@ public: ScopedSharedLock(ObjectOpsBase & oo) { lock(oo); } + + void setObjectLocked(ObjectOpsBase* objectOps) override { + objectOps->m_locksCount++; + } + + void setObjectUnlocked(ObjectOpsBase* objectOps) override { + objectOps->m_locksCount--; + } + void lock(ObjectOpsBase & oo) { checkNotLocked(); m_objectOps = & oo; checkObjectAndAddressSet(); m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet())); - m_objectOps->m_locksCount++; + setObjectLocked(m_objectOps); m_locked = true; } + + virtual ~ScopedSharedLock() { + releaseIfNeeded(); + } + }; class ScopedExclusiveLock: public ScopedLock { @@ -255,21 +340,52 @@ public: ScopedExclusiveLock(ObjectOpsBase & oo, uint64_t timeout_us = 0) { lock(oo, timeout_us); } + + void setObjectLocked(ObjectOpsBase* objectOps) override { + m_objectOps->m_locksCount++; + m_objectOps->m_locksForWriteCount++; + } + + void setObjectUnlocked(ObjectOpsBase* objectOps) override { + m_objectOps->m_locksCount--; + m_objectOps->m_locksForWriteCount--; + } + void lock(ObjectOpsBase & oo, uint64_t timeout_us = 0) { checkNotLocked(); m_objectOps = &oo; checkObjectAndAddressSet(); m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet(), timeout_us)); - m_objectOps->m_locksCount++; - m_objectOps->m_locksForWriteCount++; + setObjectLocked(m_objectOps); + m_objectOps->m_exclusiveLock = this; m_locked = true; } -protected: - void releaseIfNeeded() { - if (!m_locked) return; - ScopedLock::releaseIfNeeded(); - m_objectOps->m_locksForWriteCount--; + + /** Move the locked object reference to a new one. This is done when the locked + * object is a GenericObject and the caller instantiated a derived object from + * it. The lock follows the move. + * We check we move the lock from a Generic object (this is the only allowed + * use case). + * New object's locks are moved from the old one (referenced in the lock) + */ + void transfer(ObjectOpsBase & newObject) { + // Sanity checks: we should be the lock for this object. + if ((m_objectOps->m_exclusiveLock && m_objectOps->m_exclusiveLock != this) || + (m_objectOps->m_lockForSubObject && m_objectOps->m_lockForSubObject != this)) { + std::stringstream err; + err << "In ScopedExclusiveLock::transfer(): we should be this object's lock (and are not): " + << std::hex << std::showbase << " exclusiveLock=" << m_objectOps->m_exclusiveLock + << " lockForSubObject=" << m_objectOps->m_lockForSubObject + << " this=" << this; + throw exception::Exception (err.str()); + } + ScopedLock::transfer(*m_objectOps, newObject); } + + virtual ~ScopedExclusiveLock() { + releaseIfNeeded(); + } + }; template <class PayloadType, serializers::ObjectType PayloadTypeId> diff --git a/objectstore/cta-objectstore-dump-object.cpp b/objectstore/cta-objectstore-dump-object.cpp index 9cce8d42b295c6493f802213904cfd3282ec1e05..fecfc4ab1bdf7163a70b451b00530f9273c739f7 100644 --- a/objectstore/cta-objectstore-dump-object.cpp +++ b/objectstore/cta-objectstore-dump-object.cpp @@ -54,9 +54,8 @@ int main(int argc, char ** argv) { std::cout << "Object store path: " << be->getParams()->toURL() << std::endl << "Object name: " << objectName << std::endl; cta::objectstore::GenericObject ge(objectName, *be); - cta::objectstore::ScopedSharedLock gel(ge); - ge.fetch(); - std::cout << ge.dump(gel) << std::endl; + ge.fetchNoLock(); + std::cout << ge.dump() << std::endl; } catch (std::exception & e) { std::cerr << "Failed to dump object: " << std::endl << e.what() << std::endl; diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 68f392a76c5d91c1e09eaa167e3f79887b656644..cb1029f42f4bdfce5146a5bf41be2b02ba04bbc1 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -28,7 +28,9 @@ enum ObjectType { ArchiveRequest_t = 7; RetrieveRequest_t = 8; ArchiveQueue_t = 9; + ArchiveQueueShard_t = 90; RetrieveQueue_t = 10; + RetrieveQueueShard_t = 100; GenericObject_t = 1000; } @@ -137,28 +139,6 @@ message AgentRegister { repeated string untrackedagents = 2101; } -// ------------- Jobs handling ------------------------------------------------- - -message ArchiveJobPointer { - required uint64 fileid = 3000; - required uint64 size = 3001; - required string address = 3002; - required uint32 copynb = 3003; - required uint64 priority = 3004; - required uint64 minarchiverequestage = 3005; - required uint64 maxdrivesallowed = 3006; -} - -message RetrieveJobPointer { - required uint64 size = 3101; - required string address = 3102; - required uint32 copynb = 3103; - required uint64 fseq = 3107; - required uint64 priority = 3104; - required uint64 minretrieverequestage = 3105; - required uint64 maxdrivesallowed = 3106; -} - // ------------- Mount criteria and quota ------------------------------------- message MountCriteria { @@ -387,17 +367,50 @@ message ValueCountPair { required uint64 count = 9302; } +message ArchiveJobPointer { + required uint64 fileid = 3000; + required uint64 size = 3001; + required string address = 3002; + required uint32 copynb = 3003; + required uint64 priority = 3004; + required uint64 minarchiverequestage = 3005; + required uint64 maxdrivesallowed = 3006; + required uint64 starttime = 3007; +} + +message ArchiveQueueShardPointer { + required string address = 10200; + required uint64 shardjobscount = 10201; + required uint64 shardbytescount = 10202; +} + +message ArchiveQueueShard { + repeated ArchiveJobPointer archivejobs = 10300; + required uint64 archivejobstotalsize = 10301; +} + message ArchiveQueue { required string tapepool = 10000; - repeated ArchiveJobPointer archivejobs = 10010; + repeated ArchiveQueueShardPointer archivequeuesshards = 10010; repeated ValueCountPair prioritymap = 10031; repeated ValueCountPair minarchiverequestagemap = 10032; repeated ValueCountPair maxdrivesallowedmap = 10033; required uint64 archivejobstotalsize = 10040; + required uint64 archivejobscount = 10045; required uint64 oldestjobcreationtime = 10050; required uint64 mapsrebuildcount = 10060; } +message RetrieveJobPointer { + required uint64 size = 3101; + required string address = 3102; + required uint32 copynb = 3103; + required uint64 fseq = 3107; + required uint64 priority = 3104; + required uint64 minretrieverequestage = 3105; + required uint64 maxdrivesallowed = 3106; +} + message RetrieveQueue { required string vid = 10100; repeated RetrieveJobPointer retrievejobs = 10110; diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 857f553c0fe57e5ec3eb97dff82dab778f7207ae..d60f70f3f20708acd22f6f57ed2bae9d85135cfd 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -26,7 +26,7 @@ namespace cta { namespace ostoredb { template<> void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueueAndCommit( std::list<MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::JobAndRequest> & jobsToAdd, - objectstore::ArchiveQueue& queue) { + objectstore::ArchiveQueue& queue, objectstore::AgentReference & agentReference, log::LogContext & logContext) { std::list<objectstore::ArchiveQueue::JobToAdd> jtal; auto queueAddress = queue.getAddressIfSet(); for (auto & j: jobsToAdd) { @@ -38,13 +38,13 @@ void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializ j.job.owner = queueAddress; j.request.setJobOwner(j.job.copyNb, j.job.owner); } - queue.addJobsAndCommit(jtal); + queue.addJobsAndCommit(jtal, agentReference, logContext); } template<> void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueueAndCommit( std::list<MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::JobAndRequest> & jobsToAdd, - objectstore::RetrieveQueue &queue) { + objectstore::RetrieveQueue &queue, objectstore::AgentReference & agentReference, log::LogContext & logContext) { std::list<objectstore::RetrieveQueue::JobToAdd> jtal; auto queueAddress = queue.getAddressIfSet(); for (auto & jta: jobsToAdd) { diff --git a/scheduler/OStoreDB/MemQueues.hpp b/scheduler/OStoreDB/MemQueues.hpp index 70c596c8d6b9c34b2aacbc10872f6981b1caeb09..81ac26502d6a5fca47aab9d81588de48e2e6ad0f 100644 --- a/scheduler/OStoreDB/MemQueues.hpp +++ b/scheduler/OStoreDB/MemQueues.hpp @@ -185,7 +185,8 @@ private: }; /** Helper function handling the difference between archive and retrieve (vid vs tapepool) */ - static void specializedAddJobsToQueueAndCommit(std::list<JobAndRequest> & jobsToAdd, Queue & queue); + static void specializedAddJobsToQueueAndCommit(std::list<JobAndRequest> & jobsToAdd, Queue & queue, + objectstore::AgentReference & agentReference, log::LogContext & logContext); /** Helper function updating the cached retrieve queue stats. Noop for archive queues */ static void specializedUpdateCachedQueueStats(Queue &queue); @@ -320,7 +321,7 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share addedJobs++; } // Actually ass the jobs. - specializedAddJobsToQueueAndCommit(jta, queue); + specializedAddJobsToQueueAndCommit(jta, queue, *oStoreDB.m_agentReference, logContext); double queueProcessAndCommitTime = timer.secs(utils::Timer::resetCounter); // Update the cache stats in memory as we hold the queue. specializedUpdateCachedQueueStats(queue); diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 0c5a89578fbeaebd0fefca34db91664ed8722809..120f8e0e9c2831ff566bf197672f59a8ec76e9fe 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -108,13 +108,13 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro } // If there are files queued, we create an entry for this tape pool in the // mount candidates list. - if (aqueue.getJobsSummary().files) { + if (aqueue.getJobsSummary().jobs) { tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount()); auto & m = tmdi.potentialMounts.back(); m.tapePool = aqp.tapePool; m.type = cta::common::dataStructures::MountType::Archive; m.bytesQueued = aqueue.getJobsSummary().bytes; - m.filesQueued = aqueue.getJobsSummary().files; + m.filesQueued = aqueue.getJobsSummary().jobs; m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime; m.priority = aqueue.getJobsSummary().priority; m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed; @@ -304,7 +304,7 @@ void OStoreDB::trimEmptyQueues(log::LogContext& lc) { ArchiveQueue aq(a.address, m_objectStore); ScopedSharedLock aql(aq); aq.fetch(); - if (!aq.getJobsSummary().files) { + if (!aq.getJobsSummary().jobs) { aql.release(); re.removeArchiveQueueAndCommit(a.tapePool, lc); log::ScopedParamContainer params(lc); @@ -1614,7 +1614,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun log::ScopedParamContainer params(logContext); params.add("tapepool", mountInfo.tapePool) .add("queueObject", aq.getAddressIfSet()) - .add("queueSize", aq.getJobsSummary().files); + .add("queueSize", aq.getJobsSummary().jobs); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): archive queue found."); } // The queue will give us a list of files to try and grab. We will attempt to @@ -2407,7 +2407,7 @@ void OStoreDB::ArchiveJob::fail(log::LogContext & lc) { std::list<objectstore::ArchiveQueue::JobToAdd> jta; jta.push_back({j, m_archiveRequest.getAddressIfSet(), m_archiveRequest.getArchiveFile().archiveFileID, m_archiveRequest.getArchiveFile().fileSize, m_archiveRequest.getMountPolicy(), m_archiveRequest.getEntryLog().time}); - aq.addJobsIfNecessaryAndCommit(jta); + aq.addJobsIfNecessaryAndCommit(jta, *m_oStoreDB.m_agentReference, lc); aqlock.release(); // We have a pointer to the job, we can change the job ownership m_archiveRequest.setJobOwner(tapeFile.copyNb, aq.getAddressIfSet());