diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp
index d980678369bac1987caa442665750ca6d65a910f..008313b5af1c6d196dd040b59c631ea112e9ae4c 100644
--- a/objectstore/GenericObject.cpp
+++ b/objectstore/GenericObject.cpp
@@ -27,6 +27,7 @@
 #include "ArchiveQueue.hpp"
 #include "ArchiveQueueShard.hpp"
 #include "RetrieveQueue.hpp"
+#include "RetrieveQueueShard.hpp"
 #include "DriveRegister.hpp"
 #include <stdexcept>
 #include <google/protobuf/util/json_util.h>
@@ -189,6 +190,9 @@ std::string GenericObject::dump() {
   case serializers::RetrieveQueue_t:
     bodyDump = dumpWithType<cta::objectstore::RetrieveQueue>(this);
     break;
+  case serializers::RetrieveQueueShard_t:
+    bodyDump = dumpWithType<cta::objectstore::RetrieveQueueShard>(this);
+    break;
   case serializers::ArchiveRequest_t:
     bodyDump = dumpWithType<ArchiveRequest>(this);
     break;
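
For orientation, the GenericObject change is the usual registration step: dump() switches on the type tag stored in the object header and instantiates the matching concrete class to print the payload, so a new on-disk type such as RetrieveQueueShard needs both the include and a switch case before it can be dumped. A reduced model of the pattern (toy names, not the actual CTA API):

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Toy stand-ins for the serializers type enum and the templated dumper.
    enum class ObjectType { RetrieveQueue_t, RetrieveQueueShard_t };

    struct RetrieveQueue      { static std::string name() { return "RetrieveQueue"; } };
    struct RetrieveQueueShard { static std::string name() { return "RetrieveQueueShard"; } };

    template <typename T>
    std::string dumpWithType() { return T::name(); }  // the real one deserializes and JSON-prints

    std::string dump(ObjectType t) {
      switch (t) {
      case ObjectType::RetrieveQueue_t:      return dumpWithType<RetrieveQueue>();
      case ObjectType::RetrieveQueueShard_t: return dumpWithType<RetrieveQueueShard>();
      }
      // A type missing from the switch is exactly the failure this patch avoids.
      throw std::runtime_error("unknown object type tag");
    }

    int main() { std::cout << dump(ObjectType::RetrieveQueueShard_t) << "\n"; }
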
diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp
index 5626d6137913541df4d338f6556c2a541fdbb154..5f19a47d0f96e087785c7bcc58072e82cf444f51 100644
--- a/objectstore/RetrieveQueue.cpp
+++ b/objectstore/RetrieveQueue.cpp
@@ -48,6 +48,7 @@ void RetrieveQueue::initialize(const std::string &vid) {
   m_payload.set_retrievejobscount(0);
   m_payload.set_vid(vid);
   m_payload.set_mapsrebuildcount(0);
+  m_payload.set_maxshardsize(m_maxShardSize);
   m_payloadInterpreted = true;
 }
 
@@ -186,6 +187,11 @@ void RetrieveQueue::commit() {
   ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit();
 }
 
+void RetrieveQueue::getPayloadFromHeader() {
+  ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::getPayloadFromHeader();
+  m_maxShardSize = m_payload.maxshardsize();
+}
+
 bool RetrieveQueue::isEmpty() {
   checkPayloadReadable();
   return !m_payload.retrievejobstotalsize() && !m_payload.retrievequeueshards_size();
@@ -217,34 +223,17 @@ std::string RetrieveQueue::dump() {
   return headerDump;
 }
 
-namespace {
-struct ShardForAddition {
-  bool newShard=false;
-  bool creationDone=false;
-  bool splitDone=false;
-  bool toSplit=false;
-  ShardForAddition * splitDestination = nullptr;
-  bool fromSplit=false;
-  ShardForAddition * splitSource = nullptr;
-  std::string address;
-  uint64_t minFseq;
-  uint64_t maxFseq;
-  uint64_t jobsCount;
-  std::list<RetrieveQueue::JobToAdd> jobsToAdd;
-  size_t shardIndex = std::numeric_limits<size_t>::max();
-};
-
-void updateShardLimits(uint64_t fSeq, ShardForAddition & sfa) {
+void RetrieveQueue::updateShardLimits(uint64_t fSeq, ShardForAddition & sfa) {
   if (fSeq < sfa.minFseq) sfa.minFseq=fSeq;
   if (fSeq > sfa.maxFseq) sfa.maxFseq=fSeq;
 }
 
-/** Add a jobs to a shard, spliting it if necessary*/
-void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
+/** Add a job to a shard, splitting it if necessary */
+void RetrieveQueue::addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
     std::list<ShardForAddition>::iterator & shardForAddition, std::list<ShardForAddition> & shardList) {
   // Is the shard still small enough? We will not double split shards (we suppose insertion size << shard size cap).
-  // We will also no split a new shard.
-  if ( shardForAddition->jobsCount < RetrieveQueue::c_maxShardSize
+  // We will also not split a new shard.
+  if ( shardForAddition->jobsCount < m_maxShardSize
       || shardForAddition->fromSplit || shardForAddition->newShard) {
     // We just piggy back here. No need to increase range, we are within it.
     shardForAddition->jobsCount++;
@@ -256,21 +245,22 @@ void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
     // Create the new shard
     auto newSfa = shardList.insert(shardForAddition, ShardForAddition());
     // The new shard size can only be estimated, but we will update it to the actual value as we update the shard.
+    // The new shard is inserted before the old one, so the old one will keep the high
+    // half and the new shard gets the bottom half.
     uint64_t shardRange = shardForAddition->maxFseq - shardForAddition->minFseq;
-    uint64_t oldMax = shardForAddition->maxFseq;
-    shardForAddition->maxFseq = shardForAddition->minFseq + shardRange/2;
-    shardForAddition->jobsCount = shardForAddition->jobsCount/2;
-    shardForAddition->toSplit = true;
-    shardForAddition->splitDestination = &*newSfa;
-    newSfa->minFseq = shardForAddition->maxFseq+1;
-    newSfa->maxFseq = oldMax;
-    newSfa->jobsCount = shardForAddition->jobsCount;
+    newSfa->minFseq = shardForAddition->minFseq;
+    newSfa->maxFseq = shardForAddition->minFseq + shardRange/2;
+    newSfa->jobsCount = shardForAddition->jobsCount/2;
     newSfa->splitSource = &*shardForAddition;
     newSfa->fromSplit = true;
     newSfa->newShard = true;
+    shardForAddition->minFseq = shardForAddition->minFseq + shardRange/2 + 1;
+    shardForAddition->jobsCount = shardForAddition->jobsCount/2;
+    shardForAddition->toSplit = true;
+    shardForAddition->splitDestination = &*newSfa;
     // Transfer jobs to add to new shard if needed
     for (auto jta2=shardForAddition->jobsToAdd.begin(); jta2!=shardForAddition->jobsToAdd.end();) {
-      if (jta2->fSeq >= newSfa->minFseq) {
+      if (jta2->fSeq <= newSfa->maxFseq) {
        newSfa->jobsToAdd.emplace_back(*jta2);
        jta2 = shardForAddition->jobsToAdd.erase(jta2);
      } else {
@@ -278,22 +268,21 @@ void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
       }
     }
     // We can finally add our job to one of the two shards from the split
-    if (jobToAdd.fSeq <= shardForAddition->maxFseq) {
+    if (jobToAdd.fSeq >= shardForAddition->minFseq) {
       shardForAddition->jobsToAdd.emplace_back(jobToAdd);
       shardForAddition->jobsCount++;
       updateShardLimits(jobToAdd.fSeq, *shardForAddition);
     } else {
       newSfa->jobsToAdd.emplace_back(jobToAdd);
       newSfa->jobsCount++;
-      updateShardLimits(jobToAdd.fSeq, *shardForAddition);
+      updateShardLimits(jobToAdd.fSeq, *newSfa);
     }
   }
 }
-} // anonymous namespace
-
 void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference,
     log::LogContext & lc) {
   checkPayloadWritable();
+  if (jobsToAdd.empty()) return;
   // Keep track of the mounting criteria
   ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
   ValueCountMap priorityMap(m_payload.mutable_prioritymap());
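
A worked example helps check the rewritten split arithmetic: with integer division, a full shard covering fSeq 0..99 with 20 jobs now yields a new shard (inserted before the old one) holding 0..49 and half the jobs, while the old shard keeps 50..99. Accordingly, the job-transfer test and the final placement test flipped direction, and the else branch now updates the new shard's limits, where the old code updated the wrong shard. A minimal, self-contained restatement with simplified types:

    #include <cassert>
    #include <cstdint>

    // Simplified restatement of the split arithmetic in
    // RetrieveQueue::addJobToShardAndMaybeSplit() (fields trimmed to what the
    // arithmetic touches). The new shard takes the bottom half of the fSeq
    // range; the old shard keeps the top half.
    struct MiniShard { std::uint64_t minFseq, maxFseq, jobsCount; };

    void split(MiniShard& oldShard, MiniShard& newShard) {
      const std::uint64_t range = oldShard.maxFseq - oldShard.minFseq;
      newShard.minFseq   = oldShard.minFseq;
      newShard.maxFseq   = oldShard.minFseq + range / 2;
      newShard.jobsCount = oldShard.jobsCount / 2;
      oldShard.minFseq   = oldShard.minFseq + range / 2 + 1;
      oldShard.jobsCount = oldShard.jobsCount / 2;
    }

    int main() {
      // A full shard covering fSeq 0..99 with 20 jobs (cap reached)...
      MiniShard oldShard{0, 99, 20}, newShard{};
      split(oldShard, newShard);
      // ...splits into a new bottom shard 0..49, the old shard keeping 50..99.
      assert(newShard.minFseq == 0 && newShard.maxFseq == 49 && newShard.jobsCount == 10);
      assert(oldShard.minFseq == 50 && oldShard.maxFseq == 99 && oldShard.jobsCount == 10);
      return 0;
    }
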
@@ -349,8 +338,12 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
     }
     // Find where the job lands in the shards
     for (auto sfa=shardsForAddition.begin(); sfa != shardsForAddition.end(); sfa++) {
-      // Is it within this shard?
-      if (jta.fSeq >= sfa->minFseq && jta.fSeq <= sfa->maxFseq) {
+      if (sfa->minFseq > jta.fSeq) {
+        // Are we before the current shard? (for example, before the first shard)
+        addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
+        goto jobInserted;
+      } else if (jta.fSeq >= sfa->minFseq && jta.fSeq <= sfa->maxFseq) {
+        // Is it within this shard?
         addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
         goto jobInserted;
       } else if (sfa != shardsForAddition.end() && std::next(sfa) != shardsForAddition.end()) {
@@ -364,10 +357,6 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
         }
         goto jobInserted;
       }
-      } else if (sfa->minFseq > jta.fSeq) {
-        // Are we before the current shard? (for example, before first shard)
-        addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
-        goto jobInserted;
       } else if (std::next(sfa) == shardsForAddition.end() && sfa->maxFseq < jta.fSeq) {
         // Are we after the last shard?
         addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
@@ -376,7 +365,6 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
     }
-    // Still not inserted? Now we run out of options. Segfault to ease debugging.
+    // Still not inserted? We have run out of options: fail loudly.
     {
-      *((int *)nullptr) = 0; // TODO: remove in the long run.
       throw cta::exception::Exception("In RetrieveQueue::addJobsAndCommit(): could not find an appropriate shard for job");
     }
     jobInserted:;
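
The branch reordering above is load-bearing: the scan walks the shards in ascending fSeq order and takes the first branch that fires, so the "are we before this shard" test has to run first, or a job below the first shard's range would fall through to the wrong branch. A reduced model of the placement scan (simplified types; the gap rule shown, picking the closest edge, is a plausible reading of the elided branch, not a verbatim copy):

    #include <cstdint>
    #include <iterator>
    #include <list>

    // Shards are kept in ascending, non-overlapping fSeq order.
    struct Range { std::uint64_t min, max; };

    std::list<Range>::iterator pickShard(std::list<Range>& shards, std::uint64_t fSeq) {
      for (auto it = shards.begin(); it != shards.end(); ++it) {
        if (fSeq < it->min) return it;                      // before this shard (e.g. before the first)
        if (fSeq >= it->min && fSeq <= it->max) return it;  // inside this shard's range
        auto next = std::next(it);
        if (next != shards.end() && fSeq < next->min) {
          // In the gap between two shards: pick the closer edge.
          return (fSeq - it->max <= next->min - fSeq) ? it : next;
        }
        if (next == shards.end()) return it;                // after the last shard
      }
      return shards.end();                                  // empty list: caller creates a shard
    }
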
@@ -394,8 +382,8 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
   // TODO: shard creation and update could be parallelized (to some extent as we
   // have shard to shard dependencies with the splits), but as a first implementation
   // we just go iteratively.
-  uint64_t addedJobs = 0, addedBytes = 0;
   for (auto & shard: shardsForAddition) {
+    uint64_t addedJobs = 0, addedBytes = 0, transferredInSplitJobs = 0, transferredInSplitBytes = 0;
     // Variables which will allow the shard/pointer updates in all cases.
     cta::objectstore::serializers::RetrieveQueueShardPointer * shardPointer = nullptr, * splitFromShardPointer = nullptr;
     RetrieveQueueShard rqs(m_objectStore), rqsSplitFrom(m_objectStore);
@@ -406,13 +394,15 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
       rqs.setAddress(agentReference.nextId(shardName.str()));
       rqs.initialize(getAddressIfSet());
       // We also need to create the pointer, and insert it to the right spot.
-      shardPointer = m_payload.add_retrievequeueshards();
+      shardPointer = m_payload.mutable_retrievequeueshards()->Add();
       // Pre-update the shard pointer.
       shardPointer->set_address(rqs.getAddressIfSet());
-      shardPointer->set_minfseq(0);
+      shardPointer->set_maxfseq(0); shardPointer->set_minfseq(0);
       shardPointer->set_shardbytescount(0);
       shardPointer->set_shardjobscount(0);
+      shard.creationDone = true;
+      shard.address = rqs.getAddressIfSet();
       // Move the shard pointer to its intended location.
       size_t currentShardPosition=m_payload.retrievequeueshards_size() - 1;
       while (currentShardPosition != shard.shardIndex) {
@@ -432,16 +422,27 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
         for (auto &j: jobsFromSource) {
           if (j.fSeq >= shard.minFseq && j.fSeq <= shard.maxFseq) {
             rqs.addJob(j);
+            addedJobs++;
+            addedBytes+=j.fileSize;
             jobsToTransferAddresses.emplace_back(j.retrieveRequestAddress);
           }
         }
-        rqsSplitFrom.removeJobs(jobsToTransferAddresses);
-        auto splitFromShardSummary = rqsSplitFrom.getJobsSummary();
-        splitFromShardPointer->set_maxfseq(splitFromShardSummary.maxFseq);
-        splitFromShardPointer->set_minfseq(splitFromShardSummary.minFseq);
-        splitFromShardPointer->set_shardbytescount(splitFromShardSummary.bytes);
-        splitFromShardPointer->set_shardjobscount(splitFromShardSummary.jobs);
+        auto removalResult = rqsSplitFrom.removeJobs(jobsToTransferAddresses);
+        transferredInSplitBytes += removalResult.bytesRemoved;
+        transferredInSplitJobs += removalResult.jobsRemoved;
+        // We update the shard pointers with fseqs to allow validations, but the actual
+        // values will be updated as the shard itself is populated.
+        shardPointer->set_maxfseq(shard.maxFseq);
+        shardPointer->set_minfseq(shard.minFseq);
+        shardPointer->set_shardjobscount(shard.jobsCount);
+        shardPointer->set_shardbytescount(1);
+        splitFromShardPointer->set_minfseq(shard.splitSource->minFseq);
+        splitFromShardPointer->set_maxfseq(shard.splitSource->maxFseq);
+        splitFromShardPointer->set_shardjobscount(shard.splitSource->jobsCount);
+        splitFromShardPointer->set_shardbytescount(1);
         // We are all set (in memory) for the shard from which we split.
+        shard.splitDone = true;
+        shard.splitSource->splitDone = true;
       }
       // We can now fill up the shard (outside of this if/else).
     } else {
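
The counters introduced above keep the queue totals honest during a split: a job moved out of the split-source shard is counted once as added (when written into the new shard) and once as removed (in the result of removeJobs() on the source), so adding the former and subtracting the latter leaves the totals unchanged by the move itself; only genuinely new jobs shift them. A toy restatement of that invariant:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct MiniJob { std::uint64_t fSeq, bytes; };

    int main() {
      // Four jobs sit in the split-source shard; the new shard covers fSeq 0..1.
      std::vector<MiniJob> source{{0, 100}, {1, 100}, {2, 100}, {3, 100}};
      const std::uint64_t newShardMin = 0, newShardMax = 1;

      std::uint64_t addedJobs = 0, addedBytes = 0;
      std::uint64_t transferredJobs = 0, transferredBytes = 0;
      for (const auto& j : source) {
        if (j.fSeq >= newShardMin && j.fSeq <= newShardMax) {
          addedJobs++; addedBytes += j.bytes;              // written into the new shard
          transferredJobs++; transferredBytes += j.bytes;  // reported removed by the source
        }
      }
      // Net effect on the queue totals is zero: nothing new entered the queue.
      assert(addedJobs - transferredJobs == 0 && addedBytes - transferredBytes == 0);
      return 0;
    }
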
@@ -468,11 +469,24 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
     // ... and finally commit the queue (first! there is potentially a new shard to
     // pre-reference before inserting) and shards as is appropriate.
     // Update global summaries
-    m_payload.set_retrievejobscount(m_payload.retrievejobscount() + addedJobs);
-    m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + addedBytes);
-    commit();
-    if (shard.newShard)
+    m_payload.set_retrievejobscount(m_payload.retrievejobscount() + addedJobs - transferredInSplitJobs);
+    m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + addedBytes - transferredInSplitBytes);
+    // If we are creating a new shard, we have to do a blind commit: the
+    // stats for the shard we are splitting from could not be accounted for properly
+    // and the new shard is not inserted yet.
+    if (shard.fromSplit)
+      ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit();
+    else {
+      // In the other cases, we should have a coherent state.
+      commit();
+    }
+    shard.committed = true;
+
+    if (shard.newShard) {
       rqs.insert();
+      if (shard.fromSplit)
+        rqsSplitFrom.commit();
+    } else
       rqs.commit();
   }
 }
@@ -680,4 +694,15 @@ void RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentRefere
   throw cta::exception::Exception("In RetrieveQueue::garbageCollect(): not implemented");
 }
 
+void RetrieveQueue::setShardSize(uint64_t shardSize) {
+  checkPayloadWritable();
+  m_payload.set_maxshardsize(shardSize);
+}
+
+uint64_t RetrieveQueue::getShardCount() {
+  checkPayloadReadable();
+  return m_payload.retrievequeueshards_size();
+}
+
+
 }} // namespace cta::objectstore
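
Stepping back from this file: the getPayloadFromHeader() override added near the top is what keeps m_maxShardSize in sync, since every fetch funnels through that hook before the payload is used. A toy model of the pattern (simplified names, not the real ObjectOps interface):

    #include <cstdint>
    #include <iostream>

    // Toy model of the ObjectOps/RetrieveQueue relationship: fetch()
    // deserializes the payload through a virtual hook, and the subclass
    // override is the single place where cached derived state is refreshed.
    struct ToyObjectOps {
      virtual ~ToyObjectOps() = default;
      virtual void getPayloadFromHeader() { /* base class parses the payload here */ }
      void fetch() { getPayloadFromHeader(); }  // every read path goes through the hook
    };

    struct ToyRetrieveQueue : ToyObjectOps {
      std::uint64_t m_maxShardSize = 25000;     // default until a payload is read
      void getPayloadFromHeader() override {
        ToyObjectOps::getPayloadFromHeader();   // let the base class do the parsing
        m_maxShardSize = 20;                    // then cache payload.maxshardsize()
      }
    };

    int main() {
      ToyRetrieveQueue rq;
      rq.fetch();                               // override runs, cache is refreshed
      std::cout << rq.m_maxShardSize << "\n";   // prints 20
    }
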
diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp
index 8f68b7fa1db3a3e30ec440ca4349c4ccce1a6138..98cd110f15a0b68a1c538bf9c048a6d52206cf45 100644
--- a/objectstore/RetrieveQueue.hpp
+++ b/objectstore/RetrieveQueue.hpp
@@ -37,6 +37,7 @@ public:
   RetrieveQueue(GenericObject & go);
   void initialize(const std::string & vid);
   void commit();
+  void getPayloadFromHeader() override;
 
 private:
   // Validates all summaries are in accordance with each other.
@@ -101,13 +102,44 @@ public:
   // -- Generic parameters
   std::string getVid();
 
+private:
+  struct ShardForAddition {
+    bool newShard=false;
+    bool creationDone=false;
+    bool splitDone=false;
+    bool toSplit=false;
+    bool committed=false;
+    ShardForAddition * splitDestination = nullptr;
+    bool fromSplit=false;
+    ShardForAddition * splitSource = nullptr;
+    std::string address;
+    uint64_t minFseq;
+    uint64_t maxFseq;
+    uint64_t jobsCount;
+    std::list<RetrieveQueue::JobToAdd> jobsToAdd;
+    size_t shardIndex = std::numeric_limits<size_t>::max();
+  };
+
+  void updateShardLimits(uint64_t fSeq, ShardForAddition & sfa);
+
+  void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
+      std::list<ShardForAddition>::iterator & shardForAddition, std::list<ShardForAddition> & shardList);
+public:
+  /** Helper function for unit tests: use a smaller shard size to validate ordered insertion */
+  void setShardSize(uint64_t shardSize);
+
+  /** Helper function for unit tests: validate that we have the expected number of shards */
+  uint64_t getShardCount();
+private:
   // The shard size. From experience, 100k is where we start to see performance difference,
   // but nothing prevents us from using a smaller size.
   // The performance will be roughly flat until the queue size reaches the square of this limit
   // (meaning the queue object updates start to take too much time).
   // with this current value of 25k, the performance should be roughly flat until 25k^2=625M.
-  static const uint64_t c_maxShardSize = 25000;
+  static const uint64_t c_defaultMaxShardSize = 25000;
+
+  uint64_t m_maxShardSize = c_defaultMaxShardSize;
 };
 
 }}
diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp
index 771fff1cc245140dd7f3b53fedeb7c4cd36e9aa4..82a5e0dc47cffbb9a1bcec7dab6ec0b33d76c49d 100644
--- a/objectstore/RetrieveQueueShard.cpp
+++ b/objectstore/RetrieveQueueShard.cpp
@@ -166,6 +166,8 @@ auto RetrieveQueueShard::getJobsSummary() -> JobsSummary {
   ret.jobs = m_payload.retrievejobs_size();
   ret.minFseq = m_payload.retrievejobs(0).fseq();
   ret.maxFseq = m_payload.retrievejobs(m_payload.retrievejobs_size()-1).fseq();
+  if (ret.minFseq > ret.maxFseq)
+    throw cta::exception::Exception("In RetrieveQueueShard::getJobsSummary(): wrong shard ordering.");
   return ret;
 }
 
@@ -182,6 +184,12 @@ uint64_t RetrieveQueueShard::addJob(RetrieveQueue::JobToAdd& jobToAdd) {
   j->set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge);
   j->set_starttime(jobToAdd.startTime);
   m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize()+jobToAdd.fileSize);
+  // Sort the shard: bubble the new job backwards to its rank in fSeq order.
+  size_t jobIndex = m_payload.retrievejobs_size() - 1;
+  while (jobIndex > 0 && m_payload.retrievejobs(jobIndex).fseq() < m_payload.retrievejobs(jobIndex-1).fseq()) {
+    m_payload.mutable_retrievejobs()->SwapElements(jobIndex-1, jobIndex);
+    jobIndex--;
+  }
   return m_payload.retrievejobs_size();
 }
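
The addJob() change keeps a shard sorted with a single insertion-sort pass: the new job is appended, then swapped backwards past any entry with a larger fSeq, which is what SwapElements() does on the protobuf repeated field; the pass is O(1) when fSeqs arrive in order and O(n) at worst. The same loop restated over a std::vector, easy to test in isolation:

    #include <cassert>
    #include <cstdint>
    #include <utility>
    #include <vector>

    // Append, then bubble backwards: one insertion-sort pass keeps an
    // already-sorted shard sorted after each addition.
    void addSorted(std::vector<std::uint64_t>& fseqs, std::uint64_t fSeq) {
      fseqs.push_back(fSeq);
      for (std::size_t i = fseqs.size() - 1; i > 0 && fseqs[i] < fseqs[i - 1]; --i)
        std::swap(fseqs[i], fseqs[i - 1]);
    }

    int main() {
      std::vector<std::uint64_t> shard{10, 20, 30};
      addSorted(shard, 25);                     // out-of-order arrival
      assert((shard == std::vector<std::uint64_t>{10, 20, 25, 30}));
      return 0;
    }
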
diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp
index fff8a3fb8473bed8a57ddf353971806e0a88fb29..24b4ac1ecf97d1b056bb9dd52501701f86e5faf7 100644
--- a/objectstore/RetrieveQueueTest.cpp
+++ b/objectstore/RetrieveQueueTest.cpp
@@ -22,6 +22,8 @@
 #include "AgentReference.hpp"
 #include "common/log/DummyLogger.hpp"
 
+#include <random>
+
 namespace unitTests {
 
 TEST(ObjectStore, RetrieveQueueBasicAccess) {
@@ -31,7 +33,7 @@ TEST(ObjectStore, RetrieveQueueBasicAccess) {
   cta::objectstore::AgentReference agentRef("unitTest", dl);
   std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue");
   {
-    // Try to create the tape entry
+    // Try to create the retrieve queue
     cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
     rq.initialize("V12345");
     rq.insert();
@@ -44,6 +46,103 @@ TEST(ObjectStore, RetrieveQueueBasicAccess) {
     ASSERT_NO_THROW(rq.fetch());
     rq.dump();
   }
+  // Delete the queue entry
+  cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
+  cta::objectstore::ScopedExclusiveLock lock(rq);
+  rq.fetch();
+  rq.removeIfEmpty(lc);
+  ASSERT_FALSE(rq.exists());
+}
+
+TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
+  cta::objectstore::BackendVFS be;
+  cta::log::DummyLogger dl("dummyLogger");
+  cta::log::LogContext lc(dl);
+  cta::objectstore::AgentReference agentRef("unitTest", dl);
+  std::mt19937 gen((std::random_device())());
+  // Create 1000 job references.
+  std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsToAdd;
+  for (size_t i=0; i<1000; i++) {
+    cta::objectstore::RetrieveQueue::JobToAdd jta;
+    jta.copyNb = 1;
+    jta.fSeq = i;
+    jta.fileSize = 1000;
+    jta.policy.maxDrivesAllowed = 10;
+    jta.policy.retrieveMinRequestAge = 10;
+    jta.policy.retrievePriority = 1;
+    jta.startTime = ::time(nullptr);
+    std::stringstream address;
+    address << "someRequest-" << i;
+    jta.retrieveRequestAddress = address.str();
+    jobsToAdd.push_back(jta);
+  }
+  std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue");
+  {
+    // Try to create the retrieve queue
+    cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
+    rq.initialize("V12345");
+    // Set a small shard size to validate multi-shard behaviors
+    rq.setShardSize(20);
+    rq.insert();
+  }
+  {
+    // Read the queue and insert jobs in random batches of 15 (the insertion
+    // size is expected to stay below the shard size, 20 here).
+    auto jobsToAddNow = jobsToAdd;
+    while (jobsToAddNow.size()) {
+      std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsBatch;
+      for (size_t i=0; i<15; i++) {
+        if (jobsToAddNow.size()) {
+          auto j=std::next(jobsToAddNow.begin(), (std::uniform_int_distribution<size_t>(0, jobsToAddNow.size() -1))(gen));
+          jobsBatch.emplace_back(*j);
+          jobsToAddNow.erase(j);
+        }
+      }
+      cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
+      cta::objectstore::ScopedExclusiveLock rql(rq);
+      rq.fetch();
+      rq.addJobsAndCommit(jobsBatch, agentRef, lc);
+    }
+    // Check the shard count is not too high. Due to random insertion, we might
+    // get some unbalanced shards, but we expect an average of more than 10 jobs
+    // per shard (fewer than 100 shards for the 1000 jobs).
+    cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
+    ASSERT_LT(rq.getShardCount(), 100);
+  }
+  {
+    // Try to read back
+    cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
+    ASSERT_THROW(rq.fetch(), cta::exception::Exception);
+    cta::objectstore::ScopedExclusiveLock lock(rq);
+    ASSERT_NO_THROW(rq.fetch());
+    // Pop jobs while we can. They should come out in fSeq order as there is
+    // no interleaved push and pop.
+    uint64_t nextExpectedFseq=0;
+    while (rq.getJobsSummary().files) {
+      auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
+      std::set<std::string> jobsToSkip;
+      std::list<std::string> jobsToDelete;
+      for (auto &j: candidateJobs.candidates) {
+        std::stringstream address;
+        address << "someRequest-" << nextExpectedFseq;
+        ASSERT_EQ(address.str(), j.address);
+        jobsToSkip.insert(j.address);
+        jobsToDelete.emplace_back(j.address);
+        nextExpectedFseq++;
+      }
+      auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
+      if (candidateJobs2.candidateFiles) {
+        std::stringstream address;
+        address << "someRequest-" << nextExpectedFseq;
+        ASSERT_EQ(address.str(), candidateJobs2.candidates.front().address);
+      }
+      rq.removeJobsAndCommit(jobsToDelete);
+    }
+    ASSERT_EQ(nextExpectedFseq, 1000);
+  }
+
   // Delete the root entry
   cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
   cta::objectstore::ScopedExclusiveLock lock(rq);
@@ -51,4 +150,5 @@
   rq.removeIfEmpty(lc);
   ASSERT_FALSE(rq.exists());
 }
+
 }
\ No newline at end of file
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index 0abdd7ebede63fc09dd7d42bdc288eb59289d4bd..bd762b2dc862fb3e9b1c5ec7828ca70bc8af48ed 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -435,4 +435,5 @@
   required uint64 retrievejobscount = 10145;
   required uint64 oldestjobcreationtime = 10150;
   required uint64 mapsrebuildcount = 10160;
+  required uint64 maxshardsize = 10170;
 }
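
One closing observation, offered as a reviewer remark rather than as part of the patch: cta.proto uses proto2 semantics, where a message missing a required field fails to parse, so a RetrieveQueue payload serialized before this change cannot be read after it. Whether such payloads can exist across an upgrade is a deployment question this diff does not answer. A sketch of the failure mode, assuming the usual generated header path:

    #include <string>
    #include "objectstore/cta.pb.h"  // assumed path of the header generated from cta.proto

    // Hypothetical check: a payload serialized by a pre-upgrade binary has no
    // maxshardsize, and with proto2 semantics ParseFromString() then returns
    // false because a required field is missing.
    bool payloadStillReadable(const std::string& serializedOldPayload) {
      cta::objectstore::serializers::RetrieveQueue payload;
      return payload.ParseFromString(serializedOldPayload);
    }
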