Commit 8f8f50a7 authored by Eric Cano

Created a new unit test for random order insertion in RetrieveQueue.

Fixed various bugs detected through it.
parent 7facf272
@@ -27,6 +27,7 @@
#include "ArchiveQueue.hpp"
#include "ArchiveQueueShard.hpp"
#include "RetrieveQueue.hpp"
#include "RetrieveQueueShard.hpp"
#include "DriveRegister.hpp"
#include <stdexcept>
#include <google/protobuf/util/json_util.h>
@@ -189,6 +190,9 @@ std::string GenericObject::dump() {
case serializers::RetrieveQueue_t:
bodyDump = dumpWithType<cta::objectstore::RetrieveQueue>(this);
break;
case serializers::RetrieveQueueShard_t:
bodyDump = dumpWithType<cta::objectstore::RetrieveQueueShard>(this);
break;
case serializers::ArchiveRequest_t:
bodyDump = dumpWithType<ArchiveRequest>(this);
break;
@@ -48,6 +48,7 @@ void RetrieveQueue::initialize(const std::string &vid) {
m_payload.set_retrievejobscount(0);
m_payload.set_vid(vid);
m_payload.set_mapsrebuildcount(0);
m_payload.set_maxshardsize(m_maxShardSize);
m_payloadInterpreted = true;
}
@@ -186,6 +187,11 @@ void RetrieveQueue::commit() {
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit();
}
void RetrieveQueue::getPayloadFromHeader() {
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::getPayloadFromHeader();
m_maxShardSize = m_payload.maxshardsize();
}
bool RetrieveQueue::isEmpty() {
checkPayloadReadable();
return !m_payload.retrievejobstotalsize() && !m_payload.retrievequeueshards_size();
@@ -217,34 +223,17 @@ std::string RetrieveQueue::dump() {
return headerDump;
}
namespace {
struct ShardForAddition {
bool newShard=false;
bool creationDone=false;
bool splitDone=false;
bool toSplit=false;
ShardForAddition * splitDestination = nullptr;
bool fromSplit=false;
ShardForAddition * splitSource = nullptr;
std::string address;
uint64_t minFseq;
uint64_t maxFseq;
uint64_t jobsCount;
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
size_t shardIndex = std::numeric_limits<size_t>::max();
};
void updateShardLimits(uint64_t fSeq, ShardForAddition & sfa) {
void RetrieveQueue::updateShardLimits(uint64_t fSeq, ShardForAddition & sfa) {
if (fSeq < sfa.minFseq) sfa.minFseq=fSeq;
if (fSeq > sfa.maxFseq) sfa.maxFseq=fSeq;
}
/** Add a job to a shard, splitting it if necessary. */
void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
void RetrieveQueue::addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
std::list<ShardForAddition>::iterator & shardForAddition, std::list<ShardForAddition> & shardList) {
// Is the shard still small enough? We will not double split shards (we suppose insertion size << shard size cap).
// We will also not split a new shard.
if ( shardForAddition->jobsCount < RetrieveQueue::c_maxShardSize
if ( shardForAddition->jobsCount < m_maxShardSize
|| shardForAddition->fromSplit || shardForAddition->newShard) {
// We just piggy back here. No need to increase range, we are within it.
shardForAddition->jobsCount++;
@@ -256,21 +245,22 @@ void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
// Create the new shard
auto newSfa = shardList.insert(shardForAddition, ShardForAddition());
// The new shard size can only be estimated, but we will update it to the actual value as we update the shard.
// The new shard is inserted before the old one, so the old one keeps the upper
// half of the fSeq range and the new shard gets the bottom half.
uint64_t shardRange = shardForAddition->maxFseq - shardForAddition->minFseq;
uint64_t oldMax = shardForAddition->maxFseq;
shardForAddition->maxFseq = shardForAddition->minFseq + shardRange/2;
shardForAddition->jobsCount = shardForAddition->jobsCount/2;
shardForAddition->toSplit = true;
shardForAddition->splitDestination = &*newSfa;
newSfa->minFseq = shardForAddition->maxFseq+1;
newSfa->maxFseq = oldMax;
newSfa->jobsCount = shardForAddition->jobsCount;
newSfa->minFseq = shardForAddition->minFseq;
newSfa->maxFseq = shardForAddition->minFseq + shardRange/2;
newSfa->jobsCount = shardForAddition->jobsCount/2;
newSfa->splitSource = &*shardForAddition;
newSfa->fromSplit = true;
newSfa->newShard = true;
shardForAddition->minFseq = shardForAddition->minFseq + shardRange/2 + 1;
shardForAddition->jobsCount = shardForAddition->jobsCount/2;
shardForAddition->toSplit = true;
shardForAddition->splitDestination = &*newSfa;
// Transfer jobs to add to the new shard if needed
for (auto jta2=shardForAddition->jobsToAdd.begin(); jta2!=shardForAddition->jobsToAdd.end();) {
if (jta2->fSeq >= newSfa->minFseq) {
if (jta2->fSeq <= newSfa->maxFseq) {
newSfa->jobsToAdd.emplace_back(*jta2);
jta2 = shardForAddition->jobsToAdd.erase(jta2);
} else {
@@ -278,22 +268,21 @@ void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
}
}
// We can finally add our job to one of the two shards from the split
if (jobToAdd.fSeq <= shardForAddition->maxFseq) {
if (jobToAdd.fSeq >= shardForAddition->minFseq) {
shardForAddition->jobsToAdd.emplace_back(jobToAdd);
shardForAddition->jobsCount++;
updateShardLimits(jobToAdd.fSeq, *shardForAddition);
} else {
newSfa->jobsToAdd.emplace_back(jobToAdd);
newSfa->jobsCount++;
updateShardLimits(jobToAdd.fSeq, *shardForAddition);
updateShardLimits(jobToAdd.fSeq, *newSfa);
}
}
}
} // anonymous namespace
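For reference, a minimal standalone sketch of the halving arithmetic above (ShardRange is a hypothetical stand-in for ShardForAddition, reduced to the split-relevant fields; not the CTA implementation):

#include <cassert>
#include <cstdint>

// Hypothetical stand-in for ShardForAddition, split-relevant fields only.
struct ShardRange {
  uint64_t minFseq;
  uint64_t maxFseq;
  uint64_t jobsCount;
};

// Split `source` in half: returns the new (lower) shard and shrinks `source`
// to the upper half, mirroring the range arithmetic in addJobToShardAndMaybeSplit().
ShardRange splitLowerHalf(ShardRange & source) {
  const uint64_t range = source.maxFseq - source.minFseq;
  ShardRange lower;
  lower.minFseq = source.minFseq;
  lower.maxFseq = source.minFseq + range / 2;
  lower.jobsCount = source.jobsCount / 2;   // an estimate, refined as jobs move
  source.minFseq = lower.maxFseq + 1;       // upper half starts past the midpoint
  source.jobsCount = source.jobsCount / 2;
  return lower;
}

int main() {
  ShardRange s{0, 99, 50};
  ShardRange lower = splitLowerHalf(s);
  assert(lower.minFseq == 0 && lower.maxFseq == 49 && lower.jobsCount == 25);
  assert(s.minFseq == 50 && s.maxFseq == 99 && s.jobsCount == 25);
  return 0;
}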
void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc) {
checkPayloadWritable();
if (jobsToAdd.empty()) return;
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
@@ -349,8 +338,12 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
}
// Find where the job lands in the shards
for (auto sfa=shardsForAddition.begin(); sfa != shardsForAddition.end(); sfa++) {
// Is it within this shard?
if (jta.fSeq >= sfa->minFseq && jta.fSeq <= sfa->maxFseq) {
if (sfa->minFseq > jta.fSeq) {
// Are we before the current shard? (for example, before the first shard)
addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
goto jobInserted;
} else if (jta.fSeq >= sfa->minFseq && jta.fSeq <= sfa->maxFseq) {
// Is it within this shard?
addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
goto jobInserted;
} else if (sfa != shardsForAddition.end() && std::next(sfa) != shardsForAddition.end()) {
@@ -364,10 +357,6 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
}
goto jobInserted;
}
} else if (sfa->minFseq > jta.fSeq) {
// Are we before the current shard? (for example, before first shard)
addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
goto jobInserted;
} else if (std::next(sfa) == shardsForAddition.end() && sfa->maxFseq < jta.fSeq) {
// Are we after the last shard?
addJobToShardAndMaybeSplit(jta, sfa, shardsForAddition);
@@ -376,7 +365,6 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
}
// Still not inserted? We have run out of options; fail loudly to ease debugging.
{
*((int *)nullptr) = 0; // TODO: remove in the long run.
throw cta::exception::Exception("In RetrieveQueue::addJobsAndCommit(): could not find an appropriate shard for job");
}
jobInserted:;
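The branch chain above amounts to a placement rule over the sorted, disjoint shard ranges. A minimal sketch of that rule (pickShard and Range are hypothetical names; the real code additionally tie-breaks jobs falling in the gap between two shards, which in this simplification simply go to the later shard):

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Range { uint64_t minFseq, maxFseq; };   // hypothetical shard bounds

// Pick the shard receiving fSeq, given ranges sorted by minFseq: the first
// shard starting after fSeq, the shard containing fSeq, or the last shard
// when fSeq lies beyond every range.
size_t pickShard(const std::vector<Range> & shards, uint64_t fSeq) {
  for (size_t i = 0; i < shards.size(); i++) {
    if (shards[i].minFseq > fSeq) return i;   // before this shard
    if (fSeq <= shards[i].maxFseq) return i;  // within this shard
  }
  return shards.size() - 1;                   // after the last shard
}

int main() {
  std::vector<Range> shards{{10, 19}, {30, 39}};
  assert(pickShard(shards, 5) == 0);    // before the first shard
  assert(pickShard(shards, 15) == 0);   // inside shard 0
  assert(pickShard(shards, 25) == 1);   // in the gap: goes to shard 1 here
  assert(pickShard(shards, 50) == 1);   // past the last shard
  return 0;
}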
@@ -394,8 +382,8 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
// TODO: shard creation and update could be parallelized (to some extent as we
// have shard to shard dependencies with the splits), but as a first implementation
// we just go iteratively.
uint64_t addedJobs = 0, addedBytes = 0;
for (auto & shard: shardsForAddition) {
uint64_t addedJobs = 0, addedBytes = 0, transferredInSplitJobs = 0, transferredInSplitBytes = 0;
// Variables which will allow the shard/pointer updates in all cases.
cta::objectstore::serializers::RetrieveQueueShardPointer * shardPointer = nullptr, * splitFromShardPointer = nullptr;
RetrieveQueueShard rqs(m_objectStore), rqsSplitFrom(m_objectStore);
@@ -406,13 +394,15 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
rqs.setAddress(agentReference.nextId(shardName.str()));
rqs.initialize(getAddressIfSet());
// We also need to create the pointer, and insert it to the right spot.
shardPointer = m_payload.add_retrievequeueshards();
shardPointer = m_payload.mutable_retrievequeueshards()->Add();
// Pre-update the shard pointer.
shardPointer->set_address(rqs.getAddressIfSet());
shardPointer->set_minfseq(0);
shardPointer->set_maxfseq(0);
shardPointer->set_minfseq(0);
shardPointer->set_shardbytescount(0);
shardPointer->set_shardjobscount(0);
shard.creationDone = true;
shard.address = rqs.getAddressIfSet();
// Move the shard pointer to its intended location.
size_t currentShardPosition=m_payload.retrievequeueshards_size() - 1;
while (currentShardPosition != shard.shardIndex) {
@@ -432,16 +422,27 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
for (auto &j: jobsFromSource) {
if (j.fSeq >= shard.minFseq && j.fSeq <= shard.maxFseq) {
rqs.addJob(j);
addedJobs++;
addedBytes+=j.fileSize;
jobsToTransferAddresses.emplace_back(j.retrieveRequestAddress);
}
}
rqsSplitFrom.removeJobs(jobsToTransferAddresses);
auto splitFromShardSummary = rqsSplitFrom.getJobsSummary();
splitFromShardPointer->set_maxfseq(splitFromShardSummary.maxFseq);
splitFromShardPointer->set_minfseq(splitFromShardSummary.minFseq);
splitFromShardPointer->set_shardbytescount(splitFromShardSummary.bytes);
splitFromShardPointer->set_shardjobscount(splitFromShardSummary.jobs);
auto removalResult = rqsSplitFrom.removeJobs(jobsToTransferAddresses);
transferredInSplitBytes += removalResult.bytesRemoved;
transferredInSplitJobs += removalResult.jobsRemoved;
// We update the shard pointer with fseqs to allow validations, but the actual
// values will be updated as the shard itself is populated.
shardPointer->set_maxfseq(shard.maxFseq);
shardPointer->set_minfseq(shard.minFseq);
shardPointer->set_shardjobscount(shard.jobsCount);
shardPointer->set_shardbytescount(1);
splitFromShardPointer->set_minfseq(shard.splitSource->minFseq);
splitFromShardPointer->set_maxfseq(shard.splitSource->maxFseq);
splitFromShardPointer->set_shardjobscount(shard.splitSource->jobsCount);
splitFromShardPointer->set_shardbytescount(1);
// We are all set (in memory) for the shard from which we split.
shard.splitDone = true;
shard.splitSource->splitDone = true;
}
// We can now fill up the shard (outside of this if/else).
} else {
@@ -468,11 +469,24 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
// ... and finally commit the queue (first! there is potentially a new shard to
// pre-reference before inserting) and the shards as appropriate.
// Update global summaries
m_payload.set_retrievejobscount(m_payload.retrievejobscount() + addedJobs);
m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + addedBytes);
commit();
if (shard.newShard)
m_payload.set_retrievejobscount(m_payload.retrievejobscount() + addedJobs - transferredInSplitJobs);
m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + addedBytes - transferredInSplitBytes);
// If the new shard comes from a split, we have to do a blind commit: the
// stats for the shard we split from cannot yet be accounted for properly,
// and the new shard is not inserted yet.
if (shard.fromSplit)
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit();
else {
// In other cases, we should have a coherent state.
commit();
}
shard.committed = true;
if (shard.newShard) {
rqs.insert();
if (shard.fromSplit)
rqsSplitFrom.commit();
}
else rqs.commit();
}
}
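The resulting per-shard persistence order can be summarized as follows: commit the queue first, so a brand new shard is pre-referenced before it exists (blindly when a split leaves intermediate stats incoherent), then insert or commit the shard itself, then re-commit the split source. A hedged sketch with hypothetical Queue/Shard stubs, not the CTA classes:

#include <iostream>

// Hypothetical stubs standing in for RetrieveQueue / RetrieveQueueShard.
struct Queue {
  void commit()      { std::cout << "queue commit (validated)\n"; }
  void commitBlind() { std::cout << "queue commit (blind)\n"; }
};
struct Shard {
  void insert() { std::cout << "shard insert\n"; }
  void commit() { std::cout << "shard commit\n"; }
};

// Mirrors the if/else chain above: queue first (blind while a split leaves
// intermediate stats), then the shard itself, then the shard we split from.
void persistShard(Queue & q, Shard & s, Shard & splitSource, bool newShard, bool fromSplit) {
  if (fromSplit) q.commitBlind(); else q.commit();
  if (newShard) {
    s.insert();
    if (fromSplit) splitSource.commit();
  } else {
    s.commit();
  }
}

int main() {
  Queue q; Shard s, src;
  persistShard(q, s, src, true /*newShard*/, true /*fromSplit*/);
  return 0;
}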
@@ -680,4 +694,15 @@ void RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentRefere
throw cta::exception::Exception("In RetrieveQueue::garbageCollect(): not implemented");
}
void RetrieveQueue::setShardSize(uint64_t shardSize) {
checkPayloadWritable();
m_payload.set_maxshardsize(shardSize);
}
uint64_t RetrieveQueue::getShardCount() {
checkPayloadReadable();
return m_payload.retrievequeueshards_size();
}
}} // namespace cta::objectstore
@@ -37,6 +37,7 @@ public:
RetrieveQueue(GenericObject & go);
void initialize(const std::string & vid);
void commit();
void getPayloadFromHeader() override;
private:
// Validates all summaries are in accordance with each other.
@@ -101,13 +102,44 @@ public:
// -- Generic parameters
std::string getVid();
private:
struct ShardForAddition {
bool newShard=false;
bool creationDone=false;
bool splitDone=false;
bool toSplit=false;
bool committed=false;
ShardForAddition * splitDestination = nullptr;
bool fromSplit=false;
ShardForAddition * splitSource = nullptr;
std::string address;
uint64_t minFseq;
uint64_t maxFseq;
uint64_t jobsCount;
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
size_t shardIndex = std::numeric_limits<size_t>::max();
};
void updateShardLimits(uint64_t fSeq, ShardForAddition & sfa);
void addJobToShardAndMaybeSplit(RetrieveQueue::JobToAdd & jobToAdd,
std::list<ShardForAddition>::iterator & shardForAddition, std::list<ShardForAddition> & shardList);
public:
/** Helper function for unit tests: use a smaller shard size to validate ordered insertion */
void setShardSize(uint64_t shardSize);
/** Helper function for unit tests: validate that we have the expected number of shards */
uint64_t getShardCount();
private:
// The shard size. From experience, 100k is where we start to see a performance difference,
// but nothing prevents us from using a smaller size.
// Performance will be roughly flat until the queue size reaches the square of this limit
// (at which point the queue object updates start to take too much time).
// With the current value of 25k, performance should be roughly flat until 25k^2 = 625M jobs.
static const uint64_t c_maxShardSize = 25000;
static const uint64_t c_defaultMaxShardSize = 25000;
uint64_t m_maxShardSize = c_defaultMaxShardSize;
};
}}
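A quick check of the 25k^2 figure from the comment above (a sketch assuming, as the comment does, that queue-update cost grows with the number of shard pointers):

#include <cstdint>

int main() {
  // With maxShardSize jobs per shard, a queue of N jobs holds N / maxShardSize
  // shard pointers; pointer updates stay cheap while that count is itself
  // below maxShardSize, i.e. while N < maxShardSize^2.
  constexpr uint64_t maxShardSize = 25000;
  constexpr uint64_t flatUntil = maxShardSize * maxShardSize;
  static_assert(flatUntil == 625000000, "25k^2 = 625M jobs");
  return 0;
}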
@@ -166,6 +166,8 @@ auto RetrieveQueueShard::getJobsSummary() -> JobsSummary {
ret.jobs = m_payload.retrievejobs_size();
ret.minFseq = m_payload.retrievejobs(0).fseq();
ret.maxFseq = m_payload.retrievejobs(m_payload.retrievejobs_size()-1).fseq();
if (ret.minFseq > ret.maxFseq)
throw cta::exception::Exception("In RetrieveQueueShard::getJobsSummary(): wrong shard ordering.");
return ret;
}
@@ -182,6 +184,12 @@ uint64_t RetrieveQueueShard::addJob(RetrieveQueue::JobToAdd& jobToAdd) {
j->set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge);
j->set_starttime(jobToAdd.startTime);
m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize()+jobToAdd.fileSize);
// Keep the shard sorted by fSeq: bubble the new job backwards to its place.
size_t jobIndex = m_payload.retrievejobs_size() - 1;
while (jobIndex > 0 && m_payload.retrievejobs(jobIndex).fseq() < m_payload.retrievejobs(jobIndex-1).fseq()) {
m_payload.mutable_retrievejobs()->SwapElements(jobIndex-1, jobIndex);
jobIndex--;
}
return m_payload.retrievejobs_size();
}
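The same keep-sorted insertion, sketched on a plain std::vector (addSorted is a hypothetical helper; the shard itself uses protobuf's SwapElements, but the bubbling is identical):

#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Append fSeq, then bubble it backwards until the vector is sorted again.
// Mirrors the SwapElements() loop in RetrieveQueueShard::addJob().
void addSorted(std::vector<uint64_t> & fSeqs, uint64_t fSeq) {
  fSeqs.push_back(fSeq);
  size_t i = fSeqs.size() - 1;
  while (i > 0 && fSeqs[i] < fSeqs[i - 1]) {
    std::swap(fSeqs[i - 1], fSeqs[i]);
    i--;
  }
}

int main() {
  std::vector<uint64_t> v;
  for (uint64_t f : {5, 1, 3, 2, 4}) addSorted(v, f);
  assert((v == std::vector<uint64_t>{1, 2, 3, 4, 5}));
  return 0;
}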
@@ -22,6 +22,8 @@
#include "AgentReference.hpp"
#include "common/log/DummyLogger.hpp"
#include <random>
namespace unitTests {
TEST(ObjectStore, RetrieveQueueBasicAccess) {
@@ -31,7 +33,7 @@ TEST(ObjectStore, RetrieveQueueBasicAccess) {
cta::objectstore::AgentReference agentRef("unitTest", dl);
std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue");
{
// Try to create the tape entry
// Try to create the retrieve queue
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
rq.initialize("V12345");
rq.insert();
@@ -44,6 +46,103 @@ TEST(ObjectStore, RetrieveQueueBasicAccess) {
ASSERT_NO_THROW(rq.fetch());
rq.dump();
}
// Delete the queue entry
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
cta::objectstore::ScopedExclusiveLock lock(rq);
rq.fetch();
rq.removeIfEmpty(lc);
ASSERT_FALSE(rq.exists());
}
TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
cta::objectstore::BackendVFS be;
cta::log::DummyLogger dl("dummyLogger");
cta::log::LogContext lc(dl);
cta::objectstore::AgentReference agentRef("unitTest", dl);
std::mt19937 gen((std::random_device())());
// Create 1000 job references.
std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsToAdd;
for (size_t i=0; i<1000; i++) {
cta::objectstore::RetrieveQueue::JobToAdd jta;
jta.copyNb = 1;
jta.fSeq = i;
jta.fileSize = 1000;
jta.policy.maxDrivesAllowed = 10;
jta.policy.retrieveMinRequestAge = 10;
jta.policy.retrievePriority = 1;
jta.startTime = ::time(nullptr);
std::stringstream address;
address << "someRequest-" << i;
jta.retrieveRequestAddress = address.str();
jobsToAdd.push_back(jta);
}
std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue");
{
// Try to create the retrieve queue
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
rq.initialize("V12345");
// Set a small shard size to validate multi shard behaviors
rq.setShardSize(20);
rq.insert();
}
{
// Read back the queue and insert the jobs in random order, in batches of 15
// (with a shard size of 20, insertions regularly trigger shard splits).
auto jobsToAddNow = jobsToAdd;
while (jobsToAddNow.size()) {
std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsBatch;
for (size_t i=0; i<15; i++) {
if (jobsToAddNow.size()) {
auto j=std::next(jobsToAddNow.begin(), (std::uniform_int_distribution<size_t>(0, jobsToAddNow.size() -1))(gen));
jobsBatch.emplace_back(*j);
jobsToAddNow.erase(j);
}
}
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
rq.addJobsAndCommit(jobsBatch, agentRef, lc);
}
// Check the shard count is not too high. Due to random insertion, we might
// have some inefficiencies, but we expect at least an average of 10 jobs per
// shard (fewer than 100 shards for the 1000 jobs).
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
ASSERT_LT(rq.getShardCount(), 100);
}
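// Back-of-the-envelope for the ASSERT_LT above (a sketch; exampleJobs and
// exampleShardSize are hypothetical names restating the test parameters):
// 1000 jobs with a shard size of 20 pack into 50 shards at best, so allowing
// up to 99 shards tolerates roughly 2x splitting overhead from random insertion.
{
  constexpr unsigned exampleJobs = 1000, exampleShardSize = 20;
  static_assert(exampleJobs / exampleShardSize == 50, "perfect packing would need 50 shards");
  static_assert(exampleJobs / 100 == 10, "fewer than 100 shards means >= 10 jobs per shard on average");
}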
{
// Try to read back
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
ASSERT_THROW(rq.fetch(), cta::exception::Exception);
cta::objectstore::ScopedExclusiveLock lock(rq);
ASSERT_NO_THROW(rq.fetch());
// Pop jobs while we can. They should come out in fseq order as there is
// no interleaved push and pop.
uint64_t nextExpectedFseq=0;
while (rq.getJobsSummary().files) {
auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
std::set<std::string> jobsToSkip;
std::list<std::string> jobsToDelete;
for (auto &j: candidateJobs.candidates) {
std::stringstream address;
address << "someRequest-" << nextExpectedFseq;
ASSERT_EQ(address.str(), j.address);
jobsToSkip.insert(j.address);
jobsToDelete.emplace_back(j.address);
nextExpectedFseq++;
}
auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
if (candidateJobs2.candidateFiles) {
std::stringstream address;
address << "someRequest-" << nextExpectedFseq;
ASSERT_EQ(address.str(), candidateJobs2.candidates.front().address);
}
rq.removeJobsAndCommit(jobsToDelete);
}
ASSERT_EQ(nextExpectedFseq, 1000);
}
// Delete the root entry
cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be);
cta::objectstore::ScopedExclusiveLock lock(rq);
@@ -51,4 +150,5 @@ TEST(ObjectStore, RetrieveQueueBasicAccess) {
rq.removeIfEmpty(lc);
ASSERT_FALSE(rq.exists());
}
}
\ No newline at end of file
@@ -435,4 +435,5 @@ message RetrieveQueue {
required uint64 retrievejobscount = 10145;
required uint64 oldestjobcreationtime = 10150;
required uint64 mapsrebuildcount = 10160;
required uint64 maxshardsize = 10170;
}