Skip to content
Snippets Groups Projects
Commit 6de36148 authored by Eric Cano's avatar Eric Cano
Browse files

The AgentVistor class has been created.

In order to simplify development, we will not turn the types into numbers.
parent 201d9a44
No related branches found
No related tags found
No related merge requests found
#pragma once
#include "Agent.hpp"
#include "AgentVisitor.hpp"
#include "RootEntry.hpp"
#include "RecallJob.hpp"
#include "JobPool.hpp"
......@@ -149,11 +149,24 @@ private:
Agent & m_agent;
void collectGarbage(const std::string & agentName) {
// When collecting the garbage of an agent, we have to iterate through its
// intended and owned objects, validate that they are still owned by the dead
// agent, and re-post them to the container where they should be (and ownership)
// is re-set to the container.
Agent ag(agentName, m_agent);
std::list<Agent::intentEntry> intendedObjects = ag.getIntentLog();
// intended and owned objects, validate that they still exist, are owned by
// the dead agent, and re-post them to the container where they should be.
try {
// If the agent entry does not exist anymore, we're done.
AgentVisitor ag(agentName, m_agent);
std::list<AgentVisitor::intentEntry> intendedObjects = ag.getIntentLog(m_agent);
for (std::list<AgentVisitor::intentEntry>::iterator i=intendedObjects.begin();
i != intendedObjects.end(); i++) {
switch (i->typeName) {
case "recallFIFO":
break;
case "RecallJob":
break;
case "jobPool":
breal;
}
}
} catch (...) {}
}
};
......
......@@ -16,16 +16,6 @@ cta::objectstore::Agent::Agent(ObjectStore & os, const std::string & typeName):
setup(typeName);
}
// Passive constructor, used for looking at existing agent records
cta::objectstore::Agent::Agent(const std::string & name, Agent & agent):
ObjectOps<serializers::Agent>(agent.objectStore(), name),
m_nextId(0), m_setupDone(true), m_creationDone(true), m_observerVersion(true)
{
// check the presence of the entry
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
}
void cta::objectstore::Agent::setup(const std::string & typeName) {
std::stringstream aid;
// Get time
......@@ -216,27 +206,6 @@ cta::objectstore::ObjectStore & cta::objectstore::Agent::objectStore() {
return ObjectOps<serializers::Agent>::objectStore();
}
std::string cta::objectstore::Agent::dump(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Agent " << selfName() << " dump start" << std::endl
<< "name=" << as.name() << std::endl
<< "Ownership intent size=" << as.ownershipintent_size() << std::endl;
for (int i=0; i<as.ownershipintent_size(); i++) {
ret << "ownershipIntent[" << i << "]: name=" << as.ownershipintent(i).name()
<< " type=" << as.ownershipintent(i).type() << std::endl;
}
ret << "Creation intent size=" << as.creationintent_size() << std::endl;
for (int i=0; i<as.creationintent_size(); i++) {
ret << "creationIntent[" << i << "]: name=" << as.creationintent(i).name()
<< " type=" << as.creationintent(i).type()
<< " container=" << as.creationintent(i).container() << std::endl;
}
ret<< ">>>> Agent " << selfName() << " dump end" << std::endl;
return ret.str();
}
void cta::objectstore::Agent::heartbeat(Agent& agent) {
ContextHandle & context = agent.getFreeContext();
serializers::Agent as;
......@@ -246,19 +215,5 @@ void cta::objectstore::Agent::heartbeat(Agent& agent) {
uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
return as.heartbeatcount();
}
cta::objectstore::AgentWatchdog::AgentWatchdog(const std::string& agentName, Agent& agent):
m_agentVisitor(agentName, agent) {
m_hearbeatCounter = m_agentVisitor.getHeartbeatCount(agent);
}
bool cta::objectstore::AgentWatchdog::checkAlive(Agent& agent) {
uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent);
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1)
return false;
m_hearbeatCounter = newHeartBeatCount;
return true;
}
......@@ -23,8 +23,6 @@ public:
Agent(ObjectStore & os, const std::string & typeName);
Agent(const std::string & name, Agent & agent);
void setup(const std::string & typeName);
class SetupNotDone: public cta::exception::Exception {
......@@ -102,15 +100,5 @@ private:
static const size_t c_handleCount = 100;
ContextHandleImplementation<myOS> m_contexts[c_handleCount];
};
class AgentWatchdog {
public:
AgentWatchdog(const std::string & agentName, Agent & agent);
bool checkAlive(Agent & agent);
private:
cta::utils::Timer m_timer;
Agent m_agentVisitor;
uint64_t m_hearbeatCounter;
};
}}
\ No newline at end of file
......@@ -2,12 +2,165 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include <string>
namespace cta { namespace objectstore {
class AgentVisitor: private ObjectOps<serializers::Agent> {
class AgentVisitor: private ObjectOps<serializers::Agent> {
public:
AgentVisitor(const std::string & name, Agent & agent):
ObjectOps<serializers::Agent>(agent.objectStore(), name)
{
// check the presence of the entry
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
}
std::string name(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
return as.name();
}
void removeFromIntent (std::string container, std::string name,
std::string typeName, Agent & agent) {
ContextHandle & context = agent.getFreeContext();
serializers::Agent as;
lockExclusiveAndRead(as, context);
bool found;
do {
found = false;
for (int i=0; i<as.creationintent_size(); i++) {
if (container == as.creationintent(i).container() &&
name == as.creationintent(i).name() &&
typeName == as.creationintent(i).type()) {
found = true;
as.mutable_creationintent()->SwapElements(i, as.creationintent_size()-1);
as.mutable_creationintent()->RemoveLast();
break;
}
}
} while (found);
write(as);
unlock(context);
}
void removeFromOwnership(std::string name, std::string typeName, Agent & agent) {
serializers::Agent as;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(as, context);
bool found;
do {
found = false;
for (int i=0; i<as.ownershipintent_size(); i++) {
if (name == as.ownershipintent(i).name() &&
typeName == as.ownershipintent(i).type()) {
found = true;
as.mutable_creationintent()->SwapElements(i, as.ownershipintent_size()-1);
as.mutable_creationintent()->RemoveLast();
break;
}
}
} while (found);
write(as);
unlock(context);
}
class intentEntry {
public:
intentEntry(const std::string & c,
const std::string & n,
const std::string & t):container(c), name(n), typeName(t) {}
std::string container;
std::string name;
std::string typeName;
};
class ownershipEntry {
public:
ownershipEntry(const std::string & n,
const std::string & t):name(n), typeName(t) {}
std::string name;
std::string typeName;
};
std::list<intentEntry> getIntentLog(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::list<intentEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(intentEntry(as.creationintent(i).container(),
as.creationintent(i).name(),
as.creationintent(i).type()));
}
return ret;
}
std::list<ownershipEntry> getOwnershipLog(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::list<ownershipEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(ownershipEntry(as.creationintent(i).name(),
as.creationintent(i).type()));
}
return ret;
}
void remove(Agent & agent) {
removeOther(selfName());
}
uint64_t getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
return as.heartbeatcount();
}
std::string dump(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Agent " << selfName() << " dump start" << std::endl
<< "name=" << as.name() << std::endl
<< "Ownership intent size=" << as.ownershipintent_size() << std::endl;
for (int i=0; i<as.ownershipintent_size(); i++) {
ret << "ownershipIntent[" << i << "]: name=" << as.ownershipintent(i).name()
<< " type=" << as.ownershipintent(i).type() << std::endl;
}
ret << "Creation intent size=" << as.creationintent_size() << std::endl;
for (int i=0; i<as.creationintent_size(); i++) {
ret << "creationIntent[" << i << "]: name=" << as.creationintent(i).name()
<< " type=" << as.creationintent(i).type()
<< " container=" << as.creationintent(i).container() << std::endl;
}
ret<< ">>>> Agent " << selfName() << " dump end" << std::endl;
return ret.str();
}
};
class AgentWatchdog {
public:
AgentWatchdog(const std::string & agentName, Agent & agent):
m_agentVisitor(agentName, agent) {
m_hearbeatCounter = m_agentVisitor.getHeartbeatCount(agent);
}
bool checkAlive(Agent & agent) {
uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent);
if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1)
return false;
m_hearbeatCounter = newHeartBeatCount;
return true;
}
private:
cta::utils::Timer m_timer;
AgentVisitor m_agentVisitor;
uint64_t m_hearbeatCounter;
};
}}
\ No newline at end of file
......@@ -20,7 +20,7 @@ public:
ret << ar.dump(agent) << std::endl;
std::list<std::string> agList = ar.getElements(agent);
for (std::list<std::string>::iterator i=agList.begin(); i!=agList.end(); i++) {
Agent a(*i, agent);
AgentVisitor a(*i, agent);
ret << a.dump(agent) << std::endl;
}
} catch (RootEntry::NotAllocatedEx &) {}
......
......@@ -12,6 +12,7 @@
#include "AgentRegister.hpp"
#include "RecallJob.hpp"
#include "Register.hpp"
#include "AgentVisitor.hpp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment