Commit 43a1aa3b authored by Eric Cano's avatar Eric Cano
Browse files

Moved catalogue access of Scheduler::queueRetrieve() into OStoreDB::queueRetrieve()

Changed the default behavior of DummyCatalogue so unit tests keep passing.
Adapted SchedulerDatabase API.
parent d4132fd8
......@@ -122,14 +122,14 @@ public:
common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const {
// Minimal implementation of VidToMap for retrieve request unit tests. We just support
// disabled status for the tapes.
// If the tape is not listed, it is listed as disabled in the return value.
// If the tape is not listed, it is listed as enabled in the return value.
threading::MutexLocker lm(m_tapeEnablingMutex);
common::dataStructures::VidToTapeMap ret;
for (const auto & v: vids) {
try {
ret[v].disabled = !m_tapeEnabling.at(v);
} catch (std::out_of_range &) {
ret[v].disabled = true;
ret[v].disabled = false;
}
}
return ret;
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
......@@ -597,6 +598,8 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
}
// Mark the tape as enabled
catalogue.addEnabledTape("Tape0");
// Mark the other tape as disabled
catalogue.addDisabledTape("Tape1");
// Create the garbage collector and run it twice.
cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector");
cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be);
......
......@@ -242,7 +242,7 @@ std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retri
cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex;
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedArchiveQueue()
// Helpers::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics(
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider,
......
......@@ -767,14 +767,17 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue
//------------------------------------------------------------------------------
// OStoreDB::queueRetrieve()
//------------------------------------------------------------------------------
void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
const std::string &vid) {
std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) {
assertAgentAddressSet();
// Get the best vid from the cache
std::set<std::string> candidateVids;
for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.second.vid);
std::string bestVid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore);
// Check that the requested retrieve job (for the provided vid) exists.
if (!std::count_if(criteria.archiveFile.tapeFiles.cbegin(),
criteria.archiveFile.tapeFiles.end(),
[vid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == vid; }))
[bestVid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == bestVid; }))
throw RetrieveRequestHasNoCopies("In OStoreDB::queueRetrieve(): no tape file for requested vid.");
// In order to post the job, construct it first in memory.
objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveRequest"), m_objectStore);
......@@ -792,7 +795,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest&
RootEntry re(m_objectStore);
ScopedExclusiveLock rel(re);
re.fetch();
auto rqAddr=re.addOrGetRetrieveQueueAndCommit(vid, *m_agentReference);
auto rqAddr=re.addOrGetRetrieveQueueAndCommit(bestVid, *m_agentReference);
// Create the request.
rel.release();
RetrieveQueue rq(rqAddr, m_objectStore);
......@@ -800,7 +803,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest&
rq.fetch();
// We need to find the job corresponding to the vid
for (auto & j: rReq.getArchiveFile().tapeFiles) {
if (j.second.vid == vid) {
if (j.second.vid == bestVid) {
rq.addJob(j.second.copyNb, j.second.fSeq, rReq.getAddressIfSet(), criteria.archiveFile.fileSize,
criteria.mountPolicy, rReq.getEntryLog().time);
rReq.setActiveCopyNumber(j.second.copyNb);
......@@ -819,6 +822,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest&
rrl.release();
// And relinquish ownership form agent
m_agentReference->removeFromOwnership(rReq.getAddressIfSet(), m_objectStore);
return bestVid;
}
//------------------------------------------------------------------------------
......
......@@ -231,9 +231,8 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange);
void queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria,
const std::string &vid) override;
std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria) override;
std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override;
......
......@@ -139,10 +139,9 @@ public:
return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider);
}
void queueRetrieve(const common::dataStructures::RetrieveRequest& rqst,
const common::dataStructures::RetrieveFileQueueCriteria &criteria,
const std::string &vid) override {
m_OStoreDB.queueRetrieve(rqst, criteria, vid);
std::string queueRetrieve(const common::dataStructures::RetrieveRequest& rqst,
const common::dataStructures::RetrieveFileQueueCriteria &criteria) override {
return m_OStoreDB.queueRetrieve(rqst, criteria);
}
std::list<cta::common::dataStructures::DriveState> getDriveStates() const override {
......
......@@ -130,41 +130,7 @@ void Scheduler::queueRetrieve(
// Get the queue criteria
const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester);
// Get the statuses of the tapes on which we have files.
std::set<std::string> vids;
for (auto & tf: queueCriteria.archiveFile.tapeFiles) {
vids.insert(tf.second.vid);
}
auto tapeStatuses = m_catalogue.getTapesByVid(vids);
// Filter out the tapes with are disabled
for (auto & t: tapeStatuses) {
if (t.second.disabled)
vids.erase(t.second.vid);
}
if (vids.empty())
throw exception::NonRetryableError("In Scheduler::queueRetrieve(): all copies are on disabled tapes");
auto catalogueTime = t.secs(utils::Timer::resetCounter);
// Get the statistics for the potential tapes on which we will retrieve.
auto stats=m_db.getRetrieveQueueStatistics(queueCriteria, vids);
// Sort the potential queues.
stats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight);
// If there are several equivalent entries, choose randomly among them.
// First element will always be selected.
std::set<std::string> candidateVids;
for (auto & s: stats) {
if (!(s<stats.front()) && !(s>stats.front()))
candidateVids.insert(s.vid);
}
if (candidateVids.empty())
throw exception::Exception("In Scheduler::queueRetrieve(): failed to sort and select candidate VIDs");
// We need to get a random number [0, candidateVids.size() -1]
std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<size_t> distribution(0, candidateVids.size() -1);
size_t index=distribution(dre);
auto it=candidateVids.cbegin();
std::advance(it, index);
std::string selectedVid=*it;
m_db.queueRetrieve(request, queueCriteria, selectedVid);
std::string selectedVid = m_db.queueRetrieve(request, queueCriteria);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("fileId", request.archiveFileID)
......@@ -201,7 +167,6 @@ void Scheduler::queueRetrieve(
.add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
.add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
.add("policyPriority", queueCriteria.mountPolicy.retrievePriority)
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "Queued retrieve request");
}
......
......@@ -243,16 +243,16 @@ public:
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria,
const std::set<std::string> & vidsToConsider) = 0;
/**
* Queues the specified request.
* Queues the specified request. As the object store has access to the catalogue,
* the best queue (most likely to go, and not disabled can be chosen directly there).
*
* @param rqst The request.
* @param criteria The criteria retrieved from the CTA catalogue to be used to
* decide how to quue the request.
* @param vid: the vid of the retrieve queue on which we will queue the request.
* @return the selected vid (mostly for logging)
*/
virtual void queueRetrieve(const cta::common::dataStructures::RetrieveRequest &rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria,
const std::string &vid) = 0;
virtual std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest &rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria) = 0;
/**
* Returns all of the existing retrieve jobs grouped by tape and then
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment