Commit 9cb21bad authored by Eric Cano's avatar Eric Cano
Browse files

Updated a few central object to the new scheme (RootEntry, JobPool, Agent and AgentRegister),

and created initial unit tests for those.
parent fbdab7e5
......@@ -2,6 +2,7 @@
#include "AgentRegister.hpp"
#include "RootEntry.hpp"
#include "exception/Errnum.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
#include <string>
#include <sstream>
#include <iomanip>
......@@ -10,16 +11,9 @@
#include <cxxabi.h>
cta::objectstore::Agent::Agent(Backend & os):
ObjectOps<serializers::Agent>(os),
m_setupDone(false), m_creationDone(false), m_nextId(0) {}
ObjectOps<serializers::Agent>(os), m_nextId(0) {}
cta::objectstore::Agent::Agent(Backend & os, const std::string & typeName):
ObjectOps<serializers::Agent>(os),
m_setupDone(false), m_creationDone(false), m_nextId(0) {
setup(typeName);
}
void cta::objectstore::Agent::setup(const std::string & typeName) {
void cta::objectstore::Agent::generateName(const std::string & typeName) {
std::stringstream aid;
// Get time
time_t now = time(0);
......@@ -38,114 +32,69 @@ void cta::objectstore::Agent::setup(const std::string & typeName) {
<< std::setw(2) << localNow.tm_hour << ":"
<< std::setw(2) << localNow.tm_min << ":"
<< std::setw(2) << localNow.tm_sec;
ObjectOps<serializers::Agent>::setName(aid.str());
m_setupDone = true;
setName(aid.str());
}
void cta::objectstore::Agent::create() {
/*void cta::objectstore::Agent::create() {
if (!m_setupDone)
throw SetupNotDone("In Agent::create(): setup() not yet done");
RootEntry re(*this);
AgentRegister ar(re.allocateOrGetAgentRegister(*this), *this);
RootEntry re(m_objectStore);
AgentRegister ar(re.allocateOrGetAgentRegister(*this), m_objectStore);
ar.addIntendedElement(selfName(), *this);
serializers::Agent as;
as.set_heartbeat(0);
writeChild(selfName(), as);
ar.upgradeIntendedElementToActual(selfName(), *this);
m_creationDone = true;
}
std::string cta::objectstore::Agent::type() {
if (!m_setupDone)
throw SetupNotDone("In Agent::type(): setup() not yet done");
return m_typeName;
}
std::string cta::objectstore::Agent::name() {
if (!m_setupDone)
throw SetupNotDone("In Agent::name(): setup() not yet done");
return selfName();
}
cta::objectstore::Agent::~Agent() {
if (m_creationDone) {
try {
remove();
RootEntry re(*this);
AgentRegister ar(re.getAgentRegister(*this), *this);
ar.removeElement(selfName(),*this);
} catch (std::exception&) {
}
}
}
}*/
std::string cta::objectstore::Agent::nextId(const std::string & childType) {
if (!m_setupDone)
throw SetupNotDone("In Agent::nextId(): setup() not yet done");
std::stringstream id;
id << childType << "-" << name() << "-" << m_nextId++;
id << childType << "-" << getNameIfSet() << "-" << m_nextId++;
return id.str();
}
void cta::objectstore::Agent::addToOwnership(std::string name) {
if (!m_creationDone)
throw CreationNotDone("In Agent::addToOwnership(): creation() not yet done");
serializers::Agent as;
lockExclusiveAndRead(as);
std::string * owned = as.mutable_ownedobjects()->Add();
checkPayloadWritable();
std::string * owned = m_payload.mutable_ownedobjects()->Add();
*owned = name;
write(as);
unlock();
}
void cta::objectstore::Agent::removeFromOwnership(std::string name) {
if (!m_creationDone)
throw CreationNotDone("In Agent::removeFromOwnership(): creation() not yet done");
serializers::Agent as;
lockExclusiveAndRead(as);
checkPayloadWritable();
serializers::removeString(m_payload.mutable_ownedobjects(), name);
/*
bool found;
do {
found = false;
for (int i=0; i<as.mutable_ownedobjects()->size(); i++) {
if (name == *as.mutable_ownedobjects(i)) {
for (int i=0; i<m_payload.mutable_ownedobjects()->size(); i++) {
if (name == *m_payload.mutable_ownedobjects(i)) {
found = true;
as.mutable_ownedobjects()->SwapElements(i, as.mutable_ownedobjects()->size()-1);
as.mutable_ownedobjects()->RemoveLast();
m_payload.mutable_ownedobjects()->SwapElements(i, m_payload.mutable_ownedobjects()->size()-1);
m_payload.mutable_ownedobjects()->RemoveLast();
break;
}
}
} while (found);
write(as);
unlock();
} while (found);*/
}
std::list<std::string>
cta::objectstore::Agent::getOwnershipLog() {
if (!m_creationDone)
throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done");
serializers::Agent as;
getPayloadFromObjectStoreAutoLock(as);
cta::objectstore::Agent::getOwnershipList() {
checkPayloadReadable();
std::list<std::string> ret;
for (int i=0; i<as.ownedobjects_size(); i++) {
ret.push_back(as.ownedobjects(i));
for (int i=0; i<m_payload.ownedobjects_size(); i++) {
ret.push_back(m_payload.ownedobjects(i));
}
return ret;
}
cta::objectstore::Backend & cta::objectstore::Agent::objectStore() {
return ObjectOps<serializers::Agent>::objectStore();
void cta::objectstore::Agent::bumpHeartbeat() {
checkPayloadWritable();
m_payload.set_heartbeat(m_payload.heartbeat()+1);
}
void cta::objectstore::Agent::heartbeat(Agent& agent) {
serializers::Agent as;
lockExclusiveAndRead(as);
as.set_heartbeat(as.heartbeat()+1);
write(as);
unlock();
uint64_t cta::objectstore::Agent::getHeartbeatCount() {
checkPayloadReadable();
return m_payload.heartbeat();
}
uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) {
serializers::Agent as;
getPayloadFromObjectStoreAutoLock(as);
return as.heartbeat();
}
......@@ -20,39 +20,14 @@ namespace cta { namespace objectstore {
class Agent: protected ObjectOps<serializers::Agent> {
public:
Agent(Backend & os);
Agent(Backend & os, const std::string & typeName);
void setup(const std::string & typeName);
class SetupNotDone: public cta::exception::Exception {
public:
SetupNotDone(const std::string & w): cta::exception::Exception(w) {}
};
class CreationNotDone: public cta::exception::Exception {
public:
CreationNotDone(const std::string & w): cta::exception::Exception(w) {}
};
class ObserverOnly: public cta::exception::Exception {
public:
ObserverOnly(const std::string & w): cta::exception::Exception(w) {}
};
void create();
std::string type();
std::string name();
void flushContexts();
~Agent();
void generateName(const std::string & typeName);
std::string nextId(const std::string & childType);
class ScopedIntent {
void registerSelf();
/* class ScopedIntent {
public:
ScopedIntent(Agent & agent, std::string container, std::string name, serializers::ObjectType objectType):
m_agent(agent), m_container(container), m_name(name), m_objectType(objectType), m_present(false) {
......@@ -76,9 +51,9 @@ public:
std::string m_name;
serializers::ObjectType m_objectType;
bool m_present;
};
};*/
class ScopedOwnership {
/*class ScopedOwnership {
public:
ScopedOwnership(Agent & agent, std::string name):
m_agent(agent), m_name(name), m_present(false) {
......@@ -100,34 +75,21 @@ public:
Agent & m_agent;
std::string m_name;
bool m_present;
};
};*/
void addToOwnership(std::string name);
void removeFromOwnership(std::string name);
class ownedObject {
public:
ownedObject(const std::string & c,
const std::string & n):container(c), name(n) {}
std::string container;
std::string name;
};
std::list<std::string> getOwnershipLog();
Backend & objectStore();
std::list<std::string> getOwnershipList();
std::string dump(Agent & agent);
std::string dump();
void heartbeat(Agent & agent);
void bumpHeartbeat();
uint64_t getHeartbeatCount(Agent & agent);
uint64_t getHeartbeatCount();
private:
std::string m_typeName;
bool m_setupDone;
bool m_creationDone;
uint64_t m_nextId;
};
......
#include "AgentRegister.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Agent & agent):
ObjectOps<serializers::AgentRegister>(agent.objectStore(), name) {
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
serializers::AgentRegister rs;
getPayloadFromObjectStoreAutoLock(rs);
}
cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Backend & os):
ObjectOps<serializers::AgentRegister>(os, name) {}
void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agent) {
serializers::AgentRegister rs;
lockExclusiveAndRead(rs);
rs.add_elements(name);
write(rs);
unlock();
void cta::objectstore::AgentRegister::addElement (std::string name) {
checkPayloadWritable();
std::string * ag = m_payload.mutable_elements()->Add();
*ag = name;
}
void cta::objectstore::AgentRegister::removeElement (const std::string & name, Agent & agent) {
serializers::AgentRegister rs;
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
for (int i=0; i<rs.mutable_elements()->size(); i++) {
if (name == *rs.mutable_elements(i)) {
found = true;
rs.mutable_elements()->SwapElements(i, rs.elements_size()-1);
rs.mutable_elements()->RemoveLast();
break;
}
}
} while (found);
write(rs);
unlock();
void cta::objectstore::AgentRegister::removeElement (const std::string & name) {
checkPayloadReadable();
serializers::removeString(m_payload.mutable_elements(), name);
}
void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent& agent) {
serializers::AgentRegister rs;
lockExclusiveAndRead(rs);
rs.add_intendedelements(name);
write(rs);
unlock();
void cta::objectstore::AgentRegister::addIntendedElement(std::string name) {
checkPayloadWritable();
std::string * ag = m_payload.mutable_intendedelements()->Add();
*ag = name;
}
void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name, Agent& agent) {
serializers::AgentRegister rs;
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
for (int i=0; i<rs.mutable_intendedelements()->size(); i++) {
if (name == *rs.mutable_intendedelements(i)) {
found = true;
rs.mutable_intendedelements()->SwapElements(i, rs.intendedelements_size()-1);
rs.mutable_intendedelements()->RemoveLast();
break;
}
}
} while (found);
rs.add_elements(name);
write(rs);
unlock();
void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name) {
checkPayloadWritable();
serializers::removeString(m_payload.mutable_intendedelements(), name);
std::string * ag = m_payload.mutable_elements()->Add();
*ag = name;
}
void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name, Agent& agent) {
serializers::AgentRegister rs;
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
for (int i=0; i<rs.mutable_intendedelements()->size(); i++) {
if (name == *rs.mutable_intendedelements(i)) {
found = true;
rs.mutable_intendedelements()->SwapElements(i, rs.intendedelements_size()-1);
rs.mutable_intendedelements()->RemoveLast();
break;
}
}
} while (found);
write(rs);
unlock();
void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name) {
checkPayloadWritable();
serializers::removeString(m_payload.mutable_intendedelements(), name);
}
std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agent) {
serializers::AgentRegister rs;
getPayloadFromObjectStoreAutoLock(rs);
std::list<std::string> cta::objectstore::AgentRegister::getElements() {
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
ret.push_back(rs.elements(i));
for (int i=0; i<m_payload.elements_size(); i++) {
ret.push_back(m_payload.elements(i));
}
return ret;
}
std::string cta::objectstore::AgentRegister::dump(Agent & agent) {
serializers::AgentRegister rs;
getPayloadFromObjectStoreAutoLock(rs);
std::string cta::objectstore::AgentRegister::dump() {
checkPayloadReadable();
std::stringstream ret;
ret<< "<<<< AgentRegister " << selfName() << " dump start" << std::endl
<< "Array size=" << rs.elements_size() << std::endl;
for (int i=0; i<rs.elements_size(); i++) {
ret << "element[" << i << "]=" << rs.elements(i) << std::endl;
ret<< "<<<< AgentRegister " << getNameIfSet() << " dump start" << std::endl
<< "Array size=" << m_payload.elements_size() << std::endl;
for (int i=0; i<m_payload.elements_size(); i++) {
ret << "element[" << i << "]=" << m_payload.elements(i) << std::endl;
}
ret << "Intent array size=" << rs.intendedelements_size() << std::endl;
for (int i=0; i<rs.intendedelements_size(); i++) {
ret << "intendedElement[" << i << "]=" << rs.intendedelements(i) << std::endl;
ret << "Intent array size=" << m_payload.intendedelements_size() << std::endl;
for (int i=0; i<m_payload.intendedelements_size(); i++) {
ret << "intendedElement[" << i << "]=" << m_payload.intendedelements(i) << std::endl;
}
ret<< ">>>> AgentRegister " << selfName() << " dump end" << std::endl;
ret<< ">>>> AgentRegister " << getNameIfSet() << " dump end" << std::endl;
return ret.str();
}
\ No newline at end of file
#pragma once
#include "ObjectOps.hpp"
#include "Agent.hpp"
#include <algorithm>
#include <list>
namespace cta { namespace objectstore {
class Backend;
class Agent;
class AgentRegister: private ObjectOps<serializers::AgentRegister> {
class AgentRegister: public ObjectOps<serializers::AgentRegister> {
public:
AgentRegister(const std::string & name, Agent & agent);
void addElement (std::string name, Agent & agent);
void removeElement (const std::string & name, Agent & agent);
void addIntendedElement (std::string name, Agent & agent);
void upgradeIntendedElementToActual(std::string name, Agent & agent);
void removeIntendedElement (const std::string & name, Agent & agent);
std::list<std::string> getElements(Agent & agent);
std::string dump(Agent & agent);
AgentRegister(const std::string & name, Backend & os);
void addElement (std::string name);
void removeElement (const std::string & name);
void addIntendedElement (std::string name);
void upgradeIntendedElementToActual(std::string name);
void removeIntendedElement (const std::string & name);
std::list<std::string> getElements();
std::string dump();
};
}}
\ No newline at end of file
......@@ -57,5 +57,10 @@ TEST_P(BackendAbstractTest, ParametersInterface) {
}
cta::objectstore::BackendVFS osVFS;
#define TEST_RADOS 0
#if TEST_RADOS
cta::objectstore::BackendRados osRados("tapetest", "tapetest");
INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados));
#else
INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values((cta::objectstore::Backend*)&osVFS));
#endif
\ No newline at end of file
......@@ -12,12 +12,13 @@ PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_library (CTAObjectStore
${CTAProtoSources}
# RootEntry.cpp
# Agent.cpp
# AgentRegister.cpp
RootEntry.cpp
Agent.cpp
AgentRegister.cpp
BackendVFS.cpp
BackendRados.cpp
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
......@@ -52,8 +53,9 @@ add_library (CTAObjectStore
# protobuf rados CTAObjectStore)
set(ObjectStoreUnitTests
BackendTest.cpp)
# RootEntryTest.cpp)
BackendTest.cpp
RootEntryTest.cpp
)
add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests})
target_link_libraries(unitTests
......
#pragma once
#include "Backend.hpp"
#include "ObjectOps.hpp"
#include "FIFO.hpp"
#include "Agent.hpp"
namespace cta { namespace objectstore {
class JobPool: private ObjectOps<serializers::JobPool> {
class JobPool: public ObjectOps<serializers::JobPool> {
public:
JobPool(const std::string & name, Agent & agent):
ObjectOps<serializers::JobPool>(agent.objectStore(), name) {
serializers::JobPool jps;
getPayloadFromObjectStoreAutoLock(jps, agent.getFreeContext());
}
JobPool(const std::string & name, Backend & os):
ObjectOps<serializers::JobPool>(os, name) {}
void PostRecallJob (const std::string & job, Agent & agent) {
FIFO recallFIFO(allocateOrGetRecallFIFO(agent), agent);
recallFIFO.push(job, agent);
/*FIFO recallFIFO(allocateOrGetRecallFIFO(agent), agent);
recallFIFO.push(job, agent);*/
}
class NotAllocatedEx: public cta::exception::Exception {
......@@ -25,24 +21,20 @@ public:
};
std::string dump(Agent & agent) {
serializers::JobPool jps;
getPayloadFromObjectStoreAutoLock(jps, agent.getFreeContext());
checkPayloadReadable();
std::stringstream ret;
ret << "<<<< JobPool " << selfName() << " dump start" << std::endl
<< "Migration=" << jps.migration() << std::endl
<< "Recall=" << jps.recall() << std::endl
<< "RecallCounter=" << jps.recallcounter() << std::endl;
ret << ">>>> JobPool " << selfName() << " dump end" << std::endl;
ret << "<<<< JobPool " << getNameIfSet() << " dump start" << std::endl
<< "Migration=" << m_payload.migration() << std::endl
<< "Recall=" << m_payload.recall() << std::endl
<< "RecallCounter=" << m_payload.recallcounter() << std::endl;
ret << ">>>> JobPool " << getNameIfSet() << " dump end" << std::endl;
return ret.str();
}
std::string getRecallFIFO (Agent & agent) {
// Check if the recall FIFO exists
serializers::JobPool res;
getPayloadFromObjectStoreAutoLock(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recall().size())
return res.recall();
std::string getRecallFIFO () {
checkPayloadReadable();
if (m_payload.recall().size())
return m_payload.recall();
throw NotAllocatedEx("In RootEntry::getJobPool: jobPool not yet allocated");
}
......@@ -50,9 +42,10 @@ public:
std::string allocateOrGetRecallFIFO(Agent & agent) {
// Check if the job pool exists
try {
return getRecallFIFO(agent);
return getRecallFIFO();
} catch (NotAllocatedEx &) {
// If we get here, the job pool is not created yet, so we have to do it:
throw;
/*// If we get here, the job pool is not created yet, so we have to do it:
// lock the entry again, for writing
serializers::JobPool res;
ContextHandle & ctx = agent.getFreeContext();
......@@ -78,17 +71,14 @@ public:
write(res);
// release the lock, and return the register name
unlock(ctx);
return FIFOName;
return FIFOName;*/
}
}
std::string getRecallCounter (Agent & agent) {
// Check if the recall FIFO exists
serializers::JobPool res;
getPayloadFromObjectStoreAutoLock(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recallcounter().size())
return res.recallcounter();
checkPayloadReadable();
if (m_payload.recallcounter().size())
return m_payload.recallcounter();
throw NotAllocatedEx("In RootEntry::getRecallCounter: recallCounter not yet allocated");
}
......@@ -97,6 +87,7 @@ public:
try {
return getRecallCounter(agent);
} catch (NotAllocatedEx &) {
throw;/*
// If we get here, the job pool is not created yet, so we have to do it:
// lock the entry again, for writing
serializers::JobPool res;
......@@ -123,7 +114,7 @@ public:
write(res);
// release the lock, and return the register name
unlock(ctx);
return recallCounterName;
return recallCounterName;*/
}
}
......
......@@ -14,7 +14,9 @@ class ObjectOpsBase {
friend class ScopedExclusiveLock;
protected: