Commit d5a3ec1c authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Adds partial specialisation for Archive and Retrieve queue types

parent c9ac249c
......@@ -31,6 +31,7 @@
namespace cta { namespace objectstore {
#if 0
// Generic queue types for partial specialisation
struct ArchiveQueue_t {};
struct RetrieveQueue_t {};
......@@ -160,6 +161,7 @@ struct ContainerTraits
static const std::string c_containerTypeName; // = "genericContainer"
static const std::string c_identifierType; // = "genericId"
};
#endif
......
......@@ -36,7 +36,7 @@
namespace unitTests {
void fill_retrieve_requests(
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue_t,cta::objectstore::RetrieveQueue>::InsertedElement::list &requests,
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueue>::InsertedElement::list &requests,
cta::objectstore::BackendVFS &be,
cta::objectstore::AgentReference &agentRef)
{
......@@ -73,7 +73,7 @@ void fill_retrieve_requests(
rqc.mountPolicy.maxDrivesAllowed = 1;
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
requests.emplace_back(ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue>::InsertedElement{
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement{
cta::make_unique<RetrieveRequest>(rrAddr, be), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_Pending
});
auto &rr = *requests.back().retrieveRequest;
......@@ -116,7 +116,7 @@ void fill_retrieve_requests(
rqc.mountPolicy.maxDrivesAllowed = 1;
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
requests.emplace_back(ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue>::InsertedElement{
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement{
cta::make_unique<RetrieveRequest>(rrAddr, be), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransfer
});
auto & rr=*requests.back().retrieveRequest;
......@@ -160,7 +160,7 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
ContainerAlgorithms<ArchiveQueue_t,ArchiveQueue>::InsertedElement::list requests;
ContainerAlgorithms<ArchiveQueue,ArchiveQueue>::InsertedElement::list requests;
std::list<std::unique_ptr<cta::objectstore::ArchiveRequest>> archiveRequests;
for (size_t i=0; i<10; i++) {
std::string arAddr = agentRef.nextId("ArchiveRequest");
......@@ -179,7 +179,7 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
aFile.fileSize = 667;
aFile.storageClass = "sc";
archiveRequests.emplace_back(new cta::objectstore::ArchiveRequest(arAddr, be));
requests.emplace_back(ContainerAlgorithms<ArchiveQueue_t,ArchiveQueue>::InsertedElement{archiveRequests.back().get(), 1, aFile, mp,
requests.emplace_back(ContainerAlgorithms<ArchiveQueue,ArchiveQueue>::InsertedElement{archiveRequests.back().get(), 1, aFile, mp,
cta::nullopt});
auto & ar=*requests.back().archiveRequest;
auto copyNb = requests.back().copyNb;
......@@ -194,10 +194,10 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
ar.insert();
}
ContainerAlgorithms<ArchiveQueue_t,ArchiveQueue> archiveAlgos(be, agentRef);
ContainerAlgorithms<ArchiveQueue,ArchiveQueue> archiveAlgos(be, agentRef);
archiveAlgos.referenceAndSwitchOwnership("Tapepool", QueueType::JobsToTransfer, requests, lc);
// Now get the requests back
ContainerTraits<ArchiveQueue_t,ArchiveQueue>::PopCriteria popCriteria;
ContainerTraits<ArchiveQueue,ArchiveQueue>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", QueueType::JobsToTransfer, popCriteria, lc);
......@@ -230,7 +230,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue>::InsertedElement::list requests;
ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement::list requests;
fill_retrieve_requests(requests, be, agentRef);
{
......@@ -249,17 +249,17 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel2.release();
agent2.initialize();
agent2.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue>::InsertedElement::list requests2;
ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement::list requests2;
fill_retrieve_requests(requests2, be2, agentRef2);
auto a1 = agentRef2.getAgentAddress();
auto a2 = agentRef2.getAgentAddress();
ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue> retrieveAlgos2(be2, agentRef2);
ContainerAlgorithms<RetrieveQueue,RetrieveQueue> retrieveAlgos2(be2, agentRef2);
retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", QueueType::JobsToTransfer,
a2, a1, requests2, lc);
}
ContainerAlgorithms<RetrieveQueue_t,RetrieveQueue> retrieveAlgos(be, agentRef);
ContainerAlgorithms<RetrieveQueue,RetrieveQueue> retrieveAlgos(be, agentRef);
try {
ASSERT_EQ(requests.size(), 10);
......@@ -267,19 +267,19 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agentRef.getAgentAddress(), requests, lc);
// Now get the requests back
ContainerTraits<RetrieveQueue_t,RetrieveQueue>::PopCriteria popCriteria;
ContainerTraits<RetrieveQueue,RetrieveQueue>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = retrieveAlgos.popNextBatch("VID", QueueType::JobsToTransfer, popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Validate that the summary has the same information as the popped elements
ContainerTraits<RetrieveQueue_t,RetrieveQueue>::PoppedElementsSummary s;
ContainerTraits<RetrieveQueue,RetrieveQueue>::PoppedElementsSummary s;
for(auto &e: poppedJobs.elements) {
s += ContainerTraits<RetrieveQueue_t,RetrieveQueue>::getElementSummary(e);
s += ContainerTraits<RetrieveQueue,RetrieveQueue>::getElementSummary(e);
}
ASSERT_EQ(s, poppedJobs.summary);
} catch (ContainerTraits<RetrieveQueue_t,RetrieveQueue>::OwnershipSwitchFailure & ex) {
} catch (ContainerTraits<RetrieveQueue,RetrieveQueue>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
throw e.failure;
......
......@@ -22,264 +22,22 @@
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue_t,ArchiveQueue>::c_containerTypeName = "ArchiveQueue";
// ArchiveQueue full specialisations for ContainerTraits.
template<>
const std::string ContainerTraits<ArchiveQueue_t,ArchiveQueue>::c_identifierType = "tapepool";
// ContainerTraitsTypes
const std::string ContainerTraits<ArchiveQueue,ArchiveQueue>::c_containerTypeName = "ArchiveQueue";
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueue>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
params.add("filesAdded", files - previous.files)
.add("bytesAdded", bytes - previous.bytes)
.add("filesBefore", previous.files)
.add("bytesBefore", previous.bytes)
.add("filesAfter", files)
.add("bytesAfter", bytes);
}
#if 0
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueueToReport>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
params.add("filesAdded", files - previous.files)
.add("filesBefore", previous.files)
.add("filesAfter", files);
}
#endif
const std::string ContainerTraits<ArchiveQueue,ArchiveQueue>::c_identifierType = "tapepool";
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueue>::ContainerSummary::
addDeltaToLog(ContainerSummary& previous, log::ScopedParamContainer& params) {
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
.add("queueJobsAfter", jobs)
.add("queueBytesAfter", bytes);
}
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::PopCriteria::
operator-=(const PoppedElementsSummary &pes) -> PopCriteria & {
bytes -= pes.bytes;
files -= pes.files;
return *this;
}
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueue>::PoppedElementsList::
insertBack(PoppedElementsList &&insertedList) {
for (auto &e: insertedList) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
}
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueue>::PoppedElementsList::insertBack(PoppedElement &&e) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueue>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("bytes", summary.bytes)
.add("files", summary.files);
}
#if 0
template<>
void ContainerTraitsTypes<ArchiveQueue_t,ArchiveQueueToReport>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("files", summary.files);
}
#endif
// ContainerTraits
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef,
const ContainerIdentifier& contId, QueueType queueType, log::LogContext& lc) {
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, queueType, lc);
}
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::addReferencesAndCommit(Container& cont, InsertedElement::list& elemMemCont,
AgentReference& agentRef, log::LogContext& lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ElementDescriptor jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
cta::common::dataStructures::MountPolicy mp;
if (e.mountPolicy)
mp=*e.mountPolicy;
else
mp=cta::common::dataStructures::MountPolicy();
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
mp, time(nullptr)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, InsertedElement::list& elemMemCont,
AgentReference& agentRef, log::LogContext& lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ElementDescriptor jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
cta::common::dataStructures::MountPolicy mp = e.mountPolicy ? *e.mountPolicy : cta::common::dataStructures::MountPolicy();
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
mp, time(nullptr)});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::removeReferencesAndCommit(Container& cont, OpFailure<InsertedElement>::list& elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
elementsToRemove.emplace_back(eof.element->archiveRequest->getAddressIfSet());
}
cont.removeJobsAndCommit(elementsToRemove);
}
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::removeReferencesAndCommit(Container& cont, std::list<ElementAddress>& elementAddressList) {
cont.removeJobsAndCommit(elementAddressList);
}
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
#if 0
<<<<<<< HEAD
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch & popedElementBatch, const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext & lc) -> OpFailure<PoppedElement>::list {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = popedElementBatch.elements.begin();
OpFailure<PoppedElement>::list ret;
while (e != popedElementBatch.elements.end()) {
try {
u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
return ret;
}
=======
>>>>>>> reportQueues
#endif
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::switchElementsOwnership(InsertedElement::list& elemMemCont, const ContainerAddress& contAddress, const ContainerAddress& previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext& lc) -> OpFailure<InsertedElement>::list {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: elemMemCont) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress, cta::nullopt));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = elemMemCont.begin();
OpFailure<InsertedElement>::list ret;
while (e != elemMemCont.end()) {
try {
u->get()->wait();
} catch (...) {
ret.push_back(OpFailure<InsertedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
return ret;
}
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock,
const ContainerIdentifier& cId, QueueType queueType, log::LogContext& lc) {
// Try and get access to a queue.
size_t attemptCount = 0;
retry:
objectstore::RootEntry re(cont.m_objectStore);
re.fetchNoLock();
std::string aqAddress;
auto aql = re.dumpArchiveQueues(queueType);
for (auto & aqp : aql) {
if (aqp.tapePool == cId)
aqAddress = aqp.address;
}
if (!aqAddress.size()) throw NoSuchContainer("In ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(): no such archive queue");
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(aqAddress);
//findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
try {
if (contLock.isLocked()) contLock.release();
contLock.lock(cont);
cont.fetch();
//lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
} catch (cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the mean time, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(cId, queueType, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchArchiveQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
}
//emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
attemptCount++;
goto retry;
}
}
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria,
......@@ -310,45 +68,9 @@ auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getPoppingElementsCandidates(
return ret;
}
#if 0
auto ContainerTraits<ArchiveQueue_t,ArchiveQueueToReport>::getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria,
ElementsToSkipSet& elemtsToSkip, log::LogContext& lc) -> PoppedElementsBatch {
PoppedElementsBatch ret;
auto candidateJobsFromQueue=cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement());
ContainerTraits<ArchiveQueue_t,ArchiveQueue>::PoppedElement & elem = ret.elements.back();
elem.archiveRequest = cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore);
elem.copyNb = cjfq.copyNb;
elem.bytes = cjfq.size;
elem.archiveFile = common::dataStructures::ArchiveFile();
elem.srcURL = "";
elem.archiveReportURL = "";
elem.errorReportURL = "";
elem.latestError = "";
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report;
ret.summary.files++;
}
return ret;
}
template<>
auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
ret.bytes = poppedElement.bytes;
ret.files = 1;
return ret;
}
#endif
#if 0
<<<<<<< HEAD
=======
auto ContainerTraits<ArchiveQueue_t,ArchiveQueueToReport>::getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
ret.files = 1;
return ret;
}
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::PoppedElementsList::insertBack(PoppedElementsList&& insertedList) {
for (auto &e: insertedList) {
......@@ -366,40 +88,9 @@ auto ContainerTraits<ArchiveQueue_t,ArchiveQueue>::PopCriteria::operator-=(const
return *this;
}
auto ContainerTraits<ArchiveQueue_t,ArchiveQueueToReport>::PopCriteria::operator-=(const PoppedElementsSummary& pes) -> PopCriteria & {
files -= pes.files;
return *this;
}
>>>>>>> reportQueues
#endif
#if 0
template<>
void ContainerTraits<ArchiveQueue_t,ArchiveQueue>::trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock & contLock,
const ContainerIdentifier & cId, log::LogContext& lc) {
if (cont.isEmpty()) {
// The current implementation is done unlocked.
contLock.release();
try {
// The queue should be removed as it is empty.
RootEntry re(cont.m_objectStore);
ScopedExclusiveLock rexl(re);
re.fetch();
re.removeArchiveQueueAndCommit(cId, queueType, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue_t,ArchiveQueue>::trimContainerIfNeeded(): deleted empty queue");
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue_t,ArchiveQueue>::trimContainerIfNeeded(): could not delete a presumably empty queue");
}
//queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter);
}
}
#endif
}} // namespace cta::objectstore
......@@ -23,13 +23,15 @@
namespace cta { namespace objectstore {
// Partial specialisation of ArchiveQueue traits
template<typename C>
struct ContainerTraitsTypes<ArchiveQueue_t,C>
struct ContainerTraits<ArchiveQueue,C>
{
struct ContainerSummary: public C::JobsSummary {
struct ContainerSummary : public ArchiveQueue::JobsSummary {
void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&);
};
struct InsertedElement {
ArchiveRequest* archiveRequest;
uint16_t copyNb;
......@@ -63,6 +65,7 @@ struct ContainerTraitsTypes<ArchiveQueue_t,C>
uint64_t bytes = 0;
uint64_t files = 0;
bool operator< (const PopCriteria & pc) {
// This returns false if bytes or files are equal but the other value is less. Is that the intended behaviour?
return bytes < pc.bytes && files < pc.files;
}
PoppedElementsSummary& operator+=(const PoppedElementsSummary &other) {
......@@ -81,18 +84,385 @@ struct ContainerTraitsTypes<ArchiveQueue_t,C>
PoppedElementsSummary summary;
void addToLog(log::ScopedParamContainer&);
};
typedef C Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifier;
typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
typedef std::set<ElementAddress> ElementsToSkipSet;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
template<typename Element>
struct OpFailure {
Element *element;
std::exception_ptr failure;
typedef std::list<OpFailure> list;
OpFailure() {}
OpFailure(Element *e, const std::exception_ptr &f) : element(e), failure(f) {}
};
struct OwnershipSwitchFailure: public cta::exception::Exception {
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
typename OpFailure<InsertedElement>::list failedElements;
};
template<typename Element>
static ElementAddress getElementAddress(const Element &e) {
return e.archiveRequest->getAddressIfSet();
}
static ContainerSummary getContainerSummary(Container &cont);
static void trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, log::LogContext &lc) {
trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc);
}
static void getLockedAndFetched(Container &cont, ScopedExclusiveLock &contLock, AgentReference &agRef,
const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc);
static void getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, QueueType queueType, log::LogContext &lc);
static void addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemCont,
AgentReference &agentRef, log::LogContext &lc);
static void addReferencesIfNecessaryAndCommit(Container &cont, typename InsertedElement::list &elemMemCont,
AgentReference &agentRef, log::LogContext &lc);
static void removeReferencesAndCommit(Container &cont, typename OpFailure<InsertedElement>::list &elementsOpFailures);
static void removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAddressList);
static typename OpFailure<InsertedElement>::list
switchElementsOwnership(typename InsertedElement::list &elemMemCont, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static typename OpFailure<PoppedElement>::list
switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static PoppedElementsSummary getElementSummary(const PoppedElement &);
static PoppedElementsBatch getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
ElementsToSkipSet &elemtsToSkip, log::LogContext &lc);
static const std::string c_containerTypeName;
static const std::string c_identifierType;
private:
static void trimContainerIfNeeded(Container &cont, QueueType queueType, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, log::LogContext &lc);
};
// ArchiveQueue partial specialisations for ContainerTraits.
//
// Add a full specialisation to override for a specific ArchiveQueue type.
template<typename C>
void ContainerTraits<ArchiveQueue,C>::ContainerSummary::
addDeltaToLog(ContainerSummary &previous, log::ScopedParamContainer &params) {
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
.add("queueJobsAfter", jobs)
.add("queueBytesAfter", bytes);
}
template<typename C>
auto ContainerTraits<ArchiveQueue,C>::PopCriteria::
operator-=(const PoppedElementsSummary &pes) -> PopCriteria & {
bytes -= pes.bytes;
files -= pes.files;