Skip to content
Snippets Groups Projects
Commit 79341106 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Creates ContainerTraitsTypes class

parent b4fee298
No related branches found
No related tags found
No related merge requests found
/*
/**
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
......@@ -24,7 +24,6 @@
*/
#include <string>
#include "ArchiveRequest.hpp"
#include "Helpers.hpp"
#include "common/log/LogContext.hpp"
#include "common/exception/Exception.hpp"
......@@ -32,59 +31,69 @@
namespace cta { namespace objectstore {
/**
* Container traits definition. To be specialized class by class.
* This is mostly a model.
*/
template <class C>
class ContainerTraits{
public:
typedef C Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer;
static const std::string c_containerTypeName; //= "genericContainer";
static const std::string c_identifyerType; // = "genericId";
class ContainerSummary {
public:
template<typename C>
struct ContainerTraitsTypes
{
struct ContainerSummary {
void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&);
};
ContainerSummary getContainerSummary(Container &);
struct InsertedElement {
typedef std::list<InsertedElement> list;
};
struct ElementDescriptor {};
struct PoppedElementsSummary;
struct PopCriteria {
PopCriteria();
PopCriteria& operator-= (const PoppedElementsSummary &);
};
struct PoppedElementsSummary {
bool operator<(const PopCriteria&);
PoppedElementsSummary& operator+=(const PoppedElementsSummary&);
PoppedElementsSummary(const PoppedElementsSummary&);
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
};
};
/**
* Container traits definition. To be specialized class by class.
*/
template<typename C>
struct ContainerTraits
{
typedef C Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer;
static const std::string c_containerTypeName; // = "genericContainer"
static const std::string c_identifyerType; // = "genericId"
using ContainerSummary = typename ContainerTraitsTypes<C>::ContainerSummary;
ContainerSummary getContainerSummary(Container&);
using InsertedElement = typename ContainerTraitsTypes<C>::InsertedElement;
typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
typedef std::list <InsertedElement *> ElementPointerContainer;
class ElementDescriptor {};
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
typedef std::list<InsertedElement*> ElementPointerContainer;
using ElementDescriptor = typename ContainerTraitsTypes<C>::ElementDescriptor;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
template <class Element>
template<typename Element>
struct OpFailure {
Element * element;
std::exception_ptr failure;
typedef std::list<OpFailure> list;
};
class PoppedElementsSummary;
class PopCriteria {
public:
PopCriteria();
PopCriteria& operator-= (const PoppedElementsSummary &);
};
class PoppedElementsList {
public:
using PopCriteria = typename ContainerTraitsTypes<C>::PopCriteria;
using PoppedElementsSummary = typename ContainerTraitsTypes<C>::PoppedElementsSummary;
struct PoppedElementsList {
PoppedElementsList();
void insertBack(PoppedElementsList &&);
};
class PoppedElementsSummary {
public:
bool operator< (const PopCriteria &);
PoppedElementsSummary& operator+= (const PoppedElementsSummary &);
PoppedElementsSummary(const PoppedElementsSummary&);
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer &);
};
class PoppedElementsBatch {
public:
struct PoppedElementsBatch {
PoppedElementsList elements;
PoppedElementsSummary summary;
void addToLog(log::ScopedParamContainer &);
......@@ -101,7 +110,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
template <class Element>
template<typename Element>
static ElementAddress getElementAddress(const Element & e);
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId,
......@@ -114,15 +123,14 @@ public:
AgentReference & agentRef, log::LogContext & lc);
void removeReferencesAndCommit(Container & cont, typename OpFailure<InsertedElement>::list & elementsOpFailures);
void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress,
const ContainerAddress & previousOwnerAddress, log::LogContext & lc);
template <class Element>
static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::LogContext & lc);
template<typename Element>
static PoppedElementsSummary getElementSummary(const Element &);
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
};
template <class C>
template<typename C>
class ContainerAlgorithms {
public:
ContainerAlgorithms(Backend & backend, AgentReference & agentReference):
......@@ -144,8 +152,7 @@ public:
utils::Timer t;
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
prevContId, timingList, t, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(), prevContId, timingList, t, lc);
// If ownership switching failed, remove failed object from queue to not leave stale pointers.
if (failedOwnershipSwitchElements.size()) {
ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements);
......@@ -196,8 +203,7 @@ public:
timingList.insertAndReset("queueLockFetchTime", t);
ContainerTraits<C>::addReferencesIfNecessaryAndCommit(cont, elements, m_agentReference, lc);
timingList.insertAndReset("queueProcessAndCommitTime", t);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
previousOwnerAddress, timingList, t, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(), previousOwnerAddress, timingList, t, lc);
timingList.insertAndReset("requestsUpdatingTime", t);
// If ownership switching failed, remove failed object from queue to not leave stale pointers.
if (failedOwnershipSwitchElements.size()) {
......@@ -280,8 +286,7 @@ public:
localTimingList.insertAndReset("ownershipAdditionTime", t);
m_agentReference.addBatchToOwnership(candidateElementsAddresses, m_backend);
// We can now attempt to switch ownership of elements
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(candidateElements,
m_agentReference.getAgentAddress(), cont.getAddressIfSet(), localTimingList, t, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(candidateElements, m_agentReference.getAgentAddress(), cont.getAddressIfSet(), localTimingList, t, lc);
if (failedOwnershipSwitchElements.empty()) {
localTimingList.insertAndReset("updateResultProcessingTime", t);
// This is the easy case (and most common case). Everything went through fine.
......
......@@ -22,15 +22,19 @@
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue>::c_containerTypeName = "ArchiveQueue";
template<>
const std::string ContainerTraits<ArchiveQueue>::c_identifyerType = "tapepool";
template<>
void ContainerTraits<ArchiveQueue>::getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef,
const ContainerIdentifyer& contId, log::LogContext& lc) {
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc);
}
template<>
void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, InsertedElement::list& elemMemCont,
AgentReference& agentRef, log::LogContext& lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
......@@ -46,6 +50,7 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
template<>
void ContainerTraits<ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, InsertedElement::list& elemMemCont,
AgentReference& agentRef, log::LogContext& lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
......@@ -61,6 +66,7 @@ void ContainerTraits<ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container&
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
template<>
void ContainerTraits<ArchiveQueue>::removeReferencesAndCommit(Container& cont, OpFailure<InsertedElement>::list& elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
......@@ -69,36 +75,20 @@ void ContainerTraits<ArchiveQueue>::removeReferencesAndCommit(Container& cont, O
cont.removeJobsAndCommit(elementsToRemove);
}
template<>
void ContainerTraits<ArchiveQueue>::removeReferencesAndCommit(Container& cont, std::list<ElementAddress>& elementAddressList) {
cont.removeJobsAndCommit(elementAddressList);
}
void ContainerTraits<ArchiveQueue>::PoppedElementsSummary::addDeltaToLog(const PoppedElementsSummary& previous,
log::ScopedParamContainer& params) {
params.add("filesAdded", files - previous.files)
.add("bytesAdded", bytes - previous.bytes)
.add("filesBefore", previous.files)
.add("bytesBefore", previous.bytes)
.add("filesAfter", files)
.add("bytesAfter", bytes);
}
void ContainerTraits<ArchiveQueue>::ContainerSummary::addDeltaToLog(ContainerSummary& previous, log::ScopedParamContainer& params) {
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
.add("queueJobsAfter", jobs)
.add("queueBytesAfter", bytes);
}
template<>
auto ContainerTraits<ArchiveQueue>::getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::list& elemMemCont, const ContainerAddress& contAddress,
const ContainerAddress& previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext& lc)
-> OpFailure<InsertedElement>::list {
template<>
auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::list& elemMemCont, const ContainerAddress& contAddress, const ContainerAddress& previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext& lc) -> OpFailure<InsertedElement>::list {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: elemMemCont) {
ArchiveRequest & ar = *e.archiveRequest;
......@@ -124,6 +114,7 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(InsertedElement::lis
return ret;
}
template<>
void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock,
const ContainerIdentifyer& cId, log::LogContext& lc) {
// Try and get access to a queue.
......@@ -177,11 +168,13 @@ void ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(Container& cont,
}
}
template<>
void ContainerTraits<ArchiveQueue>::PoppedElementsBatch::addToLog(log::ScopedParamContainer& params) {
params.add("bytes", summary.bytes)
.add("files", summary.files);
}
template<>
auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria,
ElementsToSkipSet& elemtsToSkip, log::LogContext& lc) -> PoppedElementsBatch {
PoppedElementsBatch ret;
......@@ -195,6 +188,7 @@ auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont
return ret;
}
template<>
auto ContainerTraits<ArchiveQueue>::getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
ret.bytes = poppedElement.bytes;
......@@ -202,26 +196,27 @@ auto ContainerTraits<ArchiveQueue>::getElementSummary(const PoppedElement& poppe
return ret;
}
template<>
void ContainerTraits<ArchiveQueue>::PoppedElementsList::insertBack(PoppedElementsList&& insertedList) {
for (auto &e: insertedList) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
}
template<>
void ContainerTraits<ArchiveQueue>::PoppedElementsList::insertBack(PoppedElement&& e) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
template<>
auto ContainerTraits<ArchiveQueue>::PopCriteria::operator-=(const PoppedElementsSummary& pes) -> PopCriteria & {
bytes -= pes.bytes;
files -= pes.files;
return *this;
}
auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch & popedElementBatch,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc)
-> OpFailure<PoppedElement>::list {
template<>
auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch & popedElementBatch, const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext & lc) -> OpFailure<PoppedElement>::list {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
......@@ -251,6 +246,7 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch
return ret;
}
template<>
void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext& lc) {
if (cont.isEmpty()) {
......@@ -277,7 +273,4 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope
}
}
}} // namespace cta::objectstore
......@@ -22,6 +22,68 @@
namespace cta { namespace objectstore {
template<>
struct ContainerTraitsTypes<ArchiveQueue>
{
class ContainerSummary: public ArchiveQueue::JobsSummary {
public:
void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&);
};
struct InsertedElement {
std::shared_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
typedef std::list<InsertedElement> list;
};
typedef ArchiveRequest::JobDump ElementDescriptor;
struct PoppedElementsSummary;
struct PopCriteria {
PopCriteria& operator-=(const PoppedElementsSummary&);
uint64_t bytes = 0;
uint64_t files = 0;
};
struct PoppedElementsSummary {
uint64_t bytes = 0;
uint64_t files = 0;
bool operator< (const PopCriteria & pc) {
return bytes < pc.bytes && files < pc.files;
}
PoppedElementsSummary& operator+=(const PoppedElementsSummary &other) {
bytes += other.bytes;
files += other.files;
return *this;
}
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
};
};
void ContainerTraitsTypes<ArchiveQueue>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params)
{
params.add("filesAdded", files - previous.files)
.add("bytesAdded", bytes - previous.bytes)
.add("filesBefore", previous.files)
.add("bytesBefore", previous.bytes)
.add("filesAfter", files)
.add("bytesAfter", bytes);
}
void ContainerTraitsTypes<ArchiveQueue>::ContainerSummary::
addDeltaToLog(ContainerSummary& previous, log::ScopedParamContainer& params)
{
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
.add("queueJobsAfter", jobs)
.add("queueBytesAfter", bytes);
}
#if 0
template <>
class ContainerTraits<ArchiveQueue> {
public:
......@@ -31,18 +93,6 @@ public:
typedef std::string ContainerIdentifyer;
static const std::string c_containerTypeName; //= "ArchiveQueue";
static const std::string c_identifyerType; // = "tapepool";
struct InsertedElement {
std::shared_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
typedef std::list<InsertedElement> list;
};
class ContainerSummary: public ArchiveQueue::JobsSummary {
public:
void addDeltaToLog(ContainerSummary &, log::ScopedParamContainer &);
};
static ContainerSummary getContainerSummary(Container &cont);
......@@ -53,7 +103,6 @@ public:
typedef std::list<OpFailure> list;
};
typedef ArchiveRequest::JobDump ElementDescriptor;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
template <class Element>
......@@ -75,9 +124,7 @@ public:
static void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
static OpFailure<InsertedElement>::list switchElementsOwnership(InsertedElement::list & elemMemCont,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc);
static OpFailure<InsertedElement>::list switchElementsOwnership(InsertedElement::list & elemMemCont, const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext & lc);
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
......@@ -95,33 +142,12 @@ public:
std::string errorReportURL;
std::string srcURL;
};
class PoppedElementsSummary;
class PopCriteria {
public:
PopCriteria& operator-= (const PoppedElementsSummary &);
uint64_t bytes = 0;
uint64_t files = 0;
};
class PoppedElementsList: public std::list<PoppedElement> {
public:
void insertBack(PoppedElementsList &&);
void insertBack(PoppedElement &&);
};
class PoppedElementsSummary {
public:
uint64_t bytes = 0;
uint64_t files = 0;
bool operator< (const PopCriteria & pc) {
return bytes < pc.bytes && files < pc.files;
}
PoppedElementsSummary& operator+= (const PoppedElementsSummary & other) {
bytes += other.bytes;
files += other.files;
return *this;
}
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer &);
};
class PoppedElementsBatch {
public:
PoppedElementsList elements;
......@@ -137,12 +163,10 @@ public:
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
static OpFailure<PoppedElement>::list switchElementsOwnership(PoppedElementsBatch & popedElementBatch,
const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t,
log::LogContext & lc);
static OpFailure<PoppedElement>::list switchElementsOwnership(PoppedElementsBatch & popedElementBatch, const ContainerAddress & contAddress, const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext & lc);
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
};
#endif
}} // namespace cta::objectstore
\ No newline at end of file
}} // namespace cta::objectstore
......@@ -34,8 +34,6 @@
namespace cta { namespace objectstore {
class ArchiveQueue;
class RetrieveQueue;
class ScopedExclusiveLock;
class AgentReference;
class DriveState;
......@@ -124,4 +122,4 @@ public:
static std::list<cta::common::dataStructures::DriveState> getAllDriveStates(Backend & backend, log::LogContext & lc);
};
}} // namespace cta::objectstore
\ No newline at end of file
}} // namespace cta::objectstore
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment