Skip to content
Snippets Groups Projects
Commit 49f4a5ef authored by Cedric Caffy's avatar Cedric Caffy
Browse files

Added RetrieveQueue oldestjobage unit test for cta-admin sq

parent 87eeaaa0
No related branches found
No related tags found
No related merge requests found
......@@ -34,14 +34,21 @@
namespace unitTests {
/**
* Create Objectstore RetrieveRequest and insert them in a list that could be used to queue with the Algorithms
* @param requests the list of RetrieveRequests that will be queued in the objectstore
* @param requestPtrs the pointers of the RetrieveRequests that will be queued in the objectstore
* @param be objectstore backend
* @param agentRef the current agent that queues
* @param startFseq allows to set the FSeq of the first file to be queued (in case this method is called multiple times)
*/
void fillRetrieveRequests(
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToTransfer>::InsertedElement::list &requests,
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest> >& requestPtrs, //List to avoid memory leak on ArchiveQueueAlgorithms test
cta::objectstore::BackendVFS &be,
cta::objectstore::AgentReference &agentRef)
cta::objectstore::AgentReference &agentRef, uint64_t startFseq = 0)
{
using namespace cta::objectstore;
for(size_t i = 0; i < 10; ++i)
{
std::string rrAddr = agentRef.nextId("RetrieveRequest");
......@@ -63,7 +70,7 @@ void fillRetrieveRequests(
tf.fileSize = 1;
tf.copyNb = 1;
tf.creationTime = time(nullptr);
tf.fSeq = i;
tf.fSeq = startFseq;
tf.vid = "Tape0";
rqc.archiveFile.tapeFiles.push_back(tf);
}
......@@ -76,7 +83,7 @@ void fillRetrieveRequests(
rqc.mountPolicy.retrievePriority = 1;
requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be));
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement{
requestPtrs.back().get(), 1, i, 667, mp, cta::nullopt, cta::nullopt
requestPtrs.back().get(), 1, startFseq++, 667, mp, cta::nullopt, cta::nullopt
});
auto &rr = *requests.back().retrieveRequest;
rr.initialize();
......@@ -276,65 +283,62 @@ TEST(ObjectStore, RetrieveQueueAlgorithmsUpdatesOldestJobQueueTime) {
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
fillRetrieveRequests(requests, requestsPtrs, be, agentRef); //memory leak here
/*{
// Second agent to test referenceAndSwitchOwnershipIfNecessary
BackendVFS be2;
AgentReference agentRef2("Agent 2", dl);
Agent agent2(agentRef2.getAgentAddress(), be2);
// Create the root entry
RootEntry re2(be2);
re2.initialize();
re2.insert();
// Create the agent register
EntryLogSerDeser el2("user0", "unittesthost", time(NULL));
ScopedExclusiveLock rel2(re2);
re2.addOrGetAgentRegisterPointerAndCommit(agentRef2, el2, lc);
rel2.release();
agent2.initialize();
agent2.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests2;
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs2;
fillRetrieveRequests(requests2, requestsPtrs2,be2, agentRef2);
auto a1 = agentRef2.getAgentAddress();
auto a2 = agentRef2.getAgentAddress();
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2);
retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID",
a2, a1, requests2, lc);
}*/
std::string vid = "Tape0";
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos(be, agentRef);
try {
std::string retrieveQueueAddress;
std::unique_ptr<cta::objectstore::RetrieveQueue> rq;
time_t firstBatchOldestJobStartTime;
{
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
fillRetrieveRequests(requests, requestsPtrs, be, agentRef,0);
//Insert a first batch of 10 requests
ASSERT_EQ(requests.size(), 10);
retrieveAlgos.referenceAndSwitchOwnership(vid,
agentRef.getAgentAddress(), requests, lc);
retrieveAlgos.referenceAndSwitchOwnership("VID",
re.fetchNoLock();
retrieveQueueAddress = re.getRetrieveQueueAddress(vid,JobQueueType::JobsToTransferForUser);
rq.reset(new cta::objectstore::RetrieveQueue(retrieveQueueAddress,be));
rq->fetchNoLock();
//Get the first batch oldestAge
firstBatchOldestJobStartTime = rq->getJobsSummary().oldestJobStartTime;
}
//Create another batch of 10 requests
{
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
fillRetrieveRequests(requests, requestsPtrs, be, agentRef,10);
ASSERT_EQ(requests.size(), 10);
//Sleep 1 second before requeueing
::sleep(1);
//Requeue
retrieveAlgos.referenceAndSwitchOwnership(vid,
agentRef.getAgentAddress(), requests, lc);
rq->fetchNoLock();
uint64_t secondBatchOldestJobStartTime;
secondBatchOldestJobStartTime = rq->getJobsSummary().oldestJobStartTime;
//As we did not pop, the first inserted batch of jobs is the oldest one
ASSERT_EQ(firstBatchOldestJobStartTime,secondBatchOldestJobStartTime);
}
// Now get the requests back
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = retrieveAlgos.popNextBatch("VID", popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Now pop the first 10 batches of jobs --> the queue oldestjobstarttime should be equal to the second batch oldestjobstarttime
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 10;
auto poppedJobs = retrieveAlgos.popNextBatch(vid, popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Validate that the summary has the same information as the popped elements
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PoppedElementsSummary s;
for(auto &e: poppedJobs.elements) {
s += ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::getElementSummary(e);
}
ASSERT_EQ(s, poppedJobs.summary);
} catch (ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
throw e.failure;
} catch(std::exception &e) {
std::cout << e.what() << std::endl;
}
}
}
}
//The new oldestJobStartTime should be equal to the jobstarttime of the first job of the second batch
rq->fetchNoLock();
time_t oldestJobStartTime = rq->getJobsSummary().oldestJobStartTime;
ASSERT_TRUE(oldestJobStartTime > firstBatchOldestJobStartTime);
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment