Commit 7feba896 authored by Eric Cano's avatar Eric Cano
Browse files

Intermediate commit: started to implement the garbage collection, plus a...

Intermediate commit: started to implement the garbage collection, plus a producer and consumer action.
The Agent class is becomming too complex, so started to split it up between the current agent (living with the application it represents)
and the AgentVisitor, which will look at it from the outside (for the garbage collector).
parent c10a0514
#pragma once
#include "Agent.hpp"
#include "RootEntry.hpp"
#include "RecallJob.hpp"
#include "JobPool.hpp"
#include "FIFO.hpp"
#include "AgentRegister.hpp"
#include <unistd.h>
namespace cta { namespace objectstore {
class Action {
public:
void execute() {}
virtual void execute() {}
virtual ~Action() {}
};
class JobPoster: public Action {
public:
JobPoster (Agent & agent, int number, int objective):
m_agent(agent), m_number(number), m_objective(objective), m_achieved(0){}
virtual void execute() {
std::stringstream name;
name << "JobPoster-" << m_number;
m_agent.setup(name.str());
m_agent.create();
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
std::string fifoName = jp.getRecallFIFO(m_agent);
FIFO fifo(fifoName, m_agent);
std::cout << name << " starting";
while (m_achieved < m_objective) {
std::stringstream src, dst;
src << "S-" << m_number << "-" << m_achieved;
dst << "D-" << m_number << "-" << m_achieved;
// This create the recall job and return the name. It is
// automatically added to the creation intent log of m_agent.
std::string rjName = RecallJob::create(fifoName, src.str(), dst.str(), m_agent);
// Post the job
fifo.push(rjName, m_agent);
// We can now release it from the intent log
m_agent.removeFromIntent(fifoName, rjName, "RecallJob");
std::cout << rjName << " created" << std::endl;
m_achieved++;
}
std::cout << name << " complete" << std::endl;
}
private:
Agent & m_agent;
int m_number;
int m_objective;
int m_achieved;
};
class Recaller: public Action {
public:
Recaller (Agent & agent, int number): m_agent(agent), m_number(number) {}
virtual void execute() {
std::stringstream name;
name << "RecallerAction-" << m_number;
m_agent.setup(name.str());
m_agent.create();
RootEntry re(m_agent);
JobPool jp(re.getJobPool(m_agent), m_agent);
FIFO fifo(jp.getRecallFIFO(m_agent), m_agent);
std::cout << name << " starting";
while (true) {
try {
// Pop a job from the FIFO
FIFO::Transaction tr = fifo.startTransaction(m_agent);
std::string rjName = tr.peek();
m_agent.addToOwnership(tr.peek(), "RecallJob");
tr.popAndUnlock();
RecallJob rj(rjName, m_agent);
// Sleep on it for a while
usleep(100 * 1000);
// Log the deletion
std::cout << "RecallJob " << rj.source(m_agent) << " => "
<< rj.destination(m_agent) << " is done" << std::endl;
rj.remove();
} catch (FIFO::FIFOEmpty &) { break; }
std::cout << name << "complete: FIFO empty" << std::endl;
}
}
private:
Agent & m_agent;
int m_number;
};
class GarbageCollector: public Action {
public:
GarbageCollector (Agent & agent): m_agent(agent) {}
virtual void execute() {
std::stringstream name;
name << "GarbageCollector";
m_agent.setup(name.str());
m_agent.create();
RootEntry re(m_agent);
AgentRegister ar(re.getAgentRegister(m_agent), m_agent);
std::cout << name << " starting";
utils::Timer noAgentTimer;
std::map<std::string, AgentWatchdog *> watchdogs;
while (true) {
// Get the list of current agents
std::list<std::string> agentNames = ar.getElements(m_agent);
// If no one is running, go away after a delay
if(!agentNames.size()) {
if (noAgentTimer.secs() > 1.0)
break;
} else {
noAgentTimer.reset();
}
// On a first pass, trim the watchdog list of now-gone agents
for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin();
i != watchdogs.end();) {
if (std::find(agentNames.begin(), agentNames.end(), i->first) == agentNames.end()) {
delete i->second;
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
watchdogs.erase(i++);
} else {
i++;
}
}
// On a second pass, check that we are acquaint with all processes
for (std::list<std::string>::iterator i=agentNames.begin();
i != agentNames.end(); i++) {
if(watchdogs.find(*i) == watchdogs.end()) {
watchdogs[*i] = new AgentWatchdog(*i, m_agent);
}
}
// And now check the heartbeats of the agents
for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin();
i != watchdogs.end();) {
if (!i->second->checkAlive(m_agent)) {
collectGarbage(i->first);
delete i->second;
// The post-increment returns the iterator to erase,
// but sets i to the next value, which will remain valid.
watchdogs.erase(i++);
}
}
}
}
private:
Agent & m_agent;
void collectGarbage(const std::string & agentName) {
// When collecting the garbage of an agent, we have to iterate through its
// intended and owned objects, validate that they are still owned by the dead
// agent, and re-post them to the container where they should be (and ownership)
// is re-set to the container.
Agent ag(agentName, m_agent);
std::list<Agent::intentEntry> intendedObjects = ag.getIntentLog();
}
};
}}
......
......@@ -7,22 +7,22 @@
#include <ctime>
cta::objectstore::Agent::Agent(ObjectStore & os):
ObjectOps<cta::objectstore::serializers::Agent>(os),
ObjectOps<serializers::Agent>(os),
m_nextId(0), m_setupDone(false), m_creationDone(false), m_observerVersion(false) {};
cta::objectstore::Agent::Agent(ObjectStore & os, const std::string & typeName):
ObjectOps<cta::objectstore::serializers::Agent>(os),
ObjectOps<serializers::Agent>(os),
m_nextId(0), m_setupDone(false), m_creationDone(false), m_observerVersion(false) {
setup(typeName);
}
// Passive constructor, used for looking at existing agent records
cta::objectstore::Agent::Agent(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::serializers::Agent>(agent.objectStore(), name),
ObjectOps<serializers::Agent>(agent.objectStore(), name),
m_nextId(0), m_setupDone(true), m_creationDone(true), m_observerVersion(true)
{
// check the presence of the entry
cta::objectstore::serializers::Agent as;
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
}
......@@ -45,7 +45,7 @@ void cta::objectstore::Agent::setup(const std::string & typeName) {
<< std::setw(2) << localNow.tm_hour << ":"
<< std::setw(2) << localNow.tm_min << ":"
<< std::setw(2) << localNow.tm_sec;
ObjectOps<cta::objectstore::serializers::Agent>::setName(aid.str());
ObjectOps<serializers::Agent>::setName(aid.str());
m_setupDone = true;
}
......@@ -55,7 +55,7 @@ void cta::objectstore::Agent::create() {
RootEntry re(*this);
AgentRegister ar(re.allocateOrGetAgentRegister(*this), *this);
ar.addIntendedElement(selfName(), *this);
cta::objectstore::serializers::Agent as;
serializers::Agent as;
as.set_name(selfName());
writeChild(selfName(), as);
ar.upgradeIntendedElementToActual(selfName(), *this);
......@@ -110,10 +110,10 @@ cta::objectstore::ContextHandleImplementation<myOS> &
void cta::objectstore::Agent::addToIntend (std::string container, std::string name, std::string typeName) {
if (!m_creationDone)
throw CreationNotDone("In Agent::addToIntend(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx);
cta::objectstore::serializers::ObjectCreationIntent * oca =
serializers::ObjectCreationIntent * oca =
as.mutable_creationintent()->Add();
oca->set_container(container);
oca->set_name(name);
......@@ -125,7 +125,7 @@ void cta::objectstore::Agent::addToIntend (std::string container, std::string na
void cta::objectstore::Agent::removeFromIntent (std::string container, std::string name, std::string typeName) {
if (!m_creationDone)
throw CreationNotDone("In Agent::removeFromIntent(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx);
bool found;
......@@ -149,10 +149,10 @@ void cta::objectstore::Agent::removeFromIntent (std::string container, std::stri
void cta::objectstore::Agent::addToOwnership(std::string name, std::string typeName) {
if (!m_creationDone)
throw CreationNotDone("In Agent::addToOwnership(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx);
cta::objectstore::serializers::ObjectOwnershipIntent * ooi =
serializers::ObjectOwnershipIntent * ooi =
as.mutable_ownershipintent()->Add();
ooi->set_name(name);
ooi->set_type(typeName);
......@@ -163,7 +163,7 @@ void cta::objectstore::Agent::addToOwnership(std::string name, std::string typeN
void cta::objectstore::Agent::removeFromOwnership(std::string name, std::string typeName) {
if (!m_creationDone)
throw CreationNotDone("In Agent::removeFromOwnership(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx);
bool found;
......@@ -187,7 +187,7 @@ std::list<cta::objectstore::Agent::intentEntry>
cta::objectstore::Agent::getIntentLog() {
if (!m_creationDone)
throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
updateFromObjectStore(as, getFreeContext());
std::list<intentEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
......@@ -202,7 +202,7 @@ std::list<cta::objectstore::Agent::ownershipEntry>
cta::objectstore::Agent::getOwnershipLog() {
if (!m_creationDone)
throw CreationNotDone("In Agent::getOwnershipLog(): creation() not yet done");
cta::objectstore::serializers::Agent as;
serializers::Agent as;
updateFromObjectStore(as, getFreeContext());
std::list<ownershipEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
......@@ -213,11 +213,11 @@ std::list<cta::objectstore::Agent::ownershipEntry>
}
cta::objectstore::ObjectStore & cta::objectstore::Agent::objectStore() {
return ObjectOps<cta::objectstore::serializers::Agent>::objectStore();
return ObjectOps<serializers::Agent>::objectStore();
}
std::string cta::objectstore::Agent::dump(Agent & agent) {
cta::objectstore::serializers::Agent as;
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Agent " << selfName() << " dump start" << std::endl
......@@ -235,4 +235,30 @@ std::string cta::objectstore::Agent::dump(Agent & agent) {
}
ret<< ">>>> Agent " << selfName() << " dump end" << std::endl;
return ret.str();
}
\ No newline at end of file
}
void cta::objectstore::Agent::heartbeat(Agent& agent) {
ContextHandle & context = agent.getFreeContext();
serializers::Agent as;
lockExclusiveAndRead(as, context);
}
uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
}
cta::objectstore::AgentWatchdog::AgentWatchdog(const std::string& agentName, Agent& agent):
m_agentVisitor(agentName, agent) {
m_hearbeatCounter = m_agentVisitor.getHeartbeatCount(agent);
}
bool cta::objectstore::AgentWatchdog::checkAlive(Agent& agent) {
uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent);
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1)
return false;
m_hearbeatCounter = newHeartBeatCount;
return true;
}
......@@ -4,6 +4,7 @@
#include "ObjectOps.hpp"
#include "ContextHandle.hpp"
#include "objectstore/cta.pb.h"
#include "utils/Timer.hpp"
namespace cta { namespace objectstore {
......@@ -16,7 +17,7 @@ namespace cta { namespace objectstore {
* It handles (in the base class):
*/
class Agent: protected ObjectOps<cta::objectstore::serializers::Agent> {
class Agent: protected ObjectOps<serializers::Agent> {
public:
Agent(ObjectStore & os);
......@@ -40,6 +41,7 @@ public:
public:
ObserverOnly(const std::string & w): cta::exception::Exception(w) {}
};
void create();
......@@ -87,6 +89,10 @@ public:
std::string dump(Agent & agent);
void heartbeat(Agent & agent);
uint64_t getHeartbeatCount(Agent & agent);
private:
std::string m_typeName;
bool m_setupDone;
......@@ -97,4 +103,14 @@ private:
ContextHandleImplementation<myOS> m_contexts[c_handleCount];
};
class AgentWatchdog {
public:
AgentWatchdog(const std::string & agentName, Agent & agent);
bool checkAlive(Agent & agent);
private:
cta::utils::Timer m_timer;
Agent m_agentVisitor;
uint64_t m_hearbeatCounter;
};
}}
\ No newline at end of file
#include "AgentRegister.hpp"
cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::serializers::AgentRegister>(agent.objectStore(), name) {
ObjectOps<serializers::AgentRegister>(agent.objectStore(), name) {
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
}
void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context);
rs.add_elements(name);
......@@ -18,7 +18,7 @@ void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agen
}
void cta::objectstore::AgentRegister::removeElement (const std::string & name, Agent & agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context);
bool found;
......@@ -38,7 +38,7 @@ void cta::objectstore::AgentRegister::removeElement (const std::string & name,
}
void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent& agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context);
rs.add_intendedelements(name);
......@@ -47,7 +47,7 @@ void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent
}
void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name, Agent& agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context);
bool found;
......@@ -69,7 +69,7 @@ void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string
void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name, Agent& agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context);
bool found;
......@@ -91,7 +91,7 @@ void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& n
std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
......@@ -101,7 +101,7 @@ std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agen
}
std::string cta::objectstore::AgentRegister::dump(Agent & agent) {
cta::objectstore::serializers::AgentRegister rs;
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< AgentRegister " << selfName() << " dump start" << std::endl
......
......@@ -6,7 +6,7 @@
namespace cta { namespace objectstore {
class AgentRegister: private ObjectOps<cta::objectstore::serializers::AgentRegister> {
class AgentRegister: private ObjectOps<serializers::AgentRegister> {
public:
AgentRegister(const std::string & name, Agent & agent);
void addElement (std::string name, Agent & agent);
......
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class AgentVisitor: private ObjectOps<serializers::Agent> {
};
}}
\ No newline at end of file
......@@ -21,7 +21,8 @@ add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp
exception/strerror_r_wrapper.cpp
threading/ChildProcess.cpp
threading/Mutex.cpp
threading/Threading.cpp)
threading/Threading.cpp
utils/Timer.cpp)
target_link_libraries(tapeResourceMangerTest
protobuf
......
......@@ -6,11 +6,11 @@
namespace cta { namespace objectstore {
class FIFO: private ObjectOps<cta::objectstore::serializers::FIFO> {
class FIFO: private ObjectOps<serializers::FIFO> {
public:
FIFO(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::serializers::FIFO>(agent.objectStore(), name) {
cta::objectstore::serializers::FIFO fs;
ObjectOps<serializers::FIFO>(agent.objectStore(), name) {
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
}
......@@ -20,6 +20,11 @@ private:
}
public:
class FIFOEmpty: public cta::exception::Exception {
public:
FIFOEmpty(const std::string & context): cta::exception::Exception(context) {}
};
friend class Transaction;
class Transaction {
public:
......@@ -28,15 +33,26 @@ public:
m_fifo.lock(m_ctx);
}
~Transaction() {
try {
if(!m_writeDone)
m_fifo.unlock(m_ctx);
} catch (...) {}
}
std::string peek() {
if (m_writeDone)
throw cta::exception::Exception("In FIFO::Transaction::peek: write already occurred");
if (m_fifo.m_currentState.readpointer() >= m_fifo.m_currentState.name_size())
throw FIFOEmpty("In FIFO::Transaction::peek: FIFO empty");
return m_fifo.m_currentState.name(m_fifo.m_currentState.readpointer());
}
void popAndUnlock() {
if (m_writeDone)
throw cta::exception::Exception("In FIFO::Transaction::popAndUnlock: write already occurred");
if (m_fifo.m_currentState.readpointer() >= m_fifo.m_currentState.name_size())
throw FIFOEmpty("In FIFO::Transaction::popAndUnlock: FIFO empty");
m_fifo.m_currentState.set_readpointer(m_fifo.m_currentState.readpointer()+1);
if (m_fifo.m_currentState.readpointer() > 100) {
m_fifo.compactCurrentState();
......@@ -56,7 +72,7 @@ public:
}
void push(std::string name, Agent & agent) {
cta::objectstore::serializers::FIFO fs;
serializers::FIFO fs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(fs, context);
fs.add_name(name);
......@@ -65,7 +81,7 @@ public:
}
std::string dump(Agent & agent) {
cta::objectstore::serializers::FIFO fs;
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< FIFO dump start" << std::endl
......@@ -80,7 +96,7 @@ public:
}
private:
cta::objectstore::serializers::FIFO m_currentState;
serializers::FIFO m_currentState;
void compactCurrentState() {
uint64_t oldReadPointer = m_currentState.readpointer();
......
......@@ -6,11 +6,11 @@
namespace cta { namespace objectstore {
class JobPool: private ObjectOps<cta::objectstore::serializers::JobPool> {
class JobPool: private ObjectOps<serializers::JobPool> {
public:
JobPool(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::serializers::JobPool>(agent.objectStore(), name) {
cta::objectstore::serializers::JobPool jps;
ObjectOps<serializers::JobPool>(agent.objectStore(), name) {
serializers::JobPool jps;
updateFromObjectStore(jps, agent.getFreeContext());
}
......@@ -25,7 +25,7 @@ public:
};
std::string dump(Agent & agent) {
cta::objectstore::serializers::JobPool jps;
serializers::JobPool jps;
updateFromObjectStore(jps, agent.getFreeContext());
std::stringstream ret;
ret << "<<<< JobPool " << selfName() << " dump start" << std::endl
......@@ -37,7 +37,7 @@ public:
std::string getRecallFIFO (Agent & agent) {
// Check if the recall FIFO exists
cta::objectstore::serializers::JobPool res;
serializers::JobPool res;
updateFromObjectStore(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recall().size())
......@@ -53,7 +53,7 @@ public:
} catch (NotAllocatedEx &) {
// If we get here, the job pool is not created yet, so we have to do it:
// lock the entry again, for writing
cta::objectstore::serializers::JobPool res;
serializers::JobPool res;
ContextHandle ctx = agent.getFreeContext();
lockExclusiveAndRead(res, ctx);
// If the registry is already defined, somebody was faster. We're done.
......@@ -68,7 +68,7 @@ public:
agent.addToIntend(selfName(), FIFOName, "recallFIFO");
// The potential object can now be garbage collected if we die from here.
// Create the object, then lock. The name should be unique, so no race.
cta::objectstore::serializers::JobPool jps;
serializers::JobPool jps;
jps.set_migration("");
jps.set_recall("");
writeChild(FIFOName, jps);
......
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class RecallJob: private ObjectOps<serializers::RecallJob> {
public:
static std::string create(const std::string & container,
const std::string & source, const std::string & destination, Agent & agent) {
serializers::RecallJob rjs;