Commit 0cd67052 authored by Eric Cano's avatar Eric Cano
Browse files

Moved archive requeues garbage collection to generic algorithms.

parent ab9c0b39
......@@ -59,61 +59,11 @@ public:
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
/* class ScopedIntent {
public:
ScopedIntent(Agent & agent, std::string container, std::string name, serializers::ObjectType objectType):
m_agent(agent), m_container(container), m_name(name), m_objectType(objectType), m_present(false) {
m_agent.addToOwnership(m_name);
m_present = true;
}
void removeFromIntent() {
if(!m_present) return;
m_agent.removeFromOwnership(m_name);
m_present = false;
}
~ScopedIntent() {
try {
removeFromIntent();
} catch (std::exception &) {
} catch (...) {throw;}
}
private:
Agent & m_agent;
std::string m_container;
std::string m_name;
serializers::ObjectType m_objectType;
bool m_present;
};*/
/*class ScopedOwnership {
public:
ScopedOwnership(Agent & agent, std::string name):
m_agent(agent), m_name(name), m_present(false) {
m_agent.addToOwnership(m_name);
m_present = true;
}
void removeFromOwnership() {
if(!m_present) return;
m_agent.removeFromOwnership( m_name);
m_present = false;
}
~ScopedOwnership() {
try {
removeFromOwnership();
} catch (std::exception &) {
} catch (...) {throw;}
}
private:
Agent & m_agent;
std::string m_name;
bool m_present;
};*/
private:
void addToOwnership(std::string name);
void removeFromOwnership(std::string name);
public:
std::list<std::string> getOwnershipList();
......
......@@ -91,6 +91,12 @@ public:
};
typedef std::set<ElementAddress> ElementsToSkipSet;
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
typename OpFailure<InsertedElement>::list failedElements;
};
static void trimContainerIfNeeded(Container & cont);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
......@@ -102,7 +108,10 @@ public:
log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext & lc);
static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont);
static void addReferencesAndCommit(Container & cont, typename InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
static void addReferencesIfNecessaryAndCommit(Container & cont, typename InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
void removeReferencesAndCommit(Container & cont, typename OpFailure<InsertedElement>::list & elementsOpFailures);
void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
static ElementPointerContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, const ContainerAddress & contAddress,
......@@ -121,12 +130,13 @@ public:
typedef typename ContainerTraits<C>::InsertedElement InsertedElement;
typedef typename ContainerTraits<C>::PopCriteria PopCriteria;
typedef typename ContainerTraits<C>::OwnershipSwitchFailure OwnershipSwitchFailure;
/** Reference objects in the container and then switch their ownership them. Objects
* are provided existing and owned by algorithm's agent. Returns a list of
* @returns list of elements for which the addition or ownership switch failed.
* @throws */
/** Reference objects in the container and then switch their ownership. Objects
* are provided existing and owned by algorithm's agent.
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
const typename ContainerTraits<C>::ContainerIdentifyer & prevContId,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
C cont(m_backend);
ScopedExclusiveLock contLock;
......@@ -135,7 +145,7 @@ public:
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
ContainerTraits<C>::addReferencesAndCommit(cont, elements, m_agentReference, lc);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
m_agentReference.getAgentAddress(), timingList, t, lc);
prevContId, timingList, t, lc);
// If ownership switching failed, remove failed object from queue to not leave stale pointers.
if (failedOwnershipSwitchElements.size()) {
ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements);
......@@ -167,6 +177,69 @@ public:
}
}
/** Reference objects in the container if needed and then switch their ownership (if needed). Objects
* are expected to be owned by agent, and not listed in the container but situations might vary.
* This function is typically used by the garbage collector. We do noe take care of dereferencing
* the object from the caller.
*/
void referenceAndSwitchOwnershipIfNecessary(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
typename ContainerTraits<C>::ContainerAddress & previousOwnerAddress,
typename ContainerTraits<C>::ContainerAddress & contAddress,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
C cont(m_backend);
ScopedExclusiveLock contLock;
log::TimingList timingList;
utils::Timer t;
ContainerTraits<C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
contAddress = cont.getAddressIfSet();
auto contSummaryBefore = ContainerTraits<C>::getContainerSummary(cont);
timingList.insertAndReset("queueLockFetchTime", t);
ContainerTraits<C>::addReferencesIfNecessaryAndCommit(cont, elements, m_agentReference, lc);
timingList.insertAndReset("queueProcessAndCommitTime", t);
auto failedOwnershipSwitchElements = ContainerTraits<C>::switchElementsOwnership(elements, cont.getAddressIfSet(),
previousOwnerAddress, timingList, t, lc);
timingList.insertAndReset("requestsUpdatingTime", t);
// If ownership switching failed, remove failed object from queue to not leave stale pointers.
if (failedOwnershipSwitchElements.size()) {
ContainerTraits<C>::removeReferencesAndCommit(cont, failedOwnershipSwitchElements);
timingList.insertAndReset("queueRecommitTime", t);
}
// We are now done with the container.
auto contSummaryAfter = ContainerTraits<C>::getContainerSummary(cont);
contLock.release();
timingList.insertAndReset("queueUnlockTime", t);
log::ScopedParamContainer params(lc);
params.add("C", ContainerTraits<C>::c_containerTypeName)
.add(ContainerTraits<C>::c_identifyerType, contId)
.add("containerAddress", cont.getAddressIfSet());
contSummaryAfter.addDeltaToLog(contSummaryBefore, params);
timingList.addToLog(params);
if (failedOwnershipSwitchElements.empty()) {
// That's it, we're done.
lc.log(log::INFO, "In ContainerAlgorithms::referenceAndSwitchOwnershipIfNecessary(): Requeued a batch of elements.");
return;
} else {
// Bad case: just return the failure set to the caller.
typename ContainerTraits<C>::OwnershipSwitchFailure failureEx(
"In ContainerAlgorithms<>::referenceAndSwitchOwnershipIfNecessar(): failed to switch ownership of some elements");
failureEx.failedElements = failedOwnershipSwitchElements;
params.add("errorCount", failedOwnershipSwitchElements.size());
lc.log(log::WARNING, "In ContainerAlgorithms::referenceAndSwitchOwnershipIfNecessary(): "
"Encountered problems while requeuing a batch of elements");
throw failureEx;
}
}
/**
* Addition of jobs to container. Convenience overload for cases when current agent is the previous owner
* (most cases except garbage collection).
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
typename ContainerTraits<C>::InsertedElement::list & elements, log::LogContext & lc) {
referenceAndSwitchOwnership(contId, m_agentReference.getAgentAddress(), elements, lc);
}
typename ContainerTraits<C>::PoppedElementsBatch popNextBatch(const typename ContainerTraits<C>::ContainerIdentifyer & contId,
typename ContainerTraits<C>::PopCriteria & popCriteria, log::LogContext & lc) {
// Prepare the return value
......
......@@ -46,6 +46,21 @@ void ContainerTraits<ArchiveQueue>::addReferencesAndCommit(Container& cont, Inse
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
void ContainerTraits<ArchiveQueue>::addReferencesIfNecessaryAndCommit(Container& cont, InsertedElement::list& elemMemCont,
AgentReference& agentRef, log::LogContext& lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ElementDescriptor jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(nullptr)});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
void ContainerTraits<ArchiveQueue>::removeReferencesAndCommit(Container& cont, OpFailure<InsertedElement>::list& elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
......
......@@ -32,7 +32,7 @@ public:
static const std::string c_containerTypeName; //= "ArchiveQueue";
static const std::string c_identifyerType; // = "tapepool";
struct InsertedElement {
std::unique_ptr<ArchiveRequest> archiveRequest;
std::shared_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
......@@ -68,6 +68,9 @@ public:
static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
static void addReferencesIfNecessaryAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
static void removeReferencesAndCommit(Container & cont, OpFailure<InsertedElement>::list & elementsOpFailures);
static void removeReferencesAndCommit(Container & cont, std::list<ElementAddress>& elementAddressList);
......
......@@ -474,10 +474,13 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
auto *jl=payload.mutable_jobs();
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
if (j->owner() != previousOwner) {
throw Backend::WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned.");
// The owner might already be the right one (in garbage collection cases), in which case, it's job done.
if (j->owner() != owner) {
if (j->owner() != previousOwner) {
throw Backend::WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned.");
}
j->set_owner(owner);
}
j->set_owner(owner);
// We also need to gather all the job content for the user to get in-memory
// representation.
// TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
......
This diff is collapsed.
......@@ -58,15 +58,18 @@ public:
std::map<std::string, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
std::list<std::shared_ptr<GenericObject>> otherObjects;
/// Fill up the fetchedObjects with objects of interest.
void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, log::LogContext & lc);
void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore,
log::LogContext & lc);
/// Fill up the sorter with the fetched objects
void sortFetchedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
void sortFetchedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore,
cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
/// Lock, fetch and update archive jobs
void lockFetchAndUpdateArchiveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc);
/// Lock, fetch and update retrieve jobs
void lockFetchAndUpdateRetrieveJobs(Agent & agent, AgentReference & agentReference, Backend & objectStore, log::LogContext & lc);
// Lock, fetch and update other objects
void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore, cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore,
cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
};
private:
Backend & m_objectStore;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment