Commit 966cdd7e authored by Cedric Caffy's avatar Cedric Caffy
Browse files

[objectstore] ArchiveQueue oldestjobcreationtime is now updated at each pop from the queue

parent b6c920ca
......@@ -12,6 +12,7 @@ This release contains an improvement allowing to fetch the EOS free space via an
### Bug fixes
- cta/CTA#917 Corrected the bug in the cta-admin showqueues command in the case ArchiveForUser and ArchiveForRepack exist for the same tapepool
- cta/CTA#919 Archive queue oldestjobcreationtime is now updated at each pop from the ArchiveQueue
# v3.1-8
......
......@@ -494,6 +494,7 @@ Currently contains a helper for the client-ar script, which should be installed
- Upstream EOS 4.8.26-1
- cta/CTA#907 For backpressure, the EOS free space can be fetched by calling an external script
- cta/CTA#917 Corrected the bug in the cta-admin showqueues command in the case ArchiveForUser and ArchiveForRepack exist for the same tapepool
- cta/CTA#919 Archive queue oldestjobcreationtime is now updated at each pop from the ArchiveQueue
* Fri Oct 22 2020 julien.leduc (at) cern.ch - 3.1-8
- CTA software Recommended Access Order (RAO) implemented for LTO drives
- cta-admin repack ls tabular output improvements
......
......@@ -20,6 +20,7 @@
#include "AgentReference.hpp"
#include "Agent.hpp"
#include "common/log/DummyLogger.hpp"
#include "common/log/StdoutLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#include "catalogue/DummyCatalogue.hpp"
#include "BackendVFS.hpp"
......@@ -110,6 +111,51 @@ void fillRetrieveRequests(
}
}
/**
* Create Objectstore ArchiveRequest and insert them in a list that could be used to queue with the Algorithms
* @param requests the list of ArchiveRequests that will be queued in the objectstore
* @param requestPtrs the pointers of the ArchiveRequests that will be queued in the objectstore
* @param be objectstore backend
* @param agentRef the current agent that queues
*/
void fillArchiveRequests(typename cta::objectstore::ContainerAlgorithms<cta::objectstore::ArchiveQueue,cta::objectstore::ArchiveQueueToTransferForUser>::InsertedElement::list &requests,
std::list<std::unique_ptr<cta::objectstore::ArchiveRequest> >& requestPtrs, //List to avoid memory leak on ArchiveQueueAlgorithms test
cta::objectstore::BackendVFS &be,
cta::objectstore::AgentReference &agentRef){
using namespace cta::objectstore;
for (size_t i=0; i<10; i++) {
std::string arAddr = agentRef.nextId("ArchiveRequest");
agentRef.addToOwnership(arAddr, be);
cta::common::dataStructures::MountPolicy mp;
// This will be a copy number 1.
cta::common::dataStructures::ArchiveFile aFile;
aFile.archiveFileID = 123456789L;
aFile.diskFileId = "eos://diskFile";
aFile.checksumBlob.insert(cta::checksum::NONE, "");
aFile.creationTime = 0;
aFile.reconciliationTime = 0;
aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
aFile.diskInstance = "eoseos";
aFile.fileSize = 667;
aFile.storageClass = "sc";
requestPtrs.emplace_back(new cta::objectstore::ArchiveRequest(arAddr, be));
requests.emplace_back(ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser>::InsertedElement{requestPtrs.back().get(), 1, aFile, mp,
cta::nullopt});
auto & ar=*requests.back().archiveRequest;
auto copyNb = requests.back().copyNb;
ar.initialize();
ar.setArchiveFile(aFile);
ar.addJob(copyNb, "tapepool", agentRef.getAgentAddress(), 1, 1, 1);
ar.setMountPolicy(mp);
ar.setArchiveReportURL("");
ar.setArchiveErrorReportURL("");
ar.setRequester(cta::common::dataStructures::RequesterIdentity("user0", "group0"));
ar.setSrcURL("root://eoseos/myFile");
ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
ar.insert();
}
}
TEST(ObjectStore, ArchiveQueueAlgorithms) {
using namespace cta::objectstore;
// We will need a log object
......@@ -366,4 +412,88 @@ TEST(ObjectStore, RetrieveQueueAlgorithmsUpdatesOldestJobQueueTime) {
ASSERT_TRUE(oldestJobStartTime > firstBatchOldestJobStartTime);
}
TEST(ObjectStore, ArchiveQueueAlgorithmsUpdatesOldestJobQueueTime){
using namespace cta::objectstore;
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("dummy", "unitTest");
#else
cta::log::DummyLogger dl("dummy", "unitTest");
#endif
cta::catalogue::DummyCatalogue catalogue;
cta::log::LogContext lc(dl);
// Here we check for the ability to detect dead (but empty agents) and clean them up
BackendVFS be;
AgentReference agentRef("unitTestArchiveQueueAlgorithms", dl);
Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
EntryLogSerDeser el("user0", "unittesthost", time(NULL));
ScopedExclusiveLock rel(re);
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el, lc);
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
std::string tapepool = "tapepool";
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> archiveAlgos(be, agentRef);
std::string archiveQueueAddress;
std::unique_ptr<cta::objectstore::ArchiveQueue> aq;
time_t firstBatchOldestJobStartTime;
{
std::list<std::unique_ptr<ArchiveRequest> > requestsPtrs;
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser>::InsertedElement::list requests;
fillArchiveRequests(requests, requestsPtrs, be, agentRef);
//Insert a first batch of 10 requests
ASSERT_EQ(requests.size(), 10);
archiveAlgos.referenceAndSwitchOwnership(tapepool,
agentRef.getAgentAddress(), requests, lc);
re.fetchNoLock();
archiveQueueAddress = re.getArchiveQueueAddress(tapepool,JobQueueType::JobsToTransferForUser);
aq.reset(new cta::objectstore::ArchiveQueue(archiveQueueAddress,be));
aq->fetchNoLock();
//Get the first batch oldestAge
firstBatchOldestJobStartTime = aq->getJobsSummary().oldestJobStartTime;
}
//Create another batch of 10 requests
{
std::list<std::unique_ptr<ArchiveRequest> > requestsPtrs;
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser>::InsertedElement::list requests;
fillArchiveRequests(requests, requestsPtrs, be, agentRef);
ASSERT_EQ(requests.size(), 10);
//Sleep 1 second before requeueing
::sleep(1);
//Requeue
archiveAlgos.referenceAndSwitchOwnership(tapepool,
agentRef.getAgentAddress(), requests, lc);
aq->fetchNoLock();
uint64_t secondBatchOldestJobStartTime;
secondBatchOldestJobStartTime = aq->getJobsSummary().oldestJobStartTime;
//As we did not pop, the first inserted batch of jobs is the oldest one
ASSERT_EQ(firstBatchOldestJobStartTime,secondBatchOldestJobStartTime);
}
// Now pop the first 10 batches of jobs --> the queue oldestjobstarttime should be equal to the second batch oldestjobstarttime
ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PopCriteria popCriteria;
popCriteria.files = 10;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
auto poppedJobs = archiveAlgos.popNextBatch(tapepool, popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
//The new oldestJobStartTime should be equal to the jobstarttime of the first job of the second batch
aq->fetchNoLock();
time_t oldestJobStartTime = aq->getJobsSummary().oldestJobStartTime;
ASSERT_TRUE(oldestJobStartTime > firstBatchOldestJobStartTime);
}
}
......@@ -70,7 +70,6 @@ void ArchiveQueue::initialize(const std::string& name) {
void ArchiveQueue::commit() {
if (!checkMapsAndShardsCoherency()) {
rebuild();
m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1);
}
ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit();
}
......@@ -197,10 +196,56 @@ void ArchiveQueue::rebuild() {
m_payload.set_archivejobscount(totalJobs);
m_payload.set_archivejobstotalsize(totalBytes);
m_payload.set_oldestjobcreationtime(oldestJobCreationTime);
m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1);
// We went through all the shard, re-updated the summaries, removed references to
// gone shards. Done.
}
void ArchiveQueue::recomputeOldestJobCreationTime(){
checkPayloadWritable();
std::list<ArchiveQueueShard> shards;
std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers;
for (auto & sa: m_payload.archivequeueshards()) {
shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore));
shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch());
}
auto s = shards.begin();
auto sf = shardsFetchers.begin();
time_t oldestJobCreationTime=std::numeric_limits<time_t>::max();
while (s != shards.end()) {
// Each shard could be gone
try {
(*sf)->wait();
} catch (Backend::NoSuchObject & ex) {
// Remove the shard from the list
auto aqs = m_payload.mutable_archivequeueshards()->begin();
while (aqs != m_payload.mutable_archivequeueshards()->end()) {
if (aqs->address() == s->getAddressIfSet()) {
aqs = m_payload.mutable_archivequeueshards()->erase(aqs);
} else {
aqs++;
}
}
goto nextShard;
}
{
// The shard is still around, let's compute its oldest job
for (auto & j: s->dumpJobs()) {
if (j.startTime < oldestJobCreationTime) oldestJobCreationTime = j.startTime;
}
}
nextShard:;
s++;
sf++;
}
if(oldestJobCreationTime != std::numeric_limits<time_t>::max()){
m_payload.set_oldestjobcreationtime(oldestJobCreationTime);
}
}
bool ArchiveQueue::isEmpty() {
checkPayloadReadable();
......@@ -514,6 +559,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov
}
); // end of remove_if
// And commit the queue (once per shard should not hurt performance).
recomputeOldestJobCreationTime();
commit();
}
}
......
......@@ -61,6 +61,9 @@ private:
// Rebuild from shards if something goes wrong.
void rebuild();
// Recompute oldest job creation time
void recomputeOldestJobCreationTime();
public:
// Set/get tape pool
void setTapePool(const std::string & name);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment