Skip to content
Snippets Groups Projects
Commit 8ebbd146 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Adds skeleton method definitions for RetrieveQueueAlgorithms

parent 388043c3
No related branches found
No related tags found
No related merge requests found
......@@ -139,7 +139,7 @@ struct ContainerTraits
ElementsToSkipSet &elemtsToSkip, log::LogContext &lc);
static const std::string c_containerTypeName; // = "genericContainer"
static const std::string c_identifyerType; // = "genericId"
static const std::string c_identifierType; // = "genericId"
};
......@@ -232,7 +232,7 @@ public:
timingList.insertAndReset("queueUnlockTime", t);
log::ScopedParamContainer params(lc);
params.add("C", ContainerTraits<C>::c_containerTypeName)
.add(ContainerTraits<C>::c_identifyerType, contId)
.add(ContainerTraits<C>::c_identifierType, contId)
.add("containerAddress", cont.getAddressIfSet());
contSummaryAfter.addDeltaToLog(contSummaryBefore, params);
timingList.addToLog(params);
......@@ -378,7 +378,7 @@ public:
}
log::ScopedParamContainer params(lc);
params.add("C", ContainerTraits<C>::c_containerTypeName)
.add(ContainerTraits<C>::c_identifyerType, contId)
.add(ContainerTraits<C>::c_identifierType, contId)
.add("containerAddress", cont.getAddressIfSet());
ret.summary.addDeltaToLog(previousSummary, params);
contSummaryAfter.addDeltaToLog(contSummaryBefore, params);
......@@ -390,7 +390,7 @@ public:
{
log::ScopedParamContainer params(lc);
params.add("C", ContainerTraits<C>::c_containerTypeName)
.add(ContainerTraits<C>::c_identifyerType, contId);
.add(ContainerTraits<C>::c_identifierType, contId);
ret.addToLog(params);
timingList.addToLog(params);
params.add("schedulerDbTime", totalTime.secs());
......
......@@ -22,6 +22,14 @@
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQueue";
template<>
const std::string ContainerTraits<ArchiveQueue>::c_identifierType = "tapepool";
// ContainerTraitsTypes
void ContainerTraitsTypes<ArchiveQueue>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
params.add("filesAdded", files - previous.files)
......@@ -64,11 +72,7 @@ addToLog(log::ScopedParamContainer &params) {
.add("files", summary.files);
}
template<>
const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQueue";
template<>
const std::string ContainerTraits<ArchiveQueue>::c_identifyerType = "tapepool";
// ContainerTraits
template<>
void ContainerTraits<ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef,
......
......@@ -17,9 +17,66 @@
*/
#include "RetrieveQueueAlgorithms.hpp"
#include "common/make_unique.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<RetrieveQueue>::c_containerTypeName = "RetrieveQueue";
template<>
const std::string ContainerTraits<RetrieveQueue>::c_identifierType = "vid";
// ContainerTraitsTypes
void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
#if 0
params.add("filesAdded", files - previous.files)
.add("bytesAdded", bytes - previous.bytes)
.add("filesBefore", previous.files)
.add("bytesBefore", previous.bytes)
.add("filesAfter", files)
.add("bytesAfter", bytes);
#endif
}
void ContainerTraitsTypes<RetrieveQueue>::ContainerSummary::
addDeltaToLog(const ContainerSummary &previous, log::ScopedParamContainer &params) {
#if 0
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
.add("queueJobsAfter", jobs)
.add("queueBytesAfter", bytes);
#endif
}
auto ContainerTraits<RetrieveQueue>::PopCriteria::
operator-=(const PoppedElementsSummary &pes) -> PopCriteria & {
bytes -= pes.bytes;
files -= pes.files;
return *this;
}
void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsList::
insertBack(PoppedElementsList &&insertedList) {
for (auto &e: insertedList) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
}
void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsList::insertBack(PoppedElement &&e) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("bytes", summary.bytes)
.add("files", summary.files);
}
// ContainerTraits
template<>
void ContainerTraits<RetrieveQueue>::
getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &agRef,
......@@ -28,6 +85,63 @@ getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &a
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc);
}
template<>
void ContainerTraits<RetrieveQueue>::
getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc)
{
#if 0
// Try and get access to a queue.
size_t attemptCount = 0;
retry:
objectstore::RootEntry re(cont.m_objectStore);
re.fetchNoLock();
std::string aqAddress;
auto aql = re.dumpArchiveQueues(QueueType::LiveJobs);
for (auto & aqp : aql) {
if (aqp.tapePool == cId)
aqAddress = aqp.address;
}
if (!aqAddress.size()) throw NoSuchContainer("In ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(): no such archive queue");
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(aqAddress);
//findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
try {
if (contLock.isLocked()) contLock.release();
contLock.lock(cont);
cont.fetch();
//lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
} catch (cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the mean time, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchArchiveQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
}
//emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
attemptCount++;
goto retry;
}
#endif
}
template<>
void ContainerTraits<RetrieveQueue>::
addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, AgentReference &agentRef,
......@@ -39,6 +153,38 @@ addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, Agen
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
#if 0
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ElementDescriptor jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
#endif
}
template<>
void ContainerTraits<RetrieveQueue>::addReferencesIfNecessaryAndCommit(Container& cont,
InsertedElement::list& elemMemCont, AgentReference& agentRef, log::LogContext& lc)
{
#if 0
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ElementDescriptor jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
#endif
}
template<>
......@@ -52,6 +198,22 @@ removeReferencesAndCommit(Container &cont, OpFailure<InsertedElement>::list &ele
cont.removeJobsAndCommit(elementsToRemove);
}
template<>
void ContainerTraits<RetrieveQueue>::
removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAddressList) {
cont.removeJobsAndCommit(elementAddressList);
}
template<>
auto ContainerTraits<RetrieveQueue>::
getContainerSummary(Container &cont) -> ContainerSummary {
ContainerSummary ret;
#if 0
ret.JobsSummary::operator=(cont.getJobsSummary());
#endif
return ret;
}
template<>
auto ContainerTraits<RetrieveQueue>::
switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddress &contAddress,
......@@ -83,4 +245,103 @@ switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddre
return ret;
}
template<>
auto ContainerTraits<RetrieveQueue>::
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
log::LogContext &lc) -> PoppedElementsBatch
{
PoppedElementsBatch ret;
#if 0
auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
common::dataStructures::ArchiveFile(), "", "", "", });
ret.summary.bytes += cjfq.size;
ret.summary.files++;
}
#endif
return ret;
}
template<>
auto ContainerTraits<RetrieveQueue>::
getElementSummary(const PoppedElement &poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
#if 0
ret.bytes = poppedElement.bytes;
ret.files = 1;
#endif
return ret;
}
template<>
auto ContainerTraits<RetrieveQueue>::
switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t,
log::LogContext &lc) -> OpFailure<PoppedElement>::list
{
#if 0
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = popedElementBatch.elements.begin();
#endif
OpFailure<PoppedElement>::list ret;
#if 0
while (e != popedElementBatch.elements.end()) {
try {
u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
#endif
return ret;
}
template<>
void ContainerTraits<RetrieveQueue>::
trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId,
log::LogContext &lc)
{
#if 0
if(cont.isEmpty()) {
// The current implementation is done unlocked.
contLock.release();
try {
// The queue should be removed as it is empty.
RootEntry re(cont.m_objectStore);
ScopedExclusiveLock rexl(re);
re.fetch();
re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(): deleted empty queue");
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(): could not delete a presumably empty queue");
}
//queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter);
}
#endif
}
}} // namespace cta::objectstore
......@@ -26,7 +26,7 @@ namespace cta { namespace objectstore {
template<>
struct ContainerTraitsTypes<RetrieveQueue>
{
struct ContainerSummary {
struct ContainerSummary : public RetrieveQueue::JobsSummary {
void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&);
};
......@@ -54,10 +54,16 @@ struct ContainerTraitsTypes<RetrieveQueue>
uint64_t bytes;
};
struct PoppedElementsSummary {
//PoppedElementsSummary();
bool operator<(const PopCriteria&);
PoppedElementsSummary& operator+=(const PoppedElementsSummary&);
//PoppedElementsSummary(const PoppedElementsSummary&);
uint64_t bytes = 0;
uint64_t files = 0;
bool operator<(const PopCriteria &pc) {
return bytes < pc.bytes && files < pc.files;
}
PoppedElementsSummary& operator+=(const PoppedElementsSummary &other) {
bytes += other.bytes;
files += other.files;
return *this;
}
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
};
struct PoppedElementsList : public std::list<PoppedElement> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment