Commit 652bfe83 authored by Cedric Caffy's avatar Cedric Caffy
Browse files

Corrected oldest job age for RetrieveQueue, unit tests need to be

implemented
parent a80ae270
......@@ -250,4 +250,91 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
}
}
TEST(ObjectStore, RetrieveQueueAlgorithmsUpdatesOldestJobQueueTime) {
using namespace cta::objectstore;
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::catalogue::DummyCatalogue catalogue;
cta::log::LogContext lc(dl);
// Here we check for the ability to detect dead (but empty agents) and clean them up
BackendVFS be;
AgentReference agentRef("unitTestGarbageCollector", dl);
Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
EntryLogSerDeser el("user0", "unittesthost", time(NULL));
ScopedExclusiveLock rel(re);
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
fillRetrieveRequests(requests, requestsPtrs, be, agentRef); //memory leak here
/*{
// Second agent to test referenceAndSwitchOwnershipIfNecessary
BackendVFS be2;
AgentReference agentRef2("Agent 2", dl);
Agent agent2(agentRef2.getAgentAddress(), be2);
// Create the root entry
RootEntry re2(be2);
re2.initialize();
re2.insert();
// Create the agent register
EntryLogSerDeser el2("user0", "unittesthost", time(NULL));
ScopedExclusiveLock rel2(re2);
re2.addOrGetAgentRegisterPointerAndCommit(agentRef2, el2, lc);
rel2.release();
agent2.initialize();
agent2.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests2;
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs2;
fillRetrieveRequests(requests2, requestsPtrs2,be2, agentRef2);
auto a1 = agentRef2.getAgentAddress();
auto a2 = agentRef2.getAgentAddress();
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2);
retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID",
a2, a1, requests2, lc);
}*/
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos(be, agentRef);
try {
ASSERT_EQ(requests.size(), 10);
retrieveAlgos.referenceAndSwitchOwnership("VID",
agentRef.getAgentAddress(), requests, lc);
// Now get the requests back
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = retrieveAlgos.popNextBatch("VID", popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Validate that the summary has the same information as the popped elements
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PoppedElementsSummary s;
for(auto &e: poppedJobs.elements) {
s += ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::getElementSummary(e);
}
ASSERT_EQ(s, poppedJobs.summary);
} catch (ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
throw e.failure;
} catch(std::exception &e) {
std::cout << e.what() << std::endl;
}
}
}
}
}
......@@ -699,10 +699,17 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo
}
// We still need to update the tracking queue side.
// Update stats and remove the jobs from the todo list.
bool needToRebuild = false;
time_t oldestJobCreationTime = m_payload.oldestjobcreationtime();
for (auto & j: removalResult.removedJobs) {
maxDriveAllowedMap.decCount(j.maxDrivesAllowed);
priorityMap.decCount(j.priority);
minRetrieveRequestAgeMap.decCount(j.minRetrieveRequestAge);
if(j.startTime <= oldestJobCreationTime){
//the job we remove was the oldest one, we should rebuild the queue
//to update the oldestjobcreationtime counter
needToRebuild = true;
}
if (j.activityDescription) {
// We have up a partial activity description, but this is enough to decCount.
RetrieveActivityDescription activityDescription;
......@@ -720,8 +727,9 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo
// Also update the shard pointers's stats. In case of mismatch, we will trigger a rebuild.
shardPointer->set_shardbytescount(shardPointer->shardbytescount() - removalResult.bytesRemoved);
shardPointer->set_shardjobscount(shardPointer->shardjobscount() - removalResult.jobsRemoved);
if (shardPointer->shardbytescount() != removalResult.bytesAfter
|| shardPointer->shardjobscount() != removalResult.jobsAfter) {
if (!needToRebuild && (shardPointer->shardbytescount() != removalResult.bytesAfter
|| shardPointer->shardjobscount() != removalResult.jobsAfter)) {
rebuild();
}
// We will commit when exiting anyway...
......@@ -745,6 +753,9 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo
}
); // end of remove_if
// And commit the queue (once per shard should not hurt performance).
if(needToRebuild){
rebuild();
}
commit();
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment