Commit 8ac988d2 authored by Eric Cano's avatar Eric Cano
Browse files

Implemented OStoreDB::getNextArchiveJobsToReportBatch().

parent 2389888b
......@@ -65,6 +65,7 @@ public:
typedef std::list<OpFailure> list;
};
class PoppedElement;
class PoppedElementsSummary;
class PopCriteria {
public:
......@@ -114,14 +115,21 @@ public:
AgentReference & agentRef, log::LogContext & lc);
void removeReferencesAndCommit(Container & cont, typename OpFailure<InsertedElement>::list & elementsOpFailures);
void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress,
const ContainerAddress & previousOwnerAddress, log::LogContext & lc);
static typename OpFailure<InsertedElement>::list switchElementsOwnership(typename InsertedElement::list & elemMemCont,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc);
static typename OpFailure<PoppedElement>::list switchElementsOwnership(PoppedElementsBatch & popedElementBatch,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc);
template <class Element>
static PoppedElementsSummary getElementSummary(const Element &);
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
};
/************************************************************************************************************/
/* The algorithms themselves ********************************************************************************/
/************************************************************************************************************/
template <class C>
class ContainerAlgorithms {
public:
......
......@@ -140,12 +140,12 @@ public:
class ArchiveQueueToReport: public ArchiveQueue {
public:
template<typename...Ts> ArchiveQueueToReport(Ts...args): ArchiveQueue(args...) {}
template<typename...Ts> ArchiveQueueToReport(Ts&...args): ArchiveQueue(args...) {}
};
class ArchiveQueueFailed: public ArchiveQueue {
public:
template<typename...Ts> ArchiveQueueFailed(Ts...args): ArchiveQueue(args...) {}
template<typename...Ts> ArchiveQueueFailed(Ts&...args): ArchiveQueue(args...) {}
};
}}
......@@ -189,6 +189,10 @@ void ContainerTraits<ArchiveQueue>::PoppedElementsBatch::addToLog(log::ScopedPar
.add("files", summary.files);
}
void ContainerTraits<ArchiveQueueToReport>::PoppedElementsBatch::addToLog(log::ScopedParamContainer& params) {
params.add("files", summary.files);
}
auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria,
ElementsToSkipSet& elemtsToSkip, log::LogContext& lc) -> PoppedElementsBatch {
PoppedElementsBatch ret;
......@@ -221,6 +225,12 @@ auto ContainerTraits<ArchiveQueue>::getElementSummary(const PoppedElement& poppe
return ret;
}
auto ContainerTraits<ArchiveQueueToReport>::getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
ret.files = 1;
return ret;
}
void ContainerTraits<ArchiveQueue>::PoppedElementsList::insertBack(PoppedElementsList&& insertedList) {
for (auto &e: insertedList) {
std::list<PoppedElement>::emplace_back(std::move(e));
......@@ -242,39 +252,6 @@ auto ContainerTraits<ArchiveQueueToReport>::PopCriteria::operator-=(const Popped
return *this;
}
auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch & popedElementBatch,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc)
-> OpFailure<PoppedElement>::list {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = popedElementBatch.elements.begin();
OpFailure<PoppedElement>::list ret;
while (e != popedElementBatch.elements.end()) {
try {
u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
return ret;
}
void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext& lc) {
if (cont.isEmpty()) {
......
......@@ -137,9 +137,38 @@ public:
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
static OpFailure<PoppedElement>::list switchElementsOwnership(PoppedElementsBatch & popedElementBatch,
template <class t_PoppedElementsBatch>
static OpFailure<PoppedElement>::list switchElementsOwnership(t_PoppedElementsBatch & popedElementBatch,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc);
log::LogContext & lc) {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = popedElementBatch.elements.begin();
OpFailure<PoppedElement>::list ret;
while (e != popedElementBatch.elements.end()) {
try {
u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
return ret;
}
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
......@@ -147,6 +176,7 @@ public:
template<>
class ContainerTraits<ArchiveQueueToReport>: public ContainerTraits<ArchiveQueue> {
public:
class PoppedElementsSummary;
class PopCriteria {
public:
......@@ -173,6 +203,8 @@ class ContainerTraits<ArchiveQueueToReport>: public ContainerTraits<ArchiveQueue
void addToLog(log::ScopedParamContainer &);
};
static PoppedElementsSummary getElementSummary(const PoppedElement &);
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
};
......
......@@ -686,7 +686,35 @@ OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tape
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArchiveJobsToReportBatch(
uint64_t filesRequested, log::LogContext& logContext) {
throw cta::exception::Exception("TODOTODO: implement.");
typedef objectstore::ContainerAlgorithms<ArchiveQueueToReport> AQTRAlgo;
AQTRAlgo aqtrAlgo(m_objectStore, *m_agentReference);
// Decide from which queue we are going to pop.
RootEntry re(m_objectStore);
re.fetchNoLock();
while (true) {
auto queueList = re.dumpArchiveQueues(QueueType::JobsToReport);
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
if (queueList.empty()) return ret;
// Try to get jobs from the first queue. If it is empty, it will be trimmed,
// so we can got for another round.
AQTRAlgo::PopCriteria criteria;
criteria.files = filesRequested;
auto jobs = aqtrAlgo.popNextBatch(queueList.front().tapePool, QueueType::JobsToReport, criteria, logContext);
if (jobs.elements.empty()) continue;
for (auto & j: jobs.elements) {
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), *this, nullptr));
aj->tapeFile.copyNb = j.copyNb;
aj->archiveFile = j.archiveFile;
aj->archiveReportURL = j.archiveReportURL;
aj->errorReportURL = j.errorReportURL;
aj->srcURL = j.srcURL;
// We leave the tape file not set. It does not exist in all cases (not in case of failure).
aj->m_jobOwned = true;
aj->m_mountId = 0;
ret.emplace_back(std::move(aj));
}
return ret;
}
}
//------------------------------------------------------------------------------
......@@ -1676,7 +1704,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: jobs.elements) {
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB, *this));
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB, this));
aj->tapeFile.copyNb = j.copyNb;
aj->archiveFile = j.archiveFile;
aj->archiveReportURL = j.archiveReportURL;
......@@ -1719,8 +1747,17 @@ void OStoreDB::ArchiveMount::complete(time_t completionTime) {
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::ArchiveJob()
//------------------------------------------------------------------------------
OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress, OStoreDB& oStoreDB, ArchiveMount& am): m_jobOwned(false),
m_oStoreDB(oStoreDB), m_archiveRequest(jobAddress, m_oStoreDB.m_objectStore), m_archiveMount(am) {}
OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress, OStoreDB& oStoreDB, ArchiveMount* am): m_jobOwned(false),
m_oStoreDB(oStoreDB), m_archiveRequest(jobAddress, m_oStoreDB.m_objectStore), m_archiveMount(am) { }
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::getArchiveMount()
//------------------------------------------------------------------------------
OStoreDB::ArchiveMount& OStoreDB::ArchiveJob::getArchiveMount() {
if (!m_archiveMount)
throw cta::exception::Exception("In OStoreDB::ArchiveJob::getArchiveMount(): trying to access a non-set mount.");
return *m_archiveMount;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::RetrieveMount()
......
......@@ -163,6 +163,7 @@ public:
/* === Archive Job Handling =============================================== */
class ArchiveJob: public SchedulerDatabase::ArchiveJob {
friend class OStoreDB::ArchiveMount;
friend class OStoreDB;
public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
......@@ -174,13 +175,14 @@ public:
void bumpUpTapeFileCount(uint64_t newFileCount) override;
~ArchiveJob() override;
private:
ArchiveJob(const std::string &, OStoreDB &, ArchiveMount &);
ArchiveJob(const std::string &, OStoreDB &, ArchiveMount *);
bool m_jobOwned;
uint64_t m_mountId;
std::string m_tapePool;
OStoreDB & m_oStoreDB;
objectstore::ArchiveRequest m_archiveRequest;
ArchiveMount & m_archiveMount;
ArchiveMount *m_archiveMount;
ArchiveMount & getArchiveMount();
std::unique_ptr<objectstore::ArchiveRequest::AsyncJobSuccessfulUpdater> m_jobUpdate;
};
friend class ArchiveJob;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment