Commit dea2d33c authored by Eric Cano's avatar Eric Cano
Browse files

Intermediate commit: started creation of garbage collector class

parent b1e9cc60
......@@ -125,7 +125,7 @@ public:
while (true) {
m_agent.heartbeat(m_agent);
// Get the list of current agents
std::list<std::string> agentNames = ar.getElements(m_agent);
std::list<std::string> agentNames = ar.getAgents(m_agent);
// If no one is running, go away after a delay
if(!agentNames.size()) {
if (noAgentTimer.secs() > 1.0)
......
......@@ -17,7 +17,7 @@ namespace cta { namespace objectstore {
* It handles (in the base class):
*/
class Agent: protected ObjectOps<serializers::Agent> {
class Agent: public ObjectOps<serializers::Agent> {
public:
Agent(Backend & os);
......@@ -27,6 +27,8 @@ public:
void registerSelf();
void unregisterSelf();
/* class ScopedIntent {
public:
ScopedIntent(Agent & agent, std::string container, std::string name, serializers::ObjectType objectType):
......
#include "AgentRegister.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
cta::objectstore::AgentRegister::AgentRegister(Backend & os):
ObjectOps<serializers::AgentRegister>(os) {}
cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Backend & os):
ObjectOps<serializers::AgentRegister>(os, name) {}
void cta::objectstore::AgentRegister::addElement (std::string name) {
void cta::objectstore::AgentRegister::addAgent (std::string name) {
checkPayloadWritable();
std::string * ag = m_payload.mutable_elements()->Add();
*ag = name;
m_payload.add_agents(name);
m_payload.add_untrackedagents(name);
}
void cta::objectstore::AgentRegister::removeElement (const std::string & name) {
void cta::objectstore::AgentRegister::removeAgent (const std::string & name) {
checkPayloadReadable();
serializers::removeString(m_payload.mutable_elements(), name);
serializers::removeString(m_payload.mutable_agents(), name);
serializers::removeString(m_payload.mutable_untrackedagents(), name);
}
void cta::objectstore::AgentRegister::addIntendedElement(std::string name) {
void cta::objectstore::AgentRegister::trackAgent(std::string name) {
checkPayloadWritable();
std::string * ag = m_payload.mutable_intendedelements()->Add();
*ag = name;
// Check that the agent is present (next statement throws an exception
// if the agent is not known)
serializers::findString(m_payload.mutable_agents(), name);
serializers::removeString(m_payload.mutable_untrackedagents(), name);
}
void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name) {
void cta::objectstore::AgentRegister::untrackAgent(std::string name) {
checkPayloadWritable();
serializers::removeString(m_payload.mutable_intendedelements(), name);
std::string * ag = m_payload.mutable_elements()->Add();
*ag = name;
// Check that the agent is present (next statement throws an exception
// if the agent is not known)
serializers::findString(m_payload.mutable_agents(), name);
// Check that the agent is not already in the list to prevent double
// inserts
try {
serializers::findString(m_payload.mutable_untrackedagents(), name);
} catch (serializers::NotFound &) {
m_payload.add_untrackedagents(name);
}
}
void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name) {
checkPayloadWritable();
serializers::removeString(m_payload.mutable_intendedelements(), name);
std::list<std::string> cta::objectstore::AgentRegister::getAgents() {
std::list<std::string> ret;
for (int i=0; i<m_payload.agents_size(); i++) {
ret.push_back(m_payload.agents(i));
}
return ret;
}
std::list<std::string> cta::objectstore::AgentRegister::getElements() {
std::list<std::string> cta::objectstore::AgentRegister::getUntrackedAgents() {
std::list<std::string> ret;
for (int i=0; i<m_payload.elements_size(); i++) {
ret.push_back(m_payload.elements(i));
for (int i=0; i<m_payload.untrackedagents_size(); i++) {
ret.push_back(m_payload.untrackedagents(i));
}
return ret;
}
......@@ -49,13 +62,13 @@ std::string cta::objectstore::AgentRegister::dump() {
checkPayloadReadable();
std::stringstream ret;
ret<< "<<<< AgentRegister " << getNameIfSet() << " dump start" << std::endl
<< "Array size=" << m_payload.elements_size() << std::endl;
for (int i=0; i<m_payload.elements_size(); i++) {
ret << "element[" << i << "]=" << m_payload.elements(i) << std::endl;
<< "Agents array size=" << m_payload.agents_size() << std::endl;
for (int i=0; i<m_payload.agents_size(); i++) {
ret << "element[" << i << "]=" << m_payload.agents(i) << std::endl;
}
ret << "Intent array size=" << m_payload.intendedelements_size() << std::endl;
for (int i=0; i<m_payload.intendedelements_size(); i++) {
ret << "intendedElement[" << i << "]=" << m_payload.intendedelements(i) << std::endl;
ret << "Untracked agents array size=" << m_payload.untrackedagents_size() << std::endl;
for (int i=0; i<m_payload.untrackedagents_size(); i++) {
ret << "intendedElement[" << i << "]=" << m_payload.untrackedagents(i) << std::endl;
}
ret<< ">>>> AgentRegister " << getNameIfSet() << " dump end" << std::endl;
return ret.str();
......
......@@ -10,13 +10,14 @@ class Agent;
class AgentRegister: public ObjectOps<serializers::AgentRegister> {
public:
AgentRegister(Backend & os);
AgentRegister(const std::string & name, Backend & os);
void addElement (std::string name);
void removeElement (const std::string & name);
void addIntendedElement (std::string name);
void upgradeIntendedElementToActual(std::string name);
void removeIntendedElement (const std::string & name);
std::list<std::string> getElements();
void addAgent (std::string name);
void removeAgent (const std::string & name);
void trackAgent (std::string name);
void untrackAgent(std::string name);
std::list<std::string> getAgents();
std::list<std::string> getUntrackedAgents();
std::string dump();
};
......
#include "AgentWatchdog.hpp"
\ No newline at end of file
#pragma once
#include "Agent.hpp"
#include "utils/Timer.hpp"
namespace cta { namespace objectstore {
class AgentWatchdog {
public:
AgentWatchdog(Agent & agent): m_agent(agent),
m_heartbeatCounter(readHeartbeat()) {}
bool checkAlive() {
uint64_t newHeartBeatCount = readHeartbeat();
if (newHeartBeatCount == m_heartbeatCounter && m_timer.secs() > 5)
return false;
m_heartbeatCounter = newHeartBeatCount;
m_timer.reset();
return true;
}
private:
cta::utils::Timer m_timer;
Agent & m_agent;
uint64_t m_heartbeatCounter;
uint64_t readHeartbeat() {
ScopedSharedLock lock(m_agent);
m_agent.fetch();
return m_agent.getHeartbeatCount();
}
};
}}
......@@ -15,11 +15,14 @@ add_library (CTAObjectStore
RootEntry.cpp
Agent.cpp
AgentRegister.cpp
AgentWatchdog.cpp
BackendVFS.cpp
BackendRados.cpp
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
FIFO.cpp
GenericObject.cpp
GarbageCollector.cpp
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
......
#include "FIFO.hpp"
const size_t cta::objectstore::FIFO::c_compactionSize = 50;
void cta::objectstore::FIFO::initialize() {
ObjectOps<serializers::FIFO>::initialize();
m_payload.set_readpointer(0);
m_payloadInterpreted = true;
}
std::string cta::objectstore::FIFO::peek() {
checkPayloadReadable();
if (m_payload.readpointer() >= (size_t)m_payload.name_size())
throw FIFOEmpty("In FIFO::Transaction::peek: FIFO empty");
return m_payload.name(m_payload.readpointer());
}
void cta::objectstore::FIFO::pop() {
checkPayloadWritable();
if (m_payload.readpointer() >= (size_t)m_payload.name_size())
throw FIFOEmpty("In FIFO::Transaction::popAndUnlock: FIFO empty");
m_payload.set_readpointer(m_payload.readpointer() + 1);
if ((uint64_t)m_payload.readpointer() > c_compactionSize) {
compact();
}
}
void cta::objectstore::FIFO::compact() {
uint64_t oldReadPointer = m_payload.readpointer();
uint64_t oldSize = m_payload.name_size();
// Copy the elements at position oldReadPointer + i to i (squash all)
// before the read pointer
for (int i = oldReadPointer; i < m_payload.mutable_name()->size(); i++) {
*m_payload.mutable_name(i - oldReadPointer) = m_payload.name(i);
}
// Shorten the name array by oldReadPointer elements
for (uint64_t i = 0; i < oldReadPointer; i++) {
m_payload.mutable_name()->RemoveLast();
}
// reset the read pointer
m_payload.set_readpointer(0);
// Check the size is as expected
if ((uint64_t) m_payload.name_size() != oldSize - oldReadPointer) {
std::stringstream err;
err << "In FIFO::compactCurrentState: wrong size after compaction: "
<< "oldSize=" << oldSize << " oldReadPointer=" << oldReadPointer
<< " newSize=" << m_payload.name_size();
throw cta::exception::Exception(err.str());
}
}
void cta::objectstore::FIFO::push(std::string name) {
checkPayloadWritable();
m_payload.add_name(name);
}
uint64_t cta::objectstore::FIFO::size() {
checkPayloadReadable();
uint64_t ret = m_payload.name_size() - m_payload.readpointer();
return ret;
}
std::string cta::objectstore::FIFO::dump() {
checkPayloadReadable();
std::stringstream ret;
ret << "<<<< FIFO dump start: " << getNameIfSet() << std::endl
<< "Read pointer=" << m_payload.readpointer() << std::endl
<< "Array size=" << m_payload.name_size() << std::endl;
for (int i = m_payload.readpointer(); i < m_payload.name_size(); i++) {
ret << "name[phys=" << i << " ,log=" << i - m_payload.readpointer()
<< "]=" << m_payload.name(i) << std::endl;
}
ret << ">>>> FIFO dump end for " << getNameIfSet() << std::endl;
return ret.str();
}
std::list<std::string> cta::objectstore::FIFO::getContent() {
checkPayloadReadable();
std::list<std::string> ret;
for (int i = m_payload.readpointer(); i < m_payload.name_size(); i++) {
ret.push_back(m_payload.name(i));
}
return ret;
}
#include <gtest/gtest.h>
#include "BackendVFS.hpp"
#include "exception/Exception.hpp"
#include "FIFO.hpp"
#include "Agent.hpp"
TEST(FIFO, BasicFuctionnality) {
cta::objectstore::BackendVFS be;
cta::objectstore::Agent agent(be);
agent.generateName("unitTest");
std::string fifoName = agent.nextId("FIFO");
std::list<std::string> expectedData;
{
// Try to create the FIFO entry
cta::objectstore::FIFO ff(fifoName,be);
ff.initialize();
ff.insert();
}
{
for (int i=0; i<100; i++) {
cta::objectstore::FIFO ff(fifoName,be);
cta::objectstore::ScopedExclusiveLock lock(ff);
ff.fetch();
expectedData.push_back(agent.nextId("TestData"));
ff.push(expectedData.back());
ff.commit();
}
}
{
// Check that the data it as expected
cta::objectstore::FIFO ff(fifoName,be);
cta::objectstore::ScopedSharedLock lock(ff);
ff.fetch();
std::list<std::string> dump = ff.getContent();
lock.release();
ASSERT_EQ(expectedData.size(), dump.size());
std::list<std::string>::const_iterator foundI = dump.begin();
std::list<std::string>::const_iterator expectI = expectedData.begin();
while (foundI != dump.end() && expectI != expectedData.end()) {
ASSERT_EQ(*foundI, *expectI);
foundI++;
expectI++;
}
ASSERT_TRUE(foundI == dump.end());
ASSERT_TRUE(expectI == expectedData.end());
cta::objectstore::ScopedSharedLock lock2(ff);
ff.fetch();
//std::cout <<
ff.dump();
}
{
cta::objectstore::FIFO ff(fifoName,be);
cta::objectstore::ScopedExclusiveLock lock(ff);
ff.fetch();
while (ff.size() && expectedData.size()) {
ASSERT_EQ(ff.peek(), expectedData.front());
ff.pop();
ff.commit();
ff.fetch();
expectedData.pop_front();
}
}
// Delete the FIFO entry
cta::objectstore::FIFO ff(fifoName,be);
cta::objectstore::ScopedExclusiveLock lock(ff);
ff.fetch();
ASSERT_EQ(0, ff.size());
ff.remove();
ASSERT_EQ(false, ff.exists());
}
\ No newline at end of file
#include "GarbageCollector.hpp"
#include "RootEntry.hpp"
namespace cta { namespace objectstore {
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 2;
GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
m_objectStore(os), m_ourAgent(agent), m_agentRegister(os) {
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
m_agentRegister.setName(re.getAgentRegister());
reLock.release();
ScopedSharedLock arLock(m_agentRegister);
m_agentRegister.fetch();
}
void GarbageCollector::runOnePass() {
// Bump our own heart beat
{
ScopedExclusiveLock lock (m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.bumpHeartbeat();
m_ourAgent.commit();
}
trimGoneTargets();
aquireTargets();
checkHeartbeats();
}
void GarbageCollector::trimGoneTargets() {
}
}}
#pragma once
#include "GenericObject.hpp"
#include "Agent.hpp"
#include "AgentWatchdog.hpp"
#include "AgentRegister.hpp"
/**
* Plan => Garbage collector keeps track of the agents.
* If an agent is declared dead => tape ownership of owned objects
* Using the backup owner, re-post the objet to the container.
* All containers will have a "repost" method, which is more thourough
* (and expensive) than the usual one. It can for example prevent double posting.
*/
namespace cta { namespace objectstore {
class GarbageCollector {
public:
GarbageCollector(Backend & os, Agent & agent);
void runOnePass();
void aquireTargets();
void trimGoneTargets();
void checkHeartbeats();
void cleanupDeadAgent();
void reinjectIntendedObject();
private:
Backend & m_objectStore;
Agent & m_ourAgent;
AgentRegister m_agentRegister;
std::map<std::string, std::auto_ptr<AgentWatchdog> > m_watchedAgents;
static const size_t c_maxWatchedAgentsPerGC;
};
}}
\ No newline at end of file
#include "GenericObject.hpp"
namespace cta { namespace objectstore {
void GenericObject::fetch() {
// Check that the object is locked, one way or another
if(!m_locksCount)
throw NotLocked("In ObjectOps::fetch(): object not locked");
m_existingObject = true;
// Get the header from the object store. We don't care for the type
m_header.ParseFromString(m_objectStore.read(getNameIfSet()));
m_headerInterpreted = true;
}
serializers::ObjectType GenericObject::type() {
checkHeaderReadable();
return m_header.type();
}
void GenericObject::commit() {
checkHeaderWritable();
m_objectStore.atomicOverwrite(getNameIfSet(), m_header.SerializeAsString());
}
void GenericObject::insert() {
throw ForbiddenOperation("In GenericObject::insert: this operation is not possible");
}
void GenericObject::initialize() {
throw ForbiddenOperation("In GenericObject::initialize: this operation is not possible");
}
}}
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class GenericObject: public ObjectOps<serializers::GenericObject> {
public:
GenericObject(const std::string & name, Backend & os):
ObjectOps<serializers::GenericObject>(os) {};
class ForbiddenOperation: public cta::exception::Exception {
public:
ForbiddenOperation(const std::string & w): cta::exception::Exception(w) {}
};
// Overload of ObjectOps's implementation: this special object tolerates all
// types of objects
void fetch();
// Overload of ObjectOps's implementation: we will leave the payload transparently
// untouched and only deal with header parameters
void commit();
// Get the object's type (type is forced implicitely in other classes)
serializers::ObjectType type();
// Overload of ObjectOps's implementation: this operation is forbidden. Generic
// Object is only used to manipulate existing objects
void insert();
// Overload of ObjectOps's implementation: this operation is forbidden. Generic
// Object is only used to manipulate existing objects
void initialize();
};
}}
#pragma once
namespace cta { namespace objectstore {
class ObjectHeader {
};
}}
\ No newline at end of file
......@@ -5,6 +5,7 @@ namespace cta { namespace objectstore {
#define MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(A) \
template <> const serializers::ObjectType ObjectOps<serializers::A>::typeId = serializers::A##_t
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(GenericObject);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RootEntry);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AgentRegister);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Agent);
......
......@@ -209,7 +209,7 @@ public:
}
void commit() {
checkWritable();
checkPayloadWritable();
if (!m_existingObject)
throw NewObject("In ObjectOps::commit: trying to update a new object");
// Serialise the payload into the header
......
......@@ -20,7 +20,7 @@ public:
try {
AgentRegister ar(re.getAgentRegister(agent), agent);
ret << ar.dump(agent) << std::endl;
std::list<std::string> agList = ar.getElements(agent);
std::list<std::string> agList = ar.getAgents(agent);
for (std::list<std::string>::iterator i=agList.begin(); i!=agList.end(); i++) {
AgentVisitor a(*i, agent);
ret << a.dump(agent) << std::endl;
......
#pragma once
#include <google/protobuf/repeated_field.h>
#include "exception/Exception.hpp"
namespace cta { namespace objectstore { namespace serializers {
void removeString(::google::protobuf::RepeatedPtrField< ::std::string>* field,
const std::string & value);
class NotFound: public cta::exception::Exception {
public:
NotFound(const std::string & w): cta::exception::Exception(w) {}
};
size_t findString(::google::protobuf::RepeatedPtrField< ::std::string>* field,
const std::string & value);
}}}
\ No newline at end of file
......@@ -5,7 +5,7 @@ void cta::objectstore::serializers::removeString(::google::protobuf::RepeatedPtr
bool found;
do {
found = false;
for (int i=0; i<field->size(); i++) {
for (size_t i=0; i<(size_t)field->size(); i++) {
if (value == field->Get(i)) {
found = true;
field->SwapElements(i, field->size()-1);
......@@ -14,4 +14,15 @@ void cta::objectstore::serializers::removeString(::google::protobuf::RepeatedPtr
}
}
} while (found);
}
size_t cta::objectstore::serializers::findString(
::google::protobuf::RepeatedPtrField< ::std::string>* field,
const std::string& value) {
for (size_t i=0; i<(size_t)field->size(); i++) {
if (value == field->Get(i)) {
return i;
}
}
throw NotFound("In cta::objectstore::serializers::findString: string not found");
}
\ No newline at end of file