Commit d624cae5 authored by Eric Cano's avatar Eric Cano
Browse files

Complete the implementation of GarbageCollector and started implementation of its unit test.

parent f295a546
......@@ -13,6 +13,9 @@
cta::objectstore::Agent::Agent(Backend & os):
ObjectOps<serializers::Agent>(os), m_nextId(0) {}
cta::objectstore::Agent::Agent(const std::string & name, Backend & os):
ObjectOps<serializers::Agent>(os, name), m_nextId(0) {}
void cta::objectstore::Agent::generateName(const std::string & typeName) {
std::stringstream aid;
// Get time
......
......@@ -21,6 +21,8 @@ class Agent: public ObjectOps<serializers::Agent> {
public:
Agent(Backend & os);
Agent(const std::string & name, Backend & os);
void generateName(const std::string & typeName);
std::string nextId(const std::string & childType);
......
......@@ -6,21 +6,27 @@
namespace cta { namespace objectstore {
class AgentWatchdog {
public:
AgentWatchdog(Agent & agent): m_agent(agent),
m_heartbeatCounter(readHeartbeat()) {}
AgentWatchdog(const std::string & name, Backend & os): m_agent(name, os),
m_heartbeatCounter(readHeartbeat()), m_timeout(5.0) {}
bool checkAlive() {
uint64_t newHeartBeatCount = readHeartbeat();
if (newHeartBeatCount == m_heartbeatCounter && m_timer.secs() > 5)
if (newHeartBeatCount == m_heartbeatCounter && m_timer.secs() > m_timeout)
return false;
m_heartbeatCounter = newHeartBeatCount;
m_timer.reset();
return true;
}
void setTimeout(double timeout) {
m_timeout = timeout;
}
private:
cta::utils::Timer m_timer;
Agent & m_agent;
Agent m_agent;
uint64_t m_heartbeatCounter;
double m_timeout;
uint64_t readHeartbeat() {
ScopedSharedLock lock(m_agent);
......
......@@ -60,6 +60,7 @@ set(ObjectStoreUnitTests
BackendTest.cpp
RootEntryTest.cpp
FIFOTest.cpp
GarbageCollectorTest.cpp
)
add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests})
......
#include "FIFO.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
const size_t cta::objectstore::FIFO::c_compactionSize = 50;
......@@ -56,6 +57,15 @@ void cta::objectstore::FIFO::push(std::string name) {
m_payload.add_name(name);
}
void cta::objectstore::FIFO::pushIfNotPresent(std::string name) {
checkPayloadWritable();
try {
serializers::findString(m_payload.mutable_name(), name);
} catch (serializers::NotFound &) {
m_payload.add_name(name);
}
}
uint64_t cta::objectstore::FIFO::size() {
checkPayloadReadable();
uint64_t ret = m_payload.name_size() - m_payload.readpointer();
......
......@@ -25,6 +25,8 @@ public:
void push(std::string name);
void pushIfNotPresent (std::string name);
std::string dump();
uint64_t size();
......
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
#include "FIFO.hpp"
#include <algorithm>
namespace cta { namespace objectstore {
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 2;
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 2;
GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
m_objectStore(os), m_ourAgent(agent), m_agentRegister(os) {
GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
m_objectStore(os), m_ourAgent(agent), m_agentRegister(os), m_timeout(5.0) {
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
......@@ -13,9 +15,9 @@ namespace cta { namespace objectstore {
reLock.release();
ScopedSharedLock arLock(m_agentRegister);
m_agentRegister.fetch();
}
}
void GarbageCollector::runOnePass() {
void GarbageCollector::runOnePass() {
// Bump our own heart beat
{
ScopedExclusiveLock lock (m_ourAgent);
......@@ -26,9 +28,163 @@ namespace cta { namespace objectstore {
trimGoneTargets();
aquireTargets();
checkHeartbeats();
}
void GarbageCollector::trimGoneTargets() {
ScopedSharedLock arLock(m_agentRegister);
m_agentRegister.fetch();
arLock.release();
std::list<std::string> agentList = m_agentRegister.getAgents();
for (std::map<std::string, AgentWatchdog * >::iterator wa
= m_watchedAgents.begin();
wa != m_watchedAgents.end();) {
if (agentList.end() == std::find(agentList.begin(), agentList.end(), wa->first)) {
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.removeFromOwnership(wa->first);
m_ourAgent.commit();
oaLock.release();
delete wa->second;
m_watchedAgents.erase(wa++);
} else {
wa++;
}
}
}
void GarbageCollector::aquireTargets() {
ScopedExclusiveLock arLock(m_agentRegister);
m_agentRegister.fetch();
// Get the list of untracked agents
std::list<std::string> candidatesList = m_agentRegister.getUntrackedAgents();
std::list<std::string>::const_iterator c = candidatesList.begin();
// We can now take ownership of new agents, up to our max...
// and we don't monitor ourselves!
while (m_watchedAgents.size() < c_maxWatchedAgentsPerGC
&& c!=candidatesList.end()) {
// We don't monitor ourselves
if (*c != m_ourAgent.getNameIfSet()) {
// So we have a candidate we might want to monitor
// First, check that the agent entry exists, and that ownership
// is indeed pointing to the agent register
Agent ag(*c, m_objectStore);
if (!ag.exists()) {
// This is a dangling pointer to a dead object:
// remove it in the agentRegister.
m_agentRegister.removeAgent(*c);
continue;
}
ScopedExclusiveLock agLock(ag);
ag.fetch();
// Check that the actual owner is the agent register.
// otherwise, it should not be listed as an agent to monitor
if (ag.getOwner() != m_agentRegister.getNameIfSet()) {
m_agentRegister.trackAgent(ag.getNameIfSet());
agLock.release();
continue;
}
// We are now interested in tracking this agent. So we will transfer its
// ownership. We alredy have an exclusive lock on the agent.
// Lock ours
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.addToOwnership(ag.getNameIfSet());
m_ourAgent.commit();
// We now have a pointer to the agent, we can make the ownership official
ag.setOwner(m_ourAgent.getNameIfSet());
ag.commit();
// And we can remove the now dangling pointer from the agent register
// (we hold an exclusive lock all along)
m_agentRegister.trackAgent(ag.getNameIfSet());
m_agentRegister.commit();
// Agent is now officially ours, let's track it
m_watchedAgents[ag.getNameIfSet()] =
new AgentWatchdog(ag.getNameIfSet(), m_objectStore);
m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout);
}
}
// Commit all the modifications to the agent register
m_agentRegister.commit();
}
void GarbageCollector::trimGoneTargets() {
void GarbageCollector::checkHeartbeats() {
// Check the heartbeats of the watched agents
// We can still fail on many steps
for (std::map<std::string, AgentWatchdog * >::iterator wa = m_watchedAgents.begin();
wa != m_watchedAgents.end();) {
// Get the heartbeat. Clean dead agents and remove references to them
if (!wa->second->checkAlive()) {
cleanupDeadAgent(wa->first);
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.removeFromOwnership(wa->first);
m_ourAgent.commit();
delete wa->second;
m_watchedAgents.erase(wa++);
} else {
wa++;
}
}
}
void GarbageCollector::cleanupDeadAgent(const std::string & name) {
// Check that we are still owners of the agent (sanity check).
Agent agent(name, m_objectStore);
ScopedExclusiveLock agLock(agent);
agent.fetch();
if (agent.getOwner() != m_ourAgent.getNameIfSet()) {
throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: the ownership is not ours as expected");
}
// Return all objects owned by the agent to their respective backup owners
std::list<std::string> ownedObjects = agent.getOwnershipList();
for (std::list<std::string>::iterator obj = ownedObjects.begin();
obj!= ownedObjects.end(); obj++) {
// Find the object
GenericObject go(*obj, m_objectStore);
// If the objet does not exist, we're done.
if (!go.exists())
continue;
ScopedExclusiveLock goLock(go);
go.fetch();
// if the object is already owned by someone else, this was a dangling pointer:
// nothing to do.
if (go.getOwner() != name)
continue;
// We reached a point where we have to actually move to ownership to somewhere
// else. Find that somewhere else
GenericObject gContainter(go.getBackupOwner(), m_objectStore);
if (!go.exists()) {
throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: backup owner does not exist!");
}
ScopedSharedLock gContLock(gContainter);
gContainter.fetch();
gContLock.release();
switch(gContainter.type()) {
case serializers::FIFO_t: {
FIFO fifo(go.getBackupOwner(), m_objectStore);
ScopedExclusiveLock ffLock(fifo);
fifo.fetch();
fifo.pushIfNotPresent(go.getNameIfSet());
fifo.commit();
// We now have a pointer to the object. Make the change official.
go.setOwner(go.getBackupOwner());
go.commit();
ffLock.release();
goLock.release();
}
default: {
throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!");
}
}
// We now processed all the owned objects. We can delete the agent's entry
agent.remove();
// And remove the (dangling) pointers to it
ScopedExclusiveLock arLock(m_agentRegister);
m_agentRegister.fetch();
m_agentRegister.removeAgent(name);
m_agentRegister.commit();
}
}
}}
......@@ -27,15 +27,18 @@ public:
void checkHeartbeats();
void cleanupDeadAgent();
void cleanupDeadAgent(const std::string & name);
void reinjectIntendedObject();
void reinjectOwnedObject();
void setTimeout(double timeout);
private:
Backend & m_objectStore;
Agent & m_ourAgent;
AgentRegister m_agentRegister;
std::map<std::string, std::auto_ptr<AgentWatchdog> > m_watchedAgents;
std::map<std::string, AgentWatchdog * > m_watchedAgents;
static const size_t c_maxWatchedAgentsPerGC;
double m_timeout;
};
}}
\ No newline at end of file
#include <gtest/gtest.h>
#include "BackendVFS.hpp"
#include "exception/Exception.hpp"
#include "GarbageCollector.hpp"
#include "FIFO.hpp"
#include "Agent.hpp"
#include "AgentRegister.hpp"
#include "RootEntry.hpp"
TEST(GarbageCollector, BasicFuctionnality) {
cta::objectstore::BackendVFS be;
cta::objectstore::Agent agent(be);
agent.generateName("unitTestGarbageCollector");
// Create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
// Create the agent register
re.allocateOrGetAgentRegister(agent);
// Create 2 agents, A and B and register them
cta::objectstore::Agent agA(be), agB(be);
agA.generateName("unitTestAgentA");
agB.generateName("unitTestAgentB");
// Create target FIFO
std::string fifoName = agent.nextId("FIFO");
std::list<std::string> expectedData;
{
// Try to create the FIFO entry
cta::objectstore::FIFO ff(fifoName,be);
ff.initialize();
ff.insert();
}
{
for (int i=0; i<100; i++) {
// We create FIFOs here, but any object can do.
// Create a new object
cta::objectstore::FIFO newFIFO(agent.nextId("RandomObject"), be);
// Small shortcut: insert the link to the new object straight into the FIFO
cta::objectstore::FIFO centralFifo(fifoName, be);
cta::objectstore::ScopedExclusiveLock lock(centralFifo);
centralFifo.fetch();
expectedData.push_back(agent.nextId("TestData"));
centralFifo.push(expectedData.back());
centralFifo.commit();
lock.release();
// Then actually create the object
newFIFO.initialize();
newFIFO.setOwner(fifoName);
newFIFO.setBackupOwner(fifoName);
newFIFO.insert();
}
}
// TODO: take ownership of FIFO contents in agA and agB, and then garbage collect.
// The FIFO should get all its objects back.
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment