diff --git a/continuousintegration/buildtree_runner/vmBootstrap/bootstrapCTA.sh b/continuousintegration/buildtree_runner/vmBootstrap/bootstrapCTA.sh index f9ed2d39c9f380f69a20efaa0c774359d40d7fec..122df0448681cf1ac02ca47517df9be54192c53f 100755 --- a/continuousintegration/buildtree_runner/vmBootstrap/bootstrapCTA.sh +++ b/continuousintegration/buildtree_runner/vmBootstrap/bootstrapCTA.sh @@ -3,7 +3,7 @@ set -x echo Getting CTA sources... -( cd ~ ; git clone https://:@gitlab.cern.ch:8443/cta/CTA.git) +( cd ~ ; git clone https://:@gitlab.cern.ch:8443/cta/CTA.git; cd CTA ; git submodule update --init --recursive) echo Creating source rpm mkdir -p ~/CTA-build-srpm diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 15c60917e880eb577aba19830a407a0bddb90c6c..c7e066a9df72268573d23a05035fc2a0a9abf54f 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -80,7 +80,7 @@ bool ArchiveQueue::checkMapsAndShardsCoherency() { uint64_t bytesFromShardPointers = 0; uint64_t jobsExpectedFromShardsPointers = 0; // Add up shard summaries - for (auto & aqs: m_payload.archivequeuesshards()) { + for (auto & aqs: m_payload.archivequeueshards()) { bytesFromShardPointers += aqs.shardbytescount(); jobsExpectedFromShardsPointers += aqs.shardjobscount(); } @@ -120,7 +120,7 @@ void ArchiveQueue::rebuild() { priorityMap.clear(); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); minArchiveRequestAgeMap.clear(); - for (auto & sa: m_payload.archivequeuesshards()) { + for (auto & sa: m_payload.archivequeueshards()) { shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); } @@ -135,10 +135,10 @@ void ArchiveQueue::rebuild() { (*sf)->wait(); } catch (Backend::NoSuchObject & ex) { // Remove the shard from the list - auto aqs = m_payload.mutable_archivequeuesshards()->begin(); - while (aqs != m_payload.mutable_archivequeuesshards()->end()) { + auto aqs = m_payload.mutable_archivequeueshards()->begin(); + while (aqs != m_payload.mutable_archivequeueshards()->end()) { if (aqs->address() == s->getAddressIfSet()) { - aqs = m_payload.mutable_archivequeuesshards()->erase(aqs); + aqs = m_payload.mutable_archivequeueshards()->erase(aqs); } else { aqs++; } @@ -161,11 +161,11 @@ void ArchiveQueue::rebuild() { totalJobs+=jobs; totalBytes+=size; // And store the value in the shard pointers. - auto maqs = m_payload.mutable_archivequeuesshards(); - for (auto & aqs: *maqs) { - if (aqs.address() == s->getAddressIfSet()) { - aqs.set_shardjobscount(jobs); - aqs.set_shardbytescount(size); + auto maqs = m_payload.mutable_archivequeueshards(); + for (auto & aqsp: *maqs) { + if (aqsp.address() == s->getAddressIfSet()) { + aqsp.set_shardjobscount(jobs); + aqsp.set_shardbytescount(size); goto shardUpdated; } } @@ -199,7 +199,7 @@ void ArchiveQueue::rebuild() { bool ArchiveQueue::isEmpty() { checkPayloadReadable(); // Check we have no archive jobs pending - if (m_payload.archivequeuesshards_size()) + if (m_payload.archivequeueshards_size()) return false; // If we made it to here, it seems the pool is indeed empty. 
return true; @@ -272,9 +272,9 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere ArchiveQueueShard aqs(m_objectStore); serializers::ArchiveQueueShardPointer * aqsp = nullptr; bool newShard=false; - uint64_t shardCount = m_payload.archivequeuesshards_size(); - if (shardCount && m_payload.archivequeuesshards(shardCount - 1).shardjobscount() < c_maxShardSize) { - auto & shardPointer=m_payload.archivequeuesshards(shardCount - 1); + uint64_t shardCount = m_payload.archivequeueshards_size(); + if (shardCount && m_payload.archivequeueshards(shardCount - 1).shardjobscount() < c_maxShardSize) { + auto & shardPointer=m_payload.archivequeueshards(shardCount - 1); aqs.setAddress(shardPointer.address()); // include-locking does not check existence of the object in the object store. // we will find out on fetch. If we fail, we have to rebuild. @@ -310,11 +310,11 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere continue; } // The shard looks good. We will now proceed with the addition of individual jobs. - aqsp = m_payload.mutable_archivequeuesshards(shardCount - 1); + aqsp = m_payload.mutable_archivequeueshards(shardCount - 1); } else { // We need a new shard. Just add it (in memory). newShard = true; - aqsp = m_payload.mutable_archivequeuesshards()->Add(); + aqsp = m_payload.add_archivequeueshards(); // Create the shard in memory. std::stringstream shardName; shardName << "ArchiveQueueShard-" << m_payload.tapepool(); @@ -393,7 +393,7 @@ ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::lis std::list<ArchiveQueueShard> shards; std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; - for (auto & sa: m_payload.archivequeuesshards()) { + for (auto & sa: m_payload.archivequeueshards()) { shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); } @@ -447,7 +447,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov // The jobs are expected to be removed from the front shards first. // Remove jobs until there are no more jobs or no more shards. ssize_t shardIndex=0; - auto * mutableArchiveQueueShards= m_payload.mutable_archivequeuesshards(); + auto * mutableArchiveQueueShards= m_payload.mutable_archivequeueshards(); while (localJobsToRemove.size() && shardIndex < mutableArchiveQueueShards->size()) { auto * shardPointer = mutableArchiveQueueShards->Mutable(shardIndex); // Get hold of the shard @@ -489,7 +489,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov for (auto i=shardIndex; i<mutableArchiveQueueShards->size()-1; i++) { mutableArchiveQueueShards->SwapElements(i, i+1); } - m_payload.mutable_archivequeuesshards()->RemoveLast(); + mutableArchiveQueueShards->RemoveLast(); } // We should also trim the removed jobs from our list. 
localJobsToRemove.remove_if( @@ -512,7 +512,7 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> { std::list<JobDump> ret; std::list<ArchiveQueueShard> shards; std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; - for (auto & sa: m_payload.archivequeuesshards()) { + for (auto & sa: m_payload.archivequeueshards()) { shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore)); shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); } @@ -538,7 +538,7 @@ auto ArchiveQueue::dumpJobs() -> std::list<JobDump> { auto ArchiveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> archiveRequestsToSkip) -> CandidateJobList { checkPayloadReadable(); CandidateJobList ret; - for (auto & aqsp: m_payload.archivequeuesshards()) { + for (auto & aqsp: m_payload.archivequeueshards()) { // We need to go through all shard poiters unconditionnaly to count what is left (see else part) if (ret.candidateBytes < maxBytes && ret.candidateFiles < maxFiles) { // Fetch the shard diff --git a/objectstore/ArchiveQueueShard.cpp b/objectstore/ArchiveQueueShard.cpp index 345e4ea9f7570648acd4e9cede8d04108bdb4c7b..406e7173e6ff64132e0fd40b6117b612a4e0eda5 100644 --- a/objectstore/ArchiveQueueShard.cpp +++ b/objectstore/ArchiveQueueShard.cpp @@ -151,7 +151,7 @@ auto ArchiveQueueShard::getJobsSummary() -> JobsSummary { uint64_t ArchiveQueueShard::addJob(ArchiveQueue::JobToAdd& jobToAdd) { checkPayloadWritable(); - auto * j = m_payload.mutable_archivejobs()->Add(); + auto * j = m_payload.add_archivejobs(); j->set_address(jobToAdd.archiveRequestAddress); j->set_size(jobToAdd.fileSize); j->set_fileid(jobToAdd.archiveFileId); diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index aaed1b155875e5677670dee1cfd6b7846f4b140e..079f849701665f3135175eb2a39c8cc1727ec12b 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -61,6 +61,7 @@ add_library (ctaobjectstore SHARED ArchiveQueue.cpp ArchiveQueueShard.cpp RetrieveQueue.cpp + RetrieveQueueShard.cpp ArchiveRequest.cpp RetrieveRequest.cpp DriveRegister.cpp diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 55ef6cd19f82b9d548a6c24e5d0f93bef06ee359..14bbdf913ece1c9be0e8db3c92ed460513163901 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -555,7 +555,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon } } } - auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta); + auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, m_ourAgentReference, lc); queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); // If we have an unexpected failure, we will re-run the individual garbage collection. Before that, // we will NOT remove the object from agent's ownership. 
This variable is declared a bit ahead so diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 092df08f8e6fb77b76d9fafef87e696631e57162..2a0a6ea6793cdca40a30114bfe5d78a603229573 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -589,7 +589,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { rq.fetch(); std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; jta.push_back({1,rqc.archiveFile.tapeFiles[1].fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time}); - rq.addJobsAndCommit(jta); + rq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } // - Still marked as not owned but referenced in the agent diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index dd0bbaf3adcd537a97009781bc56047a65b8f817..b5a2d742c981ecf0e49e8b9134a5f7c3678b2111 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -91,8 +91,9 @@ protected: } void checkReadable() { - if (!m_locksCount && !m_noLock) - throw NotLocked("In ObjectOps::checkReadable: object not locked"); + // We could still read from a fresh, not yet inserted object. + if (m_existingObject && (!m_locksCount && !m_noLock)) + throw NotLocked("In ObjectOps::checkReadable: object not locked"); } public: diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index aee2b9eecce1daf6f590da936bb5ce8b043ac06b..5626d6137913541df4d338f6556c2a541fdbb154 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -17,15 +17,19 @@ */ #include "RetrieveQueue.hpp" +#include "RetrieveQueueShard.hpp" #include "GenericObject.hpp" #include "EntryLogSerDeser.hpp" #include "ValueCountMap.hpp" +#include "AgentReference.hpp" #include <google/protobuf/util/json_util.h> -cta::objectstore::RetrieveQueue::RetrieveQueue(const std::string& address, Backend& os): +namespace cta { namespace objectstore { + +RetrieveQueue::RetrieveQueue(const std::string& address, Backend& os): ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>(os, address) { } -cta::objectstore::RetrieveQueue::RetrieveQueue(GenericObject& go): +RetrieveQueue::RetrieveQueue(GenericObject& go): ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>(go.objectStore()){ // Here we transplant the generic object into the new object go.transplantHeader(*this); @@ -33,47 +37,161 @@ cta::objectstore::RetrieveQueue::RetrieveQueue(GenericObject& go): getPayloadFromHeader(); } -cta::objectstore::RetrieveQueue::RetrieveQueue(Backend& os): +RetrieveQueue::RetrieveQueue(Backend& os): ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>(os) { } -void cta::objectstore::RetrieveQueue::initialize(const std::string &vid) { +void RetrieveQueue::initialize(const std::string &vid) { ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::initialize(); // Set the reguired fields m_payload.set_oldestjobcreationtime(0); m_payload.set_retrievejobstotalsize(0); + m_payload.set_retrievejobscount(0); m_payload.set_vid(vid); m_payload.set_mapsrebuildcount(0); m_payloadInterpreted = true; } -void cta::objectstore::RetrieveQueue::commit() { - // Before calling ObjectOps::commit, check that we have coherent queue summaries +bool RetrieveQueue::checkMapsAndShardsCoherency() { + checkPayloadReadable(); + uint64_t bytesFromShardPointers = 0; + uint64_t jobsExpectedFromShardsPointers = 0; + // Add up shard summaries + for (auto & aqs: m_payload.retrievequeueshards()) { 
+ bytesFromShardPointers += aqs.shardbytescount(); + jobsExpectedFromShardsPointers += aqs.shardjobscount(); + } + uint64_t totalBytes = m_payload.retrievejobstotalsize(); + uint64_t totalJobs = m_payload.retrievejobscount(); + // The sum of shards should be equal to the summary + if (totalBytes != bytesFromShardPointers || + totalJobs != jobsExpectedFromShardsPointers) + return false; + // Check that we have coherent queue summaries ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); - if (maxDriveAllowedMap.total() != (uint64_t)m_payload.retrievejobs_size() || - priorityMap.total() != (uint64_t)m_payload.retrievejobs_size() || - minRetrieveRequestAgeMap.total() != (uint64_t)m_payload.retrievejobs_size()) { - // The maps counts are off: recompute them. - maxDriveAllowedMap.clear(); - priorityMap.clear(); - minRetrieveRequestAgeMap.clear(); - for (size_t i=0; i<(size_t)m_payload.retrievejobs_size(); i++) { - maxDriveAllowedMap.incCount(m_payload.retrievejobs(i).maxdrivesallowed()); - priorityMap.incCount(m_payload.retrievejobs(i).priority()); - minRetrieveRequestAgeMap.incCount(m_payload.retrievejobs(i).minretrieverequestage()); + if (maxDriveAllowedMap.total() != m_payload.retrievejobscount() || + priorityMap.total() != m_payload.retrievejobscount() || + minRetrieveRequestAgeMap.total() != m_payload.retrievejobscount()) + return false; + return true; +} + +void RetrieveQueue::rebuild() { + checkPayloadWritable(); + // Something is off with the queue. We will hence rebuild it. The rebuild of the + // queue will consist in: + // 1) Attempting to read all shards in parallel. Absent shards are possible, and will + // mean we have dangling pointers. + // 2) Rebuild the summaries from the shards. + // As a side note, we do not go as far as validating the pointers to jobs within the + // shards, as this is already handled as access goes. + std::list<RetrieveQueueShard> shards; + std::list<std::unique_ptr<RetrieveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + + // Get the summaries structures ready + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.clear(); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.clear(); + ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + minRetrieveRequestAgeMap.clear(); + for (auto & sa: m_payload.retrievequeueshards()) { + shards.emplace_back(RetrieveQueueShard(sa.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + uint64_t totalJobs=0; + uint64_t totalBytes=0; + time_t oldestJobCreationTime=std::numeric_limits<time_t>::max(); + while (s != shards.end()) { + // Each shard could be gone + try { + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + // Remove the shard from the list + auto aqs = m_payload.mutable_retrievequeueshards()->begin(); + while (aqs != m_payload.mutable_retrievequeueshards()->end()) { + if (aqs->address() == s->getAddressIfSet()) { + aqs = m_payload.mutable_retrievequeueshards()->erase(aqs); + } else { + aqs++; + } + } + goto nextShard; + } + { + // The shard is still around, let's compute its summaries. 
+ uint64_t jobs = 0; + uint64_t size = 0; + uint64_t minFseq = std::numeric_limits<uint64_t>::max(); + uint64_t maxFseq = std::numeric_limits<uint64_t>::min(); + for (auto & j: s->dumpJobs()) { + jobs++; + size += j.size; + priorityMap.incCount(j.priority); + minRetrieveRequestAgeMap.incCount(j.minRetrieveRequestAge); + maxDriveAllowedMap.incCount(j.maxDrivesAllowed); + if (j.startTime < oldestJobCreationTime) oldestJobCreationTime = j.startTime; + if (j.fSeq < minFseq) minFseq = j.fSeq; + if (j.fSeq > maxFseq) maxFseq = j.fSeq; + } + // Add the summary to total. + totalJobs+=jobs; + totalBytes+=size; + // And store the value in the shard pointers. + auto mrqs = m_payload.mutable_retrievequeueshards(); + for (auto & rqsp: *mrqs) { + if (rqsp.address() == s->getAddressIfSet()) { + rqsp.set_shardjobscount(jobs); + rqsp.set_shardbytescount(size); + rqsp.set_maxfseq(maxFseq); + rqsp.set_minfseq(minFseq); + goto shardUpdated; + } + } + { + // We had to update a shard and did not find it. This is an error. + throw exception::Exception(std::string ("In RetrieveQueue::rebuild(): failed to record summary for shard " + s->getAddressIfSet())); + } + shardUpdated:; + // We still need to check if the shard itself is coherent (we have an opportunity to + // match its summary with the jobs total we just recomputed. + if (size != s->getJobsSummary().bytes) { + RetrieveQueueShard rqs(s->getAddressIfSet(), m_objectStore); + m_exclusiveLock->includeSubObject(rqs); + rqs.fetch(); + rqs.rebuild(); + rqs.commit(); + } } + nextShard:; + s++; + sf++; + } + m_payload.set_retrievejobscount(totalJobs); + m_payload.set_retrievejobstotalsize(totalBytes); + m_payload.set_oldestjobcreationtime(oldestJobCreationTime); + // We went through all the shard, re-updated the summaries, removed references to + // gone shards. 
Done.} +} + + +void RetrieveQueue::commit() { + if (!checkMapsAndShardsCoherency()) { + rebuild(); m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1); } ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit(); } -bool cta::objectstore::RetrieveQueue::isEmpty() { +bool RetrieveQueue::isEmpty() { checkPayloadReadable(); - return !m_payload.retrievejobs_size(); + return !m_payload.retrievejobstotalsize() && !m_payload.retrievequeueshards_size(); } -void cta::objectstore::RetrieveQueue::removeIfEmpty(log::LogContext & lc) { +void RetrieveQueue::removeIfEmpty(log::LogContext & lc) { checkPayloadWritable(); if (!isEmpty()) { throw NotEmpty("In RetrieveQueue::removeIfEmpty: trying to remove an tape with retrieves queued"); @@ -84,12 +202,12 @@ void cta::objectstore::RetrieveQueue::removeIfEmpty(log::LogContext & lc) { lc.log(log::INFO, "In RetrieveQueue::removeIfEmpty(): removed the queue."); } -std::string cta::objectstore::RetrieveQueue::getVid() { +std::string RetrieveQueue::getVid() { checkPayloadReadable(); return m_payload.vid(); } -std::string cta::objectstore::RetrieveQueue::dump() { +std::string RetrieveQueue::dump() { checkPayloadReadable(); google::protobuf::util::JsonPrintOptions options; options.add_whitespace = true; @@ -99,204 +217,467 @@ std::string cta::objectstore::RetrieveQueue::dump() { return headerDump; } -void cta::objectstore::RetrieveQueue::addJobsAndCommit(std::list<cta::objectstore::RetrieveQueue::JobToAdd> & jobsToAdd) { +namespace { +struct ShardForAddition { + bool newShard=false; + bool creationDone=false; + bool splitDone=false; + bool toSplit=false; + ShardForAddition * splitDestination = nullptr; + bool fromSplit=false; + ShardForAddition * splitSource = nullptr; + std::string address; + uint64_t minFseq; + uint64_t maxFseq; + uint64_t jobsCount; + std::list<RetrieveQueue::JobToAdd> jobsToAdd; + size_t shardIndex = std::numeric_limits<size_t>::max(); +}; + +void updateShardLimits(uint64_t fSeq, ShardForAddition & sfa) { + if (fSeq < sfa.minFseq) sfa.minFseq=fSeq; + if (fSeq > sfa.maxFseq) sfa.maxFseq=fSeq; +} + +/** Add a job to a shard, splitting it if necessary */ +void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd, + std::list<ShardForAddition>::iterator & shardForAddition, std::list<ShardForAddition> & shardList) { + // Is the shard still small enough? We will not double split shards (we suppose insertion size << shard size cap). + // We will also not split a new shard. + if ( shardForAddition->jobsCount < RetrieveQueue::c_maxShardSize + || shardForAddition->fromSplit || shardForAddition->newShard) { + // We just piggy back here. No need to increase range, we are within it. + shardForAddition->jobsCount++; + shardForAddition->jobsToAdd.emplace_back(jobToAdd); + updateShardLimits(jobToAdd.fSeq, *shardForAddition); + } else { + // The shard is full. We need to split it (and can). We will cut the shard range in + // 2 equal parts, not forgetting to redistribute the existing jobs to add accordingly. + // Create the new shard + auto newSfa = shardList.insert(shardForAddition, ShardForAddition()); + // The new shard size can only be estimated, but we will update it to the actual value as we update the shard.
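+ // For illustration (hypothetical numbers, not taken from the change itself): a full shard covering fSeq [0, 999] would be cut into a kept half [0, 499] and a new half [500, 999], and any pending jobsToAdd with fSeq >= 500 migrate to the new half.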
+ uint64_t shardRange = shardForAddition->maxFseq - shardForAddition->minFseq; + uint64_t oldMax = shardForAddition->maxFseq; + shardForAddition->maxFseq = shardForAddition->minFseq + shardRange/2; + shardForAddition->jobsCount = shardForAddition->jobsCount/2; + shardForAddition->toSplit = true; + shardForAddition->splitDestination = &*newSfa; + newSfa->minFseq = shardForAddition->maxFseq+1; + newSfa->maxFseq = oldMax; + newSfa->jobsCount = shardForAddition->jobsCount; + newSfa->splitSource = &*shardForAddition; + newSfa->fromSplit = true; + newSfa->newShard = true; + // Transfer jobs to add to new shard if needed + for (auto jta2=shardForAddition->jobsToAdd.begin(); jta2!=shardForAddition->jobsToAdd.end();) { + if (jta2->fSeq >= newSfa->minFseq) { + newSfa->jobsToAdd.emplace_back(*jta2); + jta2 = shardForAddition->jobsToAdd.erase(jta2); + } else { + jta2++; + } + } + // We can finally add our job to one of the two shards from the split + if (jobToAdd.fSeq <= shardForAddition->maxFseq) { + shardForAddition->jobsToAdd.emplace_back(jobToAdd); + shardForAddition->jobsCount++; + updateShardLimits(jobToAdd.fSeq, *shardForAddition); + } else { + newSfa->jobsToAdd.emplace_back(jobToAdd); + newSfa->jobsCount++; + updateShardLimits(jobToAdd.fSeq, *newSfa); + } + } +} + +} // anonymous namespace + +void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc) { checkPayloadWritable(); // Keep track of the mounting criteria ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + // We need to figure out which job will be added to which shard. + // We might have to split shards if they would become too big. + // For a given job, there are 4 possible cases: + // - Before first shard + // - Within a shard + // - Between 2 shards + // - After last shard + // We can classify the previous into 2 use cases + // - Within a shard + // - Outside of a shard + // In the case we land within a shard, we either have to use the existing one, or + // to split it and choose the half we are going to use. + // In case we land outside of a shard, we have 1 or 2 adjacent shards. If either of + // the two is not full, we will add the job to it. + // Otherwise, a new shard will be added to contain the job. + // We need to pre-plan the insertion with all the incoming jobs before doing the real action. + // 1) Initialize the shard list from the shard pointers. The shards are supposed to be + // sorted by fseq and to contain non-overlapping segments of fSeq ranges. We will tweak the + // extracted values from the object store to achieve this condition. + std::list<ShardForAddition> shardsForAddition; + for (auto & rqsp: m_payload.retrievequeueshards()) { + shardsForAddition.emplace_back(ShardForAddition()); + auto & sfa = shardsForAddition.back(); + sfa.minFseq = rqsp.minfseq(); + sfa.maxFseq = rqsp.maxfseq(); + sfa.jobsCount = rqsp.shardjobscount(); + sfa.address = rqsp.address(); + } + // After extracting the pointers, we ensure the fSeqs are in order. We go from first to last, + // and whichever fSeq is used will push forward the limits of the following shards.
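+ // For illustration (hypothetical values): pointers claiming [5, 10], [3, 12] and [20, 25] are normalized below to [5, 10], [11, 12] and [20, 25], so the ranges no longer overlap and each fSeq maps to at most one shard.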
+ uint64_t highestFseqSoFar=0; + for (auto & sfa: shardsForAddition) { + sfa.minFseq = std::max(highestFseqSoFar, sfa.minFseq); + highestFseqSoFar = sfa.minFseq; + sfa.maxFseq = std::max(highestFseqSoFar, sfa.maxFseq); + highestFseqSoFar = sfa.maxFseq+1; + } + // We now try to fit the jobs to the right shards for (auto & jta: jobsToAdd) { - maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); - priorityMap.incCount(jta.policy.retrievePriority); - minRetrieveRequestAgeMap.incCount(jta.policy.retrieveMinRequestAge); - if (m_payload.retrievejobs_size()) { - if (m_payload.oldestjobcreationtime() > (uint64_t)jta.startTime) { - m_payload.set_oldestjobcreationtime(jta.startTime); + // If there is no shard, let's create the initial one. + if (shardsForAddition.empty()) { + shardsForAddition.emplace_back(ShardForAddition()); + auto & sfa=shardsForAddition.back(); + sfa.newShard=true; + sfa.maxFseq = sfa.minFseq = jta.fSeq; + sfa.jobsCount=1; + sfa.jobsToAdd.emplace_back(jta); + goto jobInserted; + } + // Find where the job lands in the shards + for (auto sfa=shardsForAddition.begin(); sfa != shardsForAddition.end(); sfa++) { + // Is it within this shard? + if (jta.fSeq >= sfa->minFseq && jta.fSeq <= sfa->maxFseq) { + addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition); + goto jobInserted; + } else if (sfa != shardsForAddition.end() && std::next(sfa) != shardsForAddition.end()) { + // Are we between shards? + auto nextSfa=std::next(sfa); + if (jta.fSeq > sfa->maxFseq && jta.fSeq < nextSfa->minFseq) { + if (sfa->jobsCount < nextSfa->jobsCount) { + addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition); + } else { + addJobToShardAndMaybeSplit(jta, nextSfa, shardsForAddition); + } + goto jobInserted; + } + } else if (sfa->minFseq > jta.fSeq) { + // Are we before the current shard? (for example, before first shard) + addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition); + goto jobInserted; + } else if (std::next(sfa) == shardsForAddition.end() && sfa->maxFseq < jta.fSeq) { + // Are we after the last shard? + addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition); + goto jobInserted; } - m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + jta.size); + } + // Still not inserted? Now we run out of options. Segfault to ease debugging. + { + *((int *)nullptr) = 0; // TODO: remove in the long run. + throw cta::exception::Exception("In RetrieveQueue::addJobsAndCommit(): could not find an appropriate shard for job"); + } + jobInserted:; + } + + { + // Number the shards. + size_t shardIndex=0; + for (auto & shard: shardsForAddition) shard.shardIndex=shardIndex++; + } + + // Jobs are now planned for insertions in their respective (and potentially + // new) shards. + // We will iterate shard by shard. + // TODO: shard creation and update could be parallelized (to some extent as we + // have shard to shard dependencies with the splits), but as a first implementation + // we just go iteratively. + uint64_t addedJobs = 0, addedBytes = 0; + for (auto & shard: shardsForAddition) { + // Variables which will allow the shard/pointer updates in all cases. + cta::objectstore::serializers::RetrieveQueueShardPointer * shardPointer = nullptr, * splitFromShardPointer = nullptr; + RetrieveQueueShard rqs(m_objectStore), rqsSplitFrom(m_objectStore); + if (shard.newShard) { + // Irrespective of the case, we need to create the shard. 
+ std::stringstream shardName; + shardName << "RetrieveQueueShard-" << m_payload.vid(); + rqs.setAddress(agentReference.nextId(shardName.str())); + rqs.initialize(getAddressIfSet()); + // We also need to create the pointer, and insert it to the right spot. + shardPointer = m_payload.add_retrievequeueshards(); + // Pre-update the shard pointer. + shardPointer->set_address(rqs.getAddressIfSet()); + shardPointer->set_minfseq(0); + shardPointer->set_maxfseq(0); + shardPointer->set_shardbytescount(0); + shardPointer->set_shardjobscount(0); + // Move the shard pointer to its intended location. + size_t currentShardPosition=m_payload.retrievequeueshards_size() - 1; + while (currentShardPosition != shard.shardIndex) { + m_payload.mutable_retrievequeueshards()->SwapElements(currentShardPosition-1,currentShardPosition); + currentShardPosition--; + } + // Make sure the pointer is the right one after move. + shardPointer = m_payload.mutable_retrievequeueshards(shard.shardIndex); + // If necessary, fill up this new shard with jobs from the split one. + if (shard.fromSplit) { + rqsSplitFrom.setAddress(shard.splitSource->address); + splitFromShardPointer=m_payload.mutable_retrievequeueshards(shard.splitSource->shardIndex); + m_exclusiveLock->includeSubObject(rqsSplitFrom); + rqsSplitFrom.fetch(); + auto jobsFromSource=rqsSplitFrom.dumpJobsToAdd(); + std::list<std::string> jobsToTransferAddresses; + for (auto &j: jobsFromSource) { + if (j.fSeq >= shard.minFseq && j.fSeq <= shard.maxFseq) { + rqs.addJob(j); + jobsToTransferAddresses.emplace_back(j.retrieveRequestAddress); + } + } + rqsSplitFrom.removeJobs(jobsToTransferAddresses); + auto splitFromShardSummary = rqsSplitFrom.getJobsSummary(); + splitFromShardPointer->set_maxfseq(splitFromShardSummary.maxFseq); + splitFromShardPointer->set_minfseq(splitFromShardSummary.minFseq); + splitFromShardPointer->set_shardbytescount(splitFromShardSummary.bytes); + splitFromShardPointer->set_shardjobscount(splitFromShardSummary.jobs); + // We are all set (in memory) for the shard from which we split. + } + // We can now fill up the shard (outside of this if/else). } else { - m_payload.set_oldestjobcreationtime(jta.startTime); - m_payload.set_retrievejobstotalsize(jta.size); + rqs.setAddress(shard.address); + m_exclusiveLock->includeSubObject(rqs); + rqs.fetch(); + shardPointer=m_payload.mutable_retrievequeueshards(shard.shardIndex); } - auto * j = m_payload.add_retrievejobs(); - j->set_address(jta.retrieveRequestAddress); - j->set_size(jta.size); - j->set_copynb(jta.copyNb); - j->set_fseq(jta.fSeq); - j->set_priority(jta.policy.retrievePriority); - j->set_minretrieverequestage(jta.policy.retrieveMinRequestAge); - j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); - // move the the new job in the right spot on the queue. - // i points to the newly added job all the time. - size_t i=m_payload.retrievejobs_size() - 1; - while (i > 0 && m_payload.retrievejobs(i).fseq() < m_payload.retrievejobs(i - 1).fseq()) { - m_payload.mutable_retrievejobs()->SwapElements(i-1, i); - i--; + // ... add the jobs to the shard (in memory) + for (auto j:shard.jobsToAdd) { + rqs.addJob(j); + addedJobs++; + addedBytes+=j.fileSize; + maxDriveAllowedMap.incCount(j.policy.maxDrivesAllowed); + priorityMap.incCount(j.policy.retrievePriority); + minRetrieveRequestAgeMap.incCount(j.policy.retrieveMinRequestAge); } + // ...
update the shard pointer + auto shardSummary = rqs.getJobsSummary(); + shardPointer->set_maxfseq(shardSummary.maxFseq); + shardPointer->set_minfseq(shardSummary.minFseq); + shardPointer->set_shardbytescount(shardSummary.bytes); + shardPointer->set_shardjobscount(shardSummary.jobs); + // ... and finally commit the queue (first! there is potentially a new shard to + // pre-reference before inserting) and shards as is appropriate. + // Update global summaries + m_payload.set_retrievejobscount(m_payload.retrievejobscount() + addedJobs); + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + addedBytes); + commit(); + if (shard.newShard) + rqs.insert(); + else rqs.commit(); } - commit(); } -auto cta::objectstore::RetrieveQueue::addJobsIfNecessaryAndCommit(std::list<cta::objectstore::RetrieveQueue::JobToAdd> & jobsToAdd) +auto RetrieveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd, + AgentReference & agentReference, log::LogContext & lc) -> AdditionSummary { checkPayloadWritable(); + // First get all the shards of the queue to understand which jobs to add. + std::list<RetrieveQueueShard> shards; + std::list<std::unique_ptr<RetrieveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + + for (auto & sp: m_payload.retrievequeueshards()) { + shards.emplace_back(RetrieveQueueShard(sp.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + std::list<std::list<JobDump>> shardsDumps; + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + + while (s!= shards.end()) { + try { + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + goto nextShard; + } + shardsDumps.emplace_back(std::list<JobDump>()); + for (auto & j: s->dumpJobs()) { + shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size})); + } + nextShard: + s++; + sf++; + } + + // Now filter the jobs to add AdditionSummary ret; - ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); - ValueCountMap priorityMap(m_payload.mutable_prioritymap()); - ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + std::list<JobToAdd> jobsToReallyAdd; for (auto & jta: jobsToAdd) { - // Check if the job is present and skip insertion if so - for (auto &j: m_payload.retrievejobs()) { - if (j.address() == jta.retrieveRequestAddress) - goto skipInsertion; - } - { - // Keep track of the mounting criteria - maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); - priorityMap.incCount(jta.policy.retrievePriority); - minRetrieveRequestAgeMap.incCount(jta.policy.retrieveMinRequestAge); - if (m_payload.retrievejobs_size()) { - if (m_payload.oldestjobcreationtime() > (uint64_t)jta.startTime) { - m_payload.set_oldestjobcreationtime(jta.startTime); - } - m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + jta.size); - } else { - m_payload.set_oldestjobcreationtime(jta.startTime); - m_payload.set_retrievejobstotalsize(jta.size); + for (auto & sd: shardsDumps) { + for (auto & sjd: sd) { + if (sjd.address == jta.retrieveRequestAddress) + goto found; } - auto * j = m_payload.add_retrievejobs(); - j->set_address(jta.retrieveRequestAddress); - j->set_size(jta.size); - j->set_copynb(jta.copyNb); - j->set_fseq(jta.fSeq); - j->set_priority(jta.policy.retrievePriority); - j->set_minretrieverequestage(jta.policy.retrieveMinRequestAge); - j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed); - // move the the new job in the right spot on the queue. - // i points to the newly added job all the time. 
- size_t i=m_payload.retrievejobs_size() - 1; - while (i > 0 && m_payload.retrievejobs(i).fseq() < m_payload.retrievejobs(i - 1).fseq()) { - m_payload.mutable_retrievejobs()->SwapElements(i-1, i); - i--; - } - // Keep track of this addition. - ret.files++; - ret.bytes+=jta.size; } - skipInsertion:; + jobsToReallyAdd.emplace_back(jta); + ret.bytes += jta.fileSize; + ret.files++; + found:; } - if (ret.files) commit(); + + // We can now proceed with the standard addition. + addJobsAndCommit(jobsToReallyAdd, agentReference, lc); return ret; } -cta::objectstore::RetrieveQueue::JobsSummary cta::objectstore::RetrieveQueue::getJobsSummary() { + +RetrieveQueue::JobsSummary RetrieveQueue::getJobsSummary() { checkPayloadReadable(); JobsSummary ret; ret.bytes = m_payload.retrievejobstotalsize(); - ret.files = m_payload.retrievejobs_size(); + ret.files = m_payload.retrievejobscount(); ret.oldestJobStartTime = m_payload.oldestjobcreationtime(); if (ret.files) { ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ret.maxDrivesAllowed = maxDriveAllowedMap.maxValue(); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ret.priority = priorityMap.maxValue(); - ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); - ret.minArchiveRequestAge = minArchiveRequestAgeMap.minValue(); + ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + ret.minRetrieveRequestAge = minRetrieveRequestAgeMap.minValue(); } else { ret.maxDrivesAllowed = 0; ret.priority = 0; - ret.minArchiveRequestAge = 0; + ret.minRetrieveRequestAge = 0; } return ret; } -auto cta::objectstore::RetrieveQueue::dumpAndFetchRetrieveRequests() - -> std::list<RetrieveRequestDump> { +auto RetrieveQueue::dumpJobs() -> std::list<JobDump> { checkPayloadReadable(); - std::list<RetrieveRequestDump> ret; - auto & rjl = m_payload.retrievejobs(); - for (auto rj=rjl.begin(); rj!=rjl.end(); rj++) { + // Go read the shards in parallel... + std::list<JobDump> ret; + std::list<RetrieveQueueShard> shards; + std::list<std::unique_ptr<RetrieveQueueShard::AsyncLockfreeFetcher>> shardsFetchers; + for (auto & sa: m_payload.retrievequeueshards()) { + shards.emplace_back(RetrieveQueueShard(sa.address(), m_objectStore)); + shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch()); + } + auto s = shards.begin(); + auto sf = shardsFetchers.begin(); + while (s != shards.end()) { try { - cta::objectstore::RetrieveRequest retrieveRequest(rj->address(),m_objectStore); - objectstore::ScopedSharedLock rtfrl(retrieveRequest); - retrieveRequest.fetch(); - ret.push_back(RetrieveRequestDump()); - auto & retReq = ret.back(); - retReq.retrieveRequest = retrieveRequest.getSchedulerRequest(); - retReq.criteria = retrieveRequest.getRetrieveFileQueueCriteria(); - retReq.activeCopyNb = retrieveRequest.getActiveCopyNumber(); - } catch (cta::exception::Exception &) {} + (*sf)->wait(); + } catch (Backend::NoSuchObject & ex) { + // We are possibly in read only mode, so we cannot rebuild. + // Just skip this shard. 
+ goto nextShard; + } + for (auto & j: s->dumpJobs()) { + ret.emplace_back(JobDump{j.address, j.copyNb, j.size}); + } + nextShard: + s++; sf++; } return ret; } -auto cta::objectstore::RetrieveQueue::dumpJobs() -> std::list<JobDump> { +auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) -> CandidateJobList { checkPayloadReadable(); - std::list<JobDump> ret; - auto & rjl = m_payload.retrievejobs(); - for (auto rj=rjl.begin(); rj!=rjl.end(); rj++) { - ret.push_back(JobDump()); - auto & b=ret.back(); - b.copyNb = rj->copynb(); - b.address = rj->address(); - b.size = rj->size(); - } - return ret; -} - -auto cta::objectstore::RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) -> CandidateJobList { CandidateJobList ret; - ret.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize(); - ret.remainingFilesAfterCandidates = m_payload.retrievejobs_size(); - for (auto & j: m_payload.retrievejobs()) { - if (!retrieveRequestsToSkip.count(j.address())) { - ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size()}); - ret.candidateBytes += j.size(); - ret.candidateFiles ++; + for (auto & rqsp: m_payload.retrievequeueshards()) { + // We need to go through all shard pointers unconditionally to count what is left (see else part) + if (ret.candidateBytes < maxBytes && ret.candidateFiles < maxFiles) { + // Fetch the shard + RetrieveQueueShard rqs(rqsp.address(), m_objectStore); + rqs.fetchNoLock(); + auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, retrieveRequestsToSkip); + ret.candidateBytes += shardCandidates.candidateBytes; + ret.candidateFiles += shardCandidates.candidateFiles; + // We overwrite the remaining values each time as the previous + // shards have exhausted their candidate lists. + ret.remainingBytesAfterCandidates = shardCandidates.remainingBytesAfterCandidates; + ret.remainingFilesAfterCandidates = shardCandidates.remainingFilesAfterCandidates; + ret.candidates.splice(ret.candidates.end(), shardCandidates.candidates); + } else { + // We are done with finding candidates. We just need to count what is left in the non-visited shards. + ret.remainingBytesAfterCandidates += rqsp.shardbytescount(); + ret.remainingFilesAfterCandidates += rqsp.shardjobscount(); } - ret.remainingBytesAfterCandidates -= j.size(); - ret.remainingFilesAfterCandidates--; - if (ret.candidateBytes >= maxBytes || ret.candidateFiles >= maxFiles) break; } return ret; } -void cta::objectstore::RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToRemove) { +void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemove) { checkPayloadWritable(); ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); - auto * jl = m_payload.mutable_retrievejobs(); - bool jobRemoved=false; - for (auto &rrt: requestsToRemove) { - bool found=false; - do { - found=false; - // Push the found entry all the way to the end.
- for (size_t i=0; i<(size_t)jl->size(); i++) { - if (jl->Get(i).address() == rrt) { - found = jobRemoved = true; - // Keep track of the mounting criteria - maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed()); - priorityMap.decCount(jl->Get(i).priority()); - minRetrieveRequestAgeMap.decCount(jl->Get(i).minretrieverequestage()); - m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() - jl->Get(i).size()); - while (i+1 < (size_t)jl->size()) { - jl->SwapElements(i, i+1); - i++; + // Make a working copy of the jobs to remove. We will progressively trim this local list. + auto localJobsToRemove = jobsToRemove; + // The jobs are expected to be removed from the front shards first (poped in order) + // Remove jobs until there are no more jobs or no more shards. + ssize_t shardIndex=0; + auto * mutableRetrieveQueueShards= m_payload.mutable_retrievequeueshards(); + while (localJobsToRemove.size() && shardIndex < mutableRetrieveQueueShards->size()) { + auto * shardPointer = mutableRetrieveQueueShards->Mutable(shardIndex); + // Get hold of the shard + RetrieveQueueShard rqs(shardPointer->address(), m_objectStore); + m_exclusiveLock->includeSubObject(rqs); + rqs.fetch(); + // Remove jobs from shard + auto removalResult = rqs.removeJobs(localJobsToRemove); + // If the shard is drained, remove, otherwise commit. We update the pointer afterwards. + if (removalResult.jobsAfter) { + rqs.commit(); + } else { + rqs.remove(); + } + // We still need to update the tracking queue side. + // Update stats and remove the jobs from the todo list. + for (auto & j: removalResult.removedJobs) { + maxDriveAllowedMap.decCount(j.maxDrivesAllowed); + priorityMap.decCount(j.priority); + minRetrieveRequestAgeMap.decCount(j.minRetrieveRequestAge); + } + // In all cases, we should update the global statistics. + m_payload.set_retrievejobscount(m_payload.retrievejobscount() - removalResult.jobsRemoved); + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() - removalResult.bytesRemoved); + // If the shard is still around, we shall update its pointer's stats too. + if (removalResult.jobsAfter) { + // Also update the shard pointers's stats. In case of mismatch, we will trigger a rebuild. + shardPointer->set_shardbytescount(shardPointer->shardbytescount() - removalResult.bytesRemoved); + shardPointer->set_shardjobscount(shardPointer->shardjobscount() - removalResult.jobsRemoved); + if (shardPointer->shardbytescount() != removalResult.bytesAfter + || shardPointer->shardjobscount() != removalResult.jobsAfter) { + rebuild(); + } + // We will commit when exiting anyway... + shardIndex++; + } else { + // Shard's gone, so should the pointer. Push it to the end of the queue and + // trim it. + for (auto i=shardIndex; i<mutableRetrieveQueueShards->size()-1; i++) { + mutableRetrieveQueueShards->SwapElements(i, i+1); + } + mutableRetrieveQueueShards->RemoveLast(); + } + // We should also trim the removed jobs from our list. + localJobsToRemove.remove_if( + [&removalResult](const std::string & ja){ + return std::count_if(removalResult.removedJobs.begin(), removalResult.removedJobs.end(), + [&ja](RetrieveQueueShard::JobInfo & j) { + return j.address == ja; } - break; - } + ); } - // and remove it - if (found) - jl->RemoveLast(); - } while (found); + ); // end of remove_if + // And commit the queue (once per shard should not hurt performance). 
+ commit(); } - if (jobRemoved) commit(); } -void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, +void RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) { throw cta::exception::Exception("In RetrieveQueue::garbageCollect(): not implemented"); } + +}} // namespace cta::objectstore diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 89dd673c3c219f461115b1a05f24ebdbabff77aa..8f68b7fa1db3a3e30ec440ca4349c4ccce1a6138 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -37,6 +37,15 @@ public: RetrieveQueue(GenericObject & go); void initialize(const std::string & vid); void commit(); + +private: + // Validates all summaries are in accordance with each other. + bool checkMapsAndShardsCoherency(); + + // Rebuild from shards if something goes wrong. + void rebuild(); +public: + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; bool isEmpty(); @@ -48,29 +57,29 @@ public: struct JobToAdd { uint64_t copyNb; uint64_t fSeq; - const std::string retrieveRequestAddress; - uint64_t size; - const cta::common::dataStructures::MountPolicy policy; + std::string retrieveRequestAddress; + uint64_t fileSize; + cta::common::dataStructures::MountPolicy policy; time_t startTime; }; - void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd); + void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc); /// This version will check for existence of the job in the queue before // returns the count and sizes of actually added jobs (if any). struct AdditionSummary { uint64_t files = 0; uint64_t bytes = 0; }; - AdditionSummary addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd); + AdditionSummary addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd, + AgentReference & agentReference, log::LogContext & lc); struct JobsSummary { uint64_t files; uint64_t bytes; time_t oldestJobStartTime; uint64_t priority; - uint64_t minArchiveRequestAge; + uint64_t minRetrieveRequestAge; uint64_t maxDrivesAllowed; }; JobsSummary getJobsSummary(); - std::list<RetrieveRequestDump> dumpAndFetchRetrieveRequests(); struct JobDump { std::string address; uint16_t copyNb; @@ -88,9 +97,17 @@ public: // which still should be removed from the queue. They will be disregarded from listing. CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip); - void removeJobsAndCommit(const std::list<std::string> & requestsToRemove); + void removeJobsAndCommit(const std::list<std::string> & jobsToRemove); // -- Generic parameters std::string getVid(); + + + // The shard size. From experience, 100k is where we start to see performance difference, + // but nothing prevents us from using a smaller size. + // The performance will be roughly flat until the queue size reaches the square of this limit + // (meaning the queue object updates start to take too much time). + // with this current value of 25k, the performance should be roughly flat until 25k^2=625M. 
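+ // (The square comes from the queue object itself holding one shard pointer per shard: at roughly 25k shards of 25k jobs each, updating the queue object costs about as much as updating a shard.)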
+ static const uint64_t c_maxShardSize = 25000; }; }} diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp new file mode 100644 index 0000000000000000000000000000000000000000..771fff1cc245140dd7f3b53fedeb7c4cd36e9aa4 --- /dev/null +++ b/objectstore/RetrieveQueueShard.cpp @@ -0,0 +1,191 @@ + +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RetrieveQueueShard.hpp" +#include "GenericObject.hpp" +#include <google/protobuf/util/json_util.h> + + + +namespace cta { namespace objectstore { + +RetrieveQueueShard::RetrieveQueueShard(Backend& os): + ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(os) { } + +RetrieveQueueShard::RetrieveQueueShard(const std::string& address, Backend& os): + ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(os, address) { } + +RetrieveQueueShard::RetrieveQueueShard(GenericObject& go): + ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>(go.objectStore()) { + // Here we transplant the generic object into the new object + go.transplantHeader(*this); + // And interpret the header. 
+ getPayloadFromHeader(); +} + +void RetrieveQueueShard::rebuild() { + checkPayloadWritable(); + uint64_t totalSize=0; + for (auto j: m_payload.retrievejobs()) { + totalSize += j.size(); + } + m_payload.set_retrievejobstotalsize(totalSize); +} + +std::string RetrieveQueueShard::dump() { + checkPayloadReadable(); + google::protobuf::util::JsonPrintOptions options; + options.add_whitespace = true; + options.always_print_primitive_fields = true; + std::string headerDump; + google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options); + return headerDump; +} + +void RetrieveQueueShard::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) { + throw exception::Exception("In RetrieveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object."); +} + +RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) { + checkPayloadReadable(); + RetrieveQueue::CandidateJobList ret; + ret.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize(); + ret.remainingFilesAfterCandidates = m_payload.retrievejobs_size(); + for (auto & j: m_payload.retrievejobs()) { + if (!retrieveRequestsToSkip.count(j.address())) { + ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size()}); + ret.candidateBytes += j.size(); + ret.candidateFiles ++; + } + ret.remainingBytesAfterCandidates -= j.size(); + ret.remainingFilesAfterCandidates--; + if (ret.candidateBytes >= maxBytes || ret.candidateFiles >= maxFiles) break; + } + return ret; +} + +auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) -> RemovalResult { + checkPayloadWritable(); + RemovalResult ret; + uint64_t totalSize = m_payload.retrievejobstotalsize(); + auto * jl=m_payload.mutable_retrievejobs(); + for (auto &rrt: jobsToRemove) { + bool found = false; + do { + found = false; + // Push the found entry all the way to the end. 
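+ // (Protobuf repeated fields only offer cheap removal of the last element, hence the swap-to-the-back followed by RemoveLast() below.)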
+ for (size_t i=0; i<(size_t)jl->size(); i++) { + if (jl->Get(i).address() == rrt) { + found = true; + const auto & j = jl->Get(i); + ret.removedJobs.emplace_back(JobInfo()); + ret.removedJobs.back().address = j.address(); + ret.removedJobs.back().copyNb = j.copynb(); + ret.removedJobs.back().maxDrivesAllowed = j.maxdrivesallowed(); + ret.removedJobs.back().minRetrieveRequestAge = j.minretrieverequestage(); + ret.removedJobs.back().priority = j.priority(); + ret.removedJobs.back().size = j.size(); + ret.removedJobs.back().startTime = j.starttime(); + ret.bytesRemoved += j.size(); + totalSize -= j.size(); + ret.jobsRemoved++; + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() - j.size()); + while (i+1 < (size_t)jl->size()) { + jl->SwapElements(i, i+1); + i++; + } + break; + } + } + // and remove it + if (found) + jl->RemoveLast(); + } while (found); + } + ret.bytesAfter = totalSize; + ret.jobsAfter = m_payload.retrievejobs_size(); + return ret; +} + +void RetrieveQueueShard::initialize(const std::string& owner) { + ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t>::initialize(); + setOwner(owner); + setBackupOwner(owner); + m_payload.set_retrievejobstotalsize(0); + m_payloadInterpreted=true; +} + +auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> { + checkPayloadReadable(); + std::list<JobInfo> ret; + for (auto &j: m_payload.retrievejobs()) { + ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(), + j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq()}); + } + return ret; +} + +std::list<RetrieveQueue::JobToAdd> RetrieveQueueShard::dumpJobsToAdd() { + checkPayloadReadable(); + std::list<RetrieveQueue::JobToAdd> ret; + for (auto &j: m_payload.retrievejobs()) { + ret.emplace_back(RetrieveQueue::JobToAdd()); + ret.back().copyNb = j.copynb(); + ret.back().fSeq = j.fseq(); + ret.back().fileSize = j.size(); + ret.back().policy.retrieveMinRequestAge = j.minretrieverequestage(); + ret.back().policy.maxDrivesAllowed = j.maxdrivesallowed(); + ret.back().policy.retrievePriority = j.priority(); + ret.back().startTime = j.starttime(); + ret.back().retrieveRequestAddress = j.address(); + } + return ret; +} + + +auto RetrieveQueueShard::getJobsSummary() -> JobsSummary { + checkPayloadReadable(); + JobsSummary ret; + ret.bytes = m_payload.retrievejobstotalsize(); + ret.jobs = m_payload.retrievejobs_size(); + ret.minFseq = m_payload.retrievejobs(0).fseq(); + ret.maxFseq = m_payload.retrievejobs(m_payload.retrievejobs_size()-1).fseq(); + return ret; +} + +uint64_t RetrieveQueueShard::addJob(RetrieveQueue::JobToAdd& jobToAdd) { + checkPayloadWritable(); + auto * j = m_payload.add_retrievejobs(); + j->set_address(jobToAdd.retrieveRequestAddress); + j->set_size(jobToAdd.fileSize); + j->set_copynb(jobToAdd.copyNb); + j->set_fseq(jobToAdd.fSeq); + j->set_starttime(jobToAdd.startTime); + j->set_maxdrivesallowed(jobToAdd.policy.maxDrivesAllowed); + j->set_priority(jobToAdd.policy.retrievePriority); + j->set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge); + j->set_starttime(jobToAdd.startTime); + m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize()+jobToAdd.fileSize); + return m_payload.retrievejobs_size(); +} + + + + +}} \ No newline at end of file diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e3c706510aaa279b7f01fc18c4514f0af7e714bd --- /dev/null +++ 
b/objectstore/RetrieveQueueShard.hpp @@ -0,0 +1,96 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "RetrieveQueue.hpp" + +namespace cta { namespace objectstore { + +class RetrieveQueueShard: public ObjectOps<serializers::RetrieveQueueShard, serializers::RetrieveQueueShard_t> { +public: + // Constructor with undefined address + RetrieveQueueShard(Backend & os); + + // Constructor + RetrieveQueueShard(const std::string & address, Backend & os); + + // Upgrader from generic object + RetrieveQueueShard(GenericObject & go); + + // Forbid/hide base initializer + void initialize() = delete; + + // Initializer + void initialize(const std::string & owner); + + // dumper + std::string dump(); + + void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) override; + + struct JobInfo { + uint64_t size; + std::string address; + uint16_t copyNb; + uint64_t priority; + uint64_t minRetrieveRequestAge; + uint64_t maxDrivesAllowed; + time_t startTime; + uint64_t fSeq; + }; + std::list<JobInfo> dumpJobs(); + + /** Variant function allowing shard to shard job transfer (not needed for archives, + * which do not split). */ + std::list<RetrieveQueue::JobToAdd> dumpJobsToAdd(); + + struct JobsSummary { + uint64_t jobs; + uint64_t bytes; + uint64_t minFseq; + uint64_t maxFseq; + }; + JobsSummary getJobsSummary(); + + /** + * adds job, returns new size + */ + uint64_t addJob(RetrieveQueue::JobToAdd & jobToAdd); + + + struct RemovalResult { + uint64_t jobsRemoved = 0; + uint64_t jobsAfter = 0; + uint64_t bytesRemoved = 0; + uint64_t bytesAfter = 0; + std::list<JobInfo> removedJobs; + }; + /** + * Removes jobs from shard (and from the to remove list). Returns list of removed jobs. + */ + RemovalResult removeJobs(const std::list<std::string> & jobsToRemove); + + RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip); + + /** Recompute summaries in case they do not match the array content.
+   */
+  void rebuild();
+
+};
+
+}} // namespace cta::objectstore
\ No newline at end of file
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index 6581a81d6af5bfe71fab22621011fc12a0d431a8..5cd915f6b2f43f0efecfbd668b6b5b756e1caa8c 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -142,7 +142,7 @@ jobFound:;
   std::list<RetrieveQueue::JobToAdd> jta;
   jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(),
     mp, (signed)m_payload.schedulerrequest().entrylog().time()});
-  rq.addJobsIfNecessaryAndCommit(jta);
+  rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc);
   auto jobsSummary=rq.getJobsSummary();
   auto queueUpdateTime = t.secs(utils::Timer::resetCounter);
   // We can now make the transition official
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index cb1029f42f4bdfce5146a5bf41be2b02ba04bbc1..0abdd7ebede63fc09dd7d42bdc288eb59289d4bd 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -391,7 +391,7 @@ message ArchiveQueueShard {
 
 message ArchiveQueue {
   required string tapepool = 10000;
-  repeated ArchiveQueueShardPointer archivequeuesshards = 10010;
+  repeated ArchiveQueueShardPointer archivequeueshards = 10010;
   repeated ValueCountPair prioritymap = 10031;
   repeated ValueCountPair minarchiverequestagemap = 10032;
   repeated ValueCountPair maxdrivesallowedmap = 10033;
@@ -409,15 +409,30 @@ message RetrieveJobPointer {
   required uint64 priority = 3104;
   required uint64 minretrieverequestage = 3105;
   required uint64 maxdrivesallowed = 3106;
+  required uint64 starttime = 3108;
+}
+
+message RetrieveQueueShardPointer {
+  required string address = 10400;
+  required uint64 shardjobscount = 10401;
+  required uint64 shardbytescount = 10402;
+  required uint64 minfseq = 10403;
+  required uint64 maxfseq = 10404;
+}
+
+message RetrieveQueueShard {
+  repeated RetrieveJobPointer retrievejobs = 10500;
+  required uint64 retrievejobstotalsize = 10501;
 }
 
 message RetrieveQueue {
   required string vid = 10100;
-  repeated RetrieveJobPointer retrievejobs = 10110;
+  repeated RetrieveQueueShardPointer retrievequeueshards = 10111;
   repeated ValueCountPair prioritymap = 10131;
   repeated ValueCountPair minretrieverequestagemap = 10132;
   repeated ValueCountPair maxdrivesallowedmap = 10133;
   required uint64 retrievejobstotalsize = 10140;
+  required uint64 retrievejobscount = 10145;
   required uint64 oldestjobcreationtime = 10150;
   required uint64 mapsrebuildcount = 10160;
 }
diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp
index d60f70f3f20708acd22f6f57ed2bae9d85135cfd..65eca28cb83958a6c72f5406760116a777e44f73 100644
--- a/scheduler/OStoreDB/MemQueues.cpp
+++ b/scheduler/OStoreDB/MemQueues.cpp
@@ -63,7 +63,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special
     }
     jobAdded:;
   }
-  queue.addJobsAndCommit(jtal);
+  queue.addJobsAndCommit(jtal, agentReference, logContext);
 }
 
 template<>
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 120f8e0e9c2831ff566bf197672f59a8ec76e9fe..ae370346a3ab122ddb9133e9eb09b64a16ed9d0c 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -118,7 +118,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
       m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime;
       m.priority = aqueue.getJobsSummary().priority;
       m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed;
-      m.minArchiveRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
+      m.minRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
       m.logicalLibrary = "";
     } else {
       tmdi.queueTrimRequired = true;
@@ -164,7 +164,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
       m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime;
       m.priority = rqueue.getJobsSummary().priority;
       m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed;
-      m.minArchiveRequestAge = rqueue.getJobsSummary().minArchiveRequestAge;
+      m.minRequestAge = rqueue.getJobsSummary().minRetrieveRequestAge;
       m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
     } else {
       tmdi.queueTrimRequired = true;
@@ -2555,7 +2555,7 @@ void OStoreDB::RetrieveJob::fail(log::LogContext &logContext) {
     auto sr = m_retrieveRequest.getSchedulerRequest();
     std::list<objectstore::RetrieveQueue::JobToAdd> jta;
     jta.push_back({bestCopyNb, tf.fSeq, m_retrieveRequest.getAddressIfSet(), af.fileSize, rfqc.mountPolicy, sr.creationLog.time});
-    rq.addJobsIfNecessaryAndCommit(jta);
+    rq.addJobsIfNecessaryAndCommit(jta, *m_oStoreDB.m_agentReference, logContext);
     m_retrieveRequest.setOwner(rq.getAddressIfSet());
     m_retrieveRequest.commit();
     // We do not own the request anymore
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index e954a66d82bf74c0e90af15de5286271341f85cf..93df03ffa7fa9a0d9665ae06c2bf553f6208138d 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -541,7 +541,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
       mountPassesACriteria = true;
     if (m->filesQueued / (1 + effectiveExistingMounts) >= m_minFilesToWarrantAMount)
       mountPassesACriteria = true;
-    if (!effectiveExistingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minArchiveRequestAge))
+    if (!effectiveExistingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge))
       mountPassesACriteria = true;
     if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) {
       log::ScopedParamContainer params(lc);
@@ -556,7 +556,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
             .add("filesQueued", m->filesQueued)
             .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
             .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
-            .add("minArchiveRequestAge", m->minArchiveRequestAge)
+            .add("minArchiveRequestAge", m->minRequestAge)
             .add("existingMounts", existingMounts)
             .add("maxDrivesAllowed", m->maxDrivesAllowed);
       lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Removing potential mount not passing criteria");
@@ -576,7 +576,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
             .add("filesQueued", m->filesQueued)
             .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
             .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
-            .add("minArchiveRequestAge", m->minArchiveRequestAge)
+            .add("minArchiveRequestAge", m->minRequestAge)
             .add("existingMounts", existingMounts)
             .add("maxDrivesAllowed", m->maxDrivesAllowed)
             .add("ratioOfMountQuotaUsed", m->ratioOfMountQuotaUsed);
@@ -666,7 +666,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const
             .add("filesQueued", m->filesQueued)
             .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
             .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
-            .add("minArchiveRequestAge", m->minArchiveRequestAge)
+            .add("minArchiveRequestAge", m->minRequestAge)
             .add("getMountInfoTime", getMountInfoTime)
.add("getTapeInfoTime", getTapeInfoTime) .add("candidateSortingTime", candidateSortingTime) @@ -700,7 +700,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("getTapeInfoTime", getTapeInfoTime) .add("candidateSortingTime", candidateSortingTime) @@ -813,7 +813,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("queueTrimingTime", queueTrimingTime) .add("getTapeInfoTime", getTapeInfoTime) @@ -871,7 +871,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) .add("oldestJobAge", time(NULL) - m->oldestJobStartTime) - .add("minArchiveRequestAge", m->minArchiveRequestAge) + .add("minArchiveRequestAge", m->minRequestAge) .add("getMountInfoTime", getMountInfoTime) .add("queueTrimingTime", queueTrimingTime) .add("getTapeInfoTime", getTapeInfoTime) @@ -928,7 +928,7 @@ std::list<common::dataStructures::QueueAndMountSummary> Scheduler::getQueuesAndM switch (pm.type) { case common::dataStructures::MountType::Archive: summary.mountPolicy.archivePriority = pm.priority; - summary.mountPolicy.archiveMinRequestAge = pm.minArchiveRequestAge; + summary.mountPolicy.archiveMinRequestAge = pm.minRequestAge; summary.mountPolicy.maxDrivesAllowed = pm.maxDrivesAllowed; summary.bytesQueued = pm.bytesQueued; summary.filesQueued = pm.filesQueued; @@ -936,7 +936,7 @@ std::list<common::dataStructures::QueueAndMountSummary> Scheduler::getQueuesAndM break; case common::dataStructures::MountType::Retrieve: // TODO: we should remove the retrieveMinRequestAge if it's redundant, or rename pm.minArchiveRequestAge. - summary.mountPolicy.retrieveMinRequestAge = pm.minArchiveRequestAge; + summary.mountPolicy.retrieveMinRequestAge = pm.minRequestAge; summary.mountPolicy.retrievePriority = pm.priority; summary.mountPolicy.maxDrivesAllowed = pm.maxDrivesAllowed; summary.bytesQueued = pm.bytesQueued; diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 6fe191d78599606b58297bf865e6b492f5df5769..7283b79e4848526cc4d0879e96d40acf8770c5f0 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -352,7 +352,7 @@ public: uint64_t maxDrivesAllowed; /**< The maximum number of drives allowed for this * tape pool, defined as the highest value amongst * jobs */ - time_t minArchiveRequestAge; /**< The maximum amount of time to wait before + time_t minRequestAge; /**< The maximum amount of time to wait before * forcing a mount in the absence of enough data. * Defined as the smallest value amongst jobs.*/ uint64_t filesQueued; /**< The number of files queued for this queue */