Commit 721c5a54 authored by Eric Cano's avatar Eric Cano
Browse files

Improved ArchiveRequest garbage collection so it uses Helpers::getLockedAndFetchedArchiveQueue().

This required to pass an AgentReference through to all garbageCollect() member functions of all objects.
The changed behavior required adaptation of the corresponding unit test (the archive queues now need to
be properly referenced in the root entry).
parent 1eb1e6d9
......@@ -107,7 +107,7 @@ bool cta::objectstore::Agent::isEmpty() {
return true;
}
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner) {
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) {
checkPayloadWritable();
// We are here limited to checking the presumed owner and mark the agent as
// untracked in the agent register in case of match, else we do nothing
......
......@@ -56,7 +56,7 @@ public:
bool isEmpty();
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
/* class ScopedIntent {
public:
......
......@@ -41,7 +41,7 @@ void cta::objectstore::AgentRegister::initialize() {
m_payloadInterpreted = true;
}
void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner) {
void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
checkPayloadWritable();
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error"));
......
......@@ -34,7 +34,7 @@ public:
AgentRegister(const std::string & name, Backend & os);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
bool isEmpty();
void addAgent (std::string name);
void removeAgent (const std::string & name);
......
......@@ -73,7 +73,7 @@ bool ArchiveQueue::isEmpty() {
return true;
}
void ArchiveQueue::garbageCollect(const std::string &presumedOwner) {
void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the tape pool creation failed. We have nothing to do.
......
......@@ -97,7 +97,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
// Garbage collection
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
std::string dump();
};
......
......@@ -19,6 +19,7 @@
#include "ArchiveRequest.hpp"
#include "GenericObject.hpp"
#include "ArchiveQueue.hpp"
#include "Helpers.hpp"
#include "common/dataStructures/EntryLog.hpp"
#include "MountPolicySerDeser.hpp"
......@@ -282,7 +283,7 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
return ret;
}
void ArchiveRequest::garbageCollect(const std::string &presumedOwner) {
void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
checkPayloadWritable();
// The behavior here depends on which job the agent is supposed to own.
// We should first find this job (if any). This is for covering the case
......@@ -301,9 +302,11 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner) {
// If we fail to reconnect, we have to fail the job and potentially
// finish the request.
try {
ArchiveQueue aq(j->owner(), m_objectStore);
ScopedExclusiveLock tpl(aq);
aq.fetch();
// Get the queue where we should requeue the job. The queue might need to be
// recreated (this will be done by helper).
ArchiveQueue aq(m_objectStore);
ScopedExclusiveLock tpl;
Helpers::getLockedAndFetchedArchiveQueue(aq, tpl, agentReference, j->tapepool());
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
jd.tapePool = j->tapepool();
......
......@@ -112,7 +112,7 @@ public:
};
std::list<JobDump> dumpJobs();
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
std::string dump();
};
......
......@@ -70,7 +70,7 @@ void DriveRegister::initialize() {
//------------------------------------------------------------------------------
// DriveRegister::garbageCollect())
//------------------------------------------------------------------------------
void DriveRegister::garbageCollect(const std::string &presumedOwner) {
void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the drive register creation failed. We have nothing to do.
......
......@@ -38,7 +38,7 @@ public:
DriveRegister(GenericObject & go);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
bool isEmpty();
// Drives management =========================================================
......
......@@ -17,14 +17,15 @@
*/
#include "GarbageCollector.hpp"
#include "AgentReference.hpp"
#include "RootEntry.hpp"
#include <algorithm>
namespace cta { namespace objectstore {
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 5;
GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
m_objectStore(os), m_ourAgent(agent), m_agentRegister(os) {
GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference):
m_objectStore(os), m_ourAgentReference(agentReference), m_agentRegister(os) {
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
......@@ -49,11 +50,12 @@ void GarbageCollector::trimGoneTargets(log::LogContext & lc) {
= m_watchedAgents.begin();
wa != m_watchedAgents.end();) {
if (agentList.end() == std::find(agentList.begin(), agentList.end(), wa->first)) {
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.removeFromOwnership(wa->first);
Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
ScopedExclusiveLock oaLock(ourAgent);
ourAgent.fetch();
ourAgent.removeFromOwnership(wa->first);
std::string removedAgent = wa->first;
m_ourAgent.commit();
ourAgent.commit();
oaLock.release();
delete wa->second;
m_watchedAgents.erase(wa++);
......@@ -77,7 +79,7 @@ void GarbageCollector::aquireTargets(log::LogContext & lc) {
for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
&& c!=candidatesList.end(); c++) {
// We don't monitor ourselves
if (*c != m_ourAgent.getAddressIfSet()) {
if (*c != m_ourAgentReference.getAgentAddress()) {
// So we have a candidate we might want to monitor
// First, check that the agent entry exists, and that ownership
// is indeed pointing to the agent register
......@@ -100,16 +102,17 @@ void GarbageCollector::aquireTargets(log::LogContext & lc) {
// We are now interested in tracking this agent. So we will transfer its
// ownership. We alredy have an exclusive lock on the agent.
// Lock ours
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.addToOwnership(ag.getAddressIfSet());
m_ourAgent.commit();
Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
ScopedExclusiveLock oaLock(ourAgent);
ourAgent.fetch();
ourAgent.addToOwnership(ag.getAddressIfSet());
ourAgent.commit();
// We now have a pointer to the agent, we can make the ownership official
ag.setOwner(m_ourAgent.getAddressIfSet());
ag.setOwner(ourAgent.getAddressIfSet());
ag.commit();
log::ScopedParamContainer params(lc);
params.add("agentAddress", ag.getAddressIfSet())
.add("gcAgentAddress", m_ourAgent.getAddressIfSet());
.add("gcAgentAddress", ourAgent.getAddressIfSet());
lc.log(log::INFO, "In GarbageCollector::aquireTargets(): started tracking an untracked agent");
// Agent is officially our, we can remove it from the untracked agent's
// list
......@@ -138,10 +141,11 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
// Get the heartbeat. Clean dead agents and remove references to them
if (!wa->second->checkAlive()) {
cleanupDeadAgent(wa->first, lc);
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.removeFromOwnership(wa->first);
m_ourAgent.commit();
Agent ourAgent(m_ourAgentReference.getAgentAddress(), m_objectStore);
ScopedExclusiveLock oaLock(ourAgent);
ourAgent.fetch();
ourAgent.removeFromOwnership(wa->first);
ourAgent.commit();
delete wa->second;
m_watchedAgents.erase(wa++);
} else {
......@@ -157,8 +161,8 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
agent.fetch();
log::ScopedParamContainer params(lc);
params.add("agentAddress", agent.getAddressIfSet())
.add("gcAgentAddress", m_ourAgent.getAddressIfSet());
if (agent.getOwner() != m_ourAgent.getAddressIfSet()) {
.add("gcAgentAddress", m_ourAgentReference.getAgentAddress());
if (agent.getOwner() != m_ourAgentReference.getAgentAddress()) {
log::ScopedParamContainer params(lc);
lc.log(log::WARNING, "In GarbageCollector::cleanupDeadAgent(): skipping agent which is not owned by this garbage collector as thought.");
// The agent is removed from our ownership by the calling function: we're done.
......@@ -178,7 +182,7 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
go.fetch();
// Call GenericOpbject's garbage collect method, which in turn will
// delegate to the object type's garbage collector.
go.garbageCollectDispatcher(goLock, address);
go.garbageCollectDispatcher(goLock, address, m_ourAgentReference);
lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object.");
} else {
lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
......
......@@ -36,7 +36,7 @@ namespace cta { namespace objectstore {
class GarbageCollector {
public:
GarbageCollector(Backend & os, Agent & agent);
GarbageCollector(Backend & os, AgentReference & agentReference);
void runOnePass(log::LogContext & lc);
......@@ -51,7 +51,7 @@ public:
void reinjectOwnedObject(log::LogContext & lc);
private:
Backend & m_objectStore;
Agent & m_ourAgent;
AgentReference & m_ourAgentReference;
AgentRegister m_agentRegister;
std::map<std::string, AgentWatchdog * > m_watchedAgents;
static const size_t c_maxWatchedAgentsPerGC;
......
......@@ -21,6 +21,10 @@
#include "common/exception/Exception.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/log/DummyLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
#include "Agent.hpp"
......@@ -70,7 +74,7 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgent);
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -129,7 +133,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgent);
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -190,7 +194,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgent);
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -251,7 +255,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgent);
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -269,13 +273,14 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
TEST(ObjectStore, GarbageCollectorArchiveRequest) {
// We will need a log object
#ifdef STDOUT_LOGGING
cta::log::StdoutLogger dl("unitTest");
#else
cta::log::DummyLogger dl("unitTest");
#endif
cta::log::LogContext lc(dl);
// Here we check that can successfully call ArchiveRequests's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
agentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
......@@ -284,9 +289,18 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::objectstore::ScopedExclusiveLock rel(re);
// Create the agent for objects creation
cta::objectstore::AgentReference agentRef("unitTestCreateEnv");
// Finish root creation.
re.addOrGetAgentRegisterPointerAndCommit(agentRef, el);
rel.release();
// Create an agent
// continue agent creation.
agentRef.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agent(agentRef.getAgentAddress(), be);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf();
// Create an agent to garbage collected
cta::objectstore::AgentReference agrA("unitTestAgentA");
agrA.setQueueFlushTimeout(std::chrono::milliseconds(0));
cta::objectstore::Agent agA(agrA.getAgentAddress(), be);
......@@ -299,19 +313,18 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
// - partially linked to tape pools
// - linked to all tape pools
// - In the 2 latter cases, the job could have been picked up for processing
// - already
//w
// Create 2 tape pools (not owned).
//
// Create 2 archive queues
std::string tpAddr[2];
for (int i=0; i<2; i++)
{
std::stringstream aqid;
aqid << "ArchiveQueue" << i;
tpAddr[i] = agentRef.nextId(aqid.str());
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::stringstream tapePoolName;
tapePoolName << "TapePool" << i;
tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef);
cta::objectstore::ArchiveQueue aq(tpAddr[i], be);
aq.initialize(aqid.str());
aq.setOwner("");
aq.insert();
}
// Create the various ATFR's, stopping one step further each time.
int pass=0;
......@@ -337,8 +350,8 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
aFile.fileSize = 667;
aFile.storageClass = "sc";
ar.setArchiveFile(aFile);
ar.addJob(1, "ArchiveQueue0", tpAddr[0], 1, 1);
ar.addJob(2, "ArchiveQueue1", tpAddr[1], 1, 1);
ar.addJob(1, "TapePool0", tpAddr[0], 1, 1);
ar.addJob(2, "TapePool1", tpAddr[1], 1, 1);
ar.setOwner(agA.getAddressIfSet());
cta::common::dataStructures::MountPolicy mp;
ar.setMountPolicy(mp);
......@@ -407,7 +420,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgent);
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -433,6 +446,22 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
// We should not be able to remove the agent register (as it should be empty)
rel.lock(re);
re.fetch();
// Remove jobs from archive queues
std::list<std::string> tapePools = { "TapePool0", "TapePool1" };
for (auto & tp: tapePools) {
// Empty queue
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
for (auto &j: aq.dumpJobs()) {
aq.removeJob(j.address);
}
aq.commit();
aql.release();
// Remove queues from root
re.removeArchiveQueueAndCommit(tp);
}
ASSERT_NO_THROW(re.removeAgentRegisterAndCommit());
ASSERT_NO_THROW(re.removeIfEmpty());
// TODO: this unit test still leaks tape pools and requests
......
......@@ -77,47 +77,47 @@ namespace {
using cta::objectstore::ScopedExclusiveLock;
template <class C>
void garbageCollectWithType(GenericObject * gop, ScopedExclusiveLock& lock,
const std::string &presumedOwner) {
const std::string &presumedOwner, AgentReference & agentReference) {
C typedObject(*gop);
lock.transfer(typedObject);
typedObject.garbageCollect(presumedOwner);
typedObject.garbageCollect(presumedOwner, agentReference);
// Release the lock now as if we let the caller do, it will point
// to the then-removed typedObject.
lock.release();
}
}
void GenericObject::garbageCollect(const std::string& presumedOwner) {
void GenericObject::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) {
throw ForbiddenOperation("In GenericObject::garbageCollect(): GenericObject cannot be garbage collected");
}
void GenericObject::garbageCollectDispatcher(ScopedExclusiveLock& lock,
const std::string &presumedOwner) {
const std::string &presumedOwner, AgentReference & agentReference) {
checkHeaderWritable();
switch(m_header.type()) {
case serializers::AgentRegister_t:
garbageCollectWithType<AgentRegister>(this, lock, presumedOwner);
garbageCollectWithType<AgentRegister>(this, lock, presumedOwner, agentReference);
break;
case serializers::Agent_t:
garbageCollectWithType<Agent>(this, lock, presumedOwner);
garbageCollectWithType<Agent>(this, lock, presumedOwner, agentReference);
break;
case serializers::DriveRegister_t:
garbageCollectWithType<DriveRegister>(this, lock, presumedOwner);
garbageCollectWithType<DriveRegister>(this, lock, presumedOwner, agentReference);
break;
case serializers::SchedulerGlobalLock_t:
garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner);
garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner, agentReference);
break;
case serializers::ArchiveRequest_t:
garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner);
garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner, agentReference);
break;
case serializers::RetrieveRequest_t:
garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner);
garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner, agentReference);
break;
case serializers::ArchiveQueue_t:
garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner);
garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner, agentReference);
break;
case serializers::RetrieveQueue_t:
garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner);
garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference);
break;
default: {
std::stringstream err;
......
......@@ -51,7 +51,7 @@ public:
/** Overload of ObjectOps's implementation: this operation is forbidden. Generic
* Object cannot be garbage collected as-is */
void garbageCollect(const std::string& presumedOwner) override;
void garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) override;
/** This dispatcher function will call the object's garbage collection function.
* It also handles the passed lock and returns is unlocked.
......@@ -60,8 +60,9 @@ public:
*
* @param lock reference to the generic object's lock
* @param presumedOwner address of the agent which pointed to the object
* @param agent reference object allowing creation of new objects when needed (at least for requeuing of requests)
*/
void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner);
void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner, AgentReference & agentReference);
/** This dispatcher function will call the object's dump.
* It also handles the passed lock.
......
......@@ -26,6 +26,8 @@
namespace cta { namespace objectstore {
class AgentReference;
class ObjectOpsBase {
friend class ScopedLock;
friend class ScopedSharedLock;
......@@ -286,7 +288,7 @@ public:
/**
* This function should be overloaded in the inheriting classes
*/
virtual void garbageCollect(const std::string &presumedOwner) = 0;
virtual void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) = 0;
protected:
......
......@@ -175,6 +175,6 @@ void cta::objectstore::RetrieveQueue::removeJob(const std::string& retriveToFile
} while (found);
}
void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner) {
void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
throw cta::exception::Exception("In RetrieveQueue::garbageCollect(): not implemented");
}
......@@ -34,7 +34,7 @@ public:
RetrieveQueue(const std::string & address, Backend & os);
RetrieveQueue(GenericObject & go);
void initialize(const std::string & vid);
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
bool isEmpty();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void removeIfEmpty();
......
......@@ -46,7 +46,7 @@ void RetrieveRequest::initialize() {
m_payloadInterpreted = true;
}
void RetrieveRequest::garbageCollect(const std::string& presumedOwner) {
void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) {
throw cta::exception::Exception("In RetrieveRequest::garbageCollect(): not implemented.");
}
......
......@@ -42,7 +42,7 @@ public:
RetrieveRequest(const std::string & address, Backend & os);
RetrieveRequest(GenericObject & go);
void initialize();
void garbageCollect(const std::string &presumedOwner) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
// Job management ============================================================
void addJob(uint64_t copyNumber, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
void setJobSelected(uint16_t copyNumber, const std::string & owner);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment