Commit 84032229 authored by Eric Cano's avatar Eric Cano
Browse files

Added proper tracking of summary maps in RetriveQueue.

Like in ArchiveQueue, added per file cache of parameters, summary rebuilds and rebuild count.
parent 186a224b
......@@ -39,9 +39,32 @@ void cta::objectstore::RetrieveQueue::initialize(const std::string &vid) {
m_payload.set_oldestjobcreationtime(0);
m_payload.set_retrievejobstotalsize(0);
m_payload.set_vid(vid);
m_payload.set_mapsrebuildcount(0);
m_payloadInterpreted = true;
}
void cta::objectstore::RetrieveQueue::commit() {
// Before calling ObjectOps::commit, check that we have coherent queue summaries
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap());
if (maxDriveAllowedMap.total() != (uint64_t)m_payload.retrievejobs_size() ||
priorityMap.total() != (uint64_t)m_payload.retrievejobs_size() ||
minRetrieveRequestAgeMap.total() != (uint64_t)m_payload.retrievejobs_size()) {
// The maps counts are off: recompute them.
maxDriveAllowedMap.clear();
priorityMap.clear();
minRetrieveRequestAgeMap.clear();
for (size_t i=0; i<(size_t)m_payload.retrievejobs_size(); i++) {
maxDriveAllowedMap.incCount(m_payload.retrievejobs(i).maxdrivesallowed());
priorityMap.incCount(m_payload.retrievejobs(i).priority());
minRetrieveRequestAgeMap.incCount(m_payload.retrievejobs(i).minretrieverequestage());
}
m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1);
}
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::commit();
}
bool cta::objectstore::RetrieveQueue::isEmpty() {
checkPayloadReadable();
return !m_payload.retrievejobs_size();
......@@ -94,6 +117,9 @@ void cta::objectstore::RetrieveQueue::addJob(uint64_t copyNb,
j->set_address(retrieveRequestAddress);
j->set_size(size);
j->set_copynb(copyNb);
j->set_priority(policy.retrievePriority);
j->set_minretrieverequestage(policy.retrieveMinRequestAge);
j->set_maxdrivesallowed(policy.maxDrivesAllowed);
}
cta::objectstore::RetrieveQueue::JobsSummary cta::objectstore::RetrieveQueue::getJobsSummary() {
......@@ -157,11 +183,17 @@ void cta::objectstore::RetrieveQueue::removeJob(const std::string& retriveToFile
bool found=false;
do {
found=false;
found = false;
// Push the found entry all the way to the end.
for (size_t i=0; i<(size_t)jl->size(); i++) {
if (jl->Get(i).address() == retriveToFileAddress) {
found = true;
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.decCount(jl->Get(i).priority());
ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap());
minRetrieveRequestAgeMap.decCount(jl->Get(i).minretrieverequestage());
while (i+1 < (size_t)jl->size()) {
jl->SwapElements(i, i+1);
i++;
......
......@@ -34,6 +34,7 @@ public:
RetrieveQueue(const std::string & address, Backend & os);
RetrieveQueue(GenericObject & go);
void initialize(const std::string & vid);
void commit();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
bool isEmpty();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
......
......@@ -152,6 +152,9 @@ message RetrieveJobPointer {
required uint64 size = 3101;
required string address = 3102;
required uint32 copynb = 3103;
required uint64 priority = 3104;
required uint64 minretrieverequestage = 3105;
required uint64 maxdrivesallowed = 3106;
}
// ------------- Mount criteria and quota -------------------------------------
......@@ -397,4 +400,5 @@ message RetrieveQueue {
repeated ValueCountPair maxdrivesallowedmap = 10133;
required uint64 retrievejobstotalsize = 10140;
required uint64 oldestjobcreationtime = 10150;
required uint64 mapsrebuildcount = 10160;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment