Commit 1ac1bd50 authored by Eric Cano's avatar Eric Cano
Browse files

Intermediate commit: factored the base operations on ObjectOps and started to...

Intermediate commit: factored the base operations on ObjectOps and started to apply them to RootObject.
parent aa6b53cf
......@@ -124,7 +124,7 @@ std::list<std::string>
if (!m_creationDone)
throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done");
serializers::Agent as;
updateFromObjectStore(as);
getPayloadFromObjectStoreAutoLock(as);
std::list<std::string> ret;
for (int i=0; i<as.ownedobjects_size(); i++) {
ret.push_back(as.ownedobjects(i));
......@@ -146,6 +146,6 @@ void cta::objectstore::Agent::heartbeat(Agent& agent) {
uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as);
getPayloadFromObjectStoreAutoLock(as);
return as.heartbeat();
}
......@@ -5,7 +5,7 @@ ObjectOps<serializers::AgentRegister>(agent.objectStore(), name) {
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
serializers::AgentRegister rs;
updateFromObjectStore(rs);
getPayloadFromObjectStoreAutoLock(rs);
}
void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agent) {
......@@ -87,7 +87,7 @@ void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& n
std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agent) {
serializers::AgentRegister rs;
updateFromObjectStore(rs);
getPayloadFromObjectStoreAutoLock(rs);
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
ret.push_back(rs.elements(i));
......@@ -97,7 +97,7 @@ std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agen
std::string cta::objectstore::AgentRegister::dump(Agent & agent) {
serializers::AgentRegister rs;
updateFromObjectStore(rs);
getPayloadFromObjectStoreAutoLock(rs);
std::stringstream ret;
ret<< "<<<< AgentRegister " << selfName() << " dump start" << std::endl
<< "Array size=" << rs.elements_size() << std::endl;
......
......@@ -15,12 +15,12 @@ public:
{
// check the presence of the entry
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
}
std::string name(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
return as.name();
}
......@@ -88,7 +88,7 @@ public:
std::list<intentEntry> getIntentLog(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
std::list<intentEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(intentEntry(as.creationintent(i).container(),
......@@ -100,7 +100,7 @@ public:
std::list<ownershipEntry> getOwnershipLog(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
std::list<ownershipEntry> ret;
for (int i=0; i<as.ownershipintent_size(); i++) {
ret.push_back(ownershipEntry(as.ownershipintent(i).name(),
......@@ -115,13 +115,13 @@ public:
uint64_t getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
return as.heartbeatcount();
}
std::string dump(Agent & agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(as, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Agent " << selfName() << " dump start" << std::endl
<< "name=" << as.name() << std::endl
......
......@@ -12,11 +12,12 @@ PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_library (CTAObjectStore
${CTAProtoSources}
RootEntry.cpp
Agent.cpp
AgentRegister.cpp
# RootEntry.cpp
# Agent.cpp
# AgentRegister.cpp
BackendVFS.cpp
BackendRados.cpp
ObjectOps.cpp
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
......@@ -51,8 +52,8 @@ add_library (CTAObjectStore
# protobuf rados CTAObjectStore)
set(ObjectStoreUnitTests
BackendTest.cpp
RootEntryTest.cpp)
BackendTest.cpp)
# RootEntryTest.cpp)
add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests})
target_link_libraries(unitTests
......
......@@ -13,7 +13,7 @@ public:
{
// check the presence of the entry
serializers::Counter cs;
updateFromObjectStore(cs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(cs, agent.getFreeContext());
}
void inc(Agent & agent) {
......@@ -27,7 +27,7 @@ public:
uint64_t get(Agent & agent) {
serializers::Counter cs;
updateFromObjectStore(cs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(cs, agent.getFreeContext());
return cs.count();
}
};
......
......@@ -12,7 +12,7 @@ public:
FIFO(const std::string & name, Agent & agent):
ObjectOps<serializers::FIFO>(agent.objectStore(), name) {
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
}
private:
......@@ -86,7 +86,7 @@ public:
std::string dump(Agent & agent) {
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< FIFO dump start" << std::endl
<< "Read pointer=" << fs.readpointer() << std::endl
......@@ -101,7 +101,7 @@ public:
uint64_t size(Agent & agent) {
serializers::FIFO fs;
updateFromObjectStore(fs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(fs, agent.getFreeContext());
uint64_t ret = fs.name_size() - fs.readpointer();
return ret;
}
......
......@@ -11,7 +11,7 @@ public:
JobPool(const std::string & name, Agent & agent):
ObjectOps<serializers::JobPool>(agent.objectStore(), name) {
serializers::JobPool jps;
updateFromObjectStore(jps, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(jps, agent.getFreeContext());
}
void PostRecallJob (const std::string & job, Agent & agent) {
......@@ -26,7 +26,7 @@ public:
std::string dump(Agent & agent) {
serializers::JobPool jps;
updateFromObjectStore(jps, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(jps, agent.getFreeContext());
std::stringstream ret;
ret << "<<<< JobPool " << selfName() << " dump start" << std::endl
<< "Migration=" << jps.migration() << std::endl
......@@ -39,7 +39,7 @@ public:
std::string getRecallFIFO (Agent & agent) {
// Check if the recall FIFO exists
serializers::JobPool res;
updateFromObjectStore(res, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recall().size())
return res.recall();
......@@ -85,7 +85,7 @@ public:
std::string getRecallCounter (Agent & agent) {
// Check if the recall FIFO exists
serializers::JobPool res;
updateFromObjectStore(res, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recallcounter().size())
return res.recallcounter();
......
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
#define MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(A) \
template <> const serializers::ObjectType ObjectOps<serializers::A>::typeId = serializers::A##_t
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RootEntry);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AgentRegister);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Agent);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(JobPool);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallFIFO);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallJob);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Counter);
#undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID
}}
\ No newline at end of file
......@@ -2,68 +2,181 @@
#include "Backend.hpp"
#include "exception/Exception.hpp"
#include "objectstore/cta.pb.h"
#include <memory>
#include <stdint.h>
namespace cta { namespace objectstore {
template <class C>
class ObjectOps {
public:
ObjectOps(Backend & os, const std::string & name):m_nameSet(true), m_name(name),
m_objectStore(os) {}
ObjectOps(Backend & os): m_nameSet(false), m_objectStore(os) {}
class ObjectOpsBase {
friend class ScopedLock;
friend class ScopedSharedLock;
friend class ScopedExclusiveLock;
protected:
ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os),
m_headerInterpreted(false), m_payloadInterpreted(false) {}
class NameNotSet: public cta::exception::Exception {
public:
NameNotSet(const std::string & w): cta::exception::Exception(w) {}
};
class NotLocked: public cta::exception::Exception {
public:
NotLocked(const std::string & w): cta::exception::Exception(w) {}
};
class AlreadyLocked: public cta::exception::Exception {
public:
AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
};
class WrongType: public cta::exception::Exception {
public:
WrongType(const std::string & w): cta::exception::Exception(w) {}
};
class NotNew: public cta::exception::Exception {
public:
NotNew(const std::string & w): cta::exception::Exception(w) {}
};
class NotFetched: public cta::exception::Exception {
public:
NotFetched(const std::string & w): cta::exception::Exception(w) {}
};
void setName(const std::string & name) {
m_name = name;
m_nameSet = true;
}
void updateFromObjectStore (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
std::auto_ptr<Backend::ScopedLock> lock (m_objectStore.lockShared(m_name));
std::string reStr = m_objectStore.read(m_name);
lock->release();
val.ParseFromString(reStr);
std::string & getNameIfSet() {
if (!m_nameSet) {
throw NameNotSet("In ObjectOpsBase::getNameIfSet: name not set yet");
}
return m_name;
}
protected:
bool m_nameSet;
std::string m_name;
Backend & m_objectStore;
serializers::ObjectHeader m_header;
bool m_headerInterpreted;
bool m_payloadInterpreted;
bool m_locked;
bool m_lockedForWrite;
std::auto_ptr<Backend::ScopedLock> m_writeLock;
};
class ScopedLock {
public:
void release() {
m_lock.reset(NULL);
}
virtual ~ScopedLock() {
m_objectOps.m_locked = false;
m_objectOps.m_lockedForWrite = false;
}
protected:
ScopedLock(ObjectOpsBase & oo): m_objectOps(oo) {
if (m_objectOps.m_locked)
throw ObjectOpsBase::AlreadyLocked("In ScopedLock::ScopedLock: object already locked");
}
std::auto_ptr<Backend::ScopedLock> m_lock;
ObjectOpsBase & m_objectOps;
};
void lockExclusiveAndRead (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
if(NULL != m_writeLock.get()) throw AlreadyLocked("In ObjectOps<>::updateFromObjectStore: already lcoked for write");
m_writeLock.reset(m_objectStore.lockExclusive(m_name));
// Re-read to get latest version (lock upgrade could be useful here)
std::string reStr = m_objectStore.read(m_name);
if (reStr.size())
val.ParseFromString(reStr);
class ScopedSharedLock: public ScopedLock {
public:
ScopedSharedLock(ObjectOpsBase & oo): ScopedLock(oo) {
m_lock.reset(m_objectOps.m_objectStore.lockShared(m_objectOps.getNameIfSet()));
m_objectOps.m_locked = true;
}
};
class ScopedExclusiveLock: public ScopedLock {
public:
ScopedExclusiveLock(ObjectOpsBase & oo): ScopedLock(oo) {
m_lock.reset(m_objectOps.m_objectStore.lockExclusive(m_objectOps.getNameIfSet()));
m_objectOps.m_locked = true;
m_objectOps.m_lockedForWrite = true;
}
};
template <class C>
class ObjectOps: public ObjectOpsBase {
protected:
ObjectOps(Backend & os, const std::string & name): ObjectOpsBase(os) {
setName(name);
}
void write (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.atomicOverwrite(m_name, val.SerializeAsString());
ObjectOps(Backend & os): ObjectOpsBase(os) {}
public:
void fetch() {
// Check that the object is locked, one way or another
if(!m_locked)
throw NotLocked("In ObjectOps::fetch(): object not locked");
// Get the header from the object store
getHeaderFromObjectStore();
// Interpret the data
getPayloadFromHeader();
}
void unlock () {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
// release the lock, and return the register name
m_writeLock.reset(NULL);
void commit() {
// Check that the object is locked for writing
if (!m_locked || !m_lockedForWrite)
throw NotLocked("In ObjectOps::commit(): object not locked for write");
// Serialise the payload into the header
m_header.set_payload(m_payload.SerializeAsString());
// Write the payload
m_objectStore.atomicOverwrite(getNameIfSet(), m_header.SerializeAsString());
}
protected:
void getHeaderFromObjectStore () {
m_header.ParseFromString(m_objectStore.read(getNameIfSet()));
if (m_header.type() != typeId) {
std::stringstream err;
err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
<< "found=" << m_header.type() << " expected=" << typeId;
throw cta::exception::Exception(err.str());
}
m_headerInterpreted = true;
}
void getPayloadFromHeader () {
m_payload.ParseFromString(m_header.payload());
m_payloadInterpreted = true;
}
public:
void remove () {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::remove: name not set");
// remove the object
m_objectStore.remove(m_name);
if (!m_lockedForWrite)
throw NotLocked("In ObjectOps::remove: not locked for write");
m_objectStore.remove(getNameIfSet());
}
/**
* Fill up the header and object with its default contents
*/
void initialize() {
if (m_headerInterpreted)
throw NotNew("In ObjectOps::initialize: trying to initialize an exitsting object");
m_header.set_type(typeId);
}
void insert() {
// Push the payload into the header and write the object
// We don't require locking here, as the object does not exist
// yet in the object store (and this is ensured by the )
m_header.set_payload(m_payload.SerializeAsString());
m_objectStore.create(getNameIfSet(), m_header.SerializeAsString());
}
private:
template <class C2>
void writeChild (const std::string & name, C2 & val) {
m_objectStore.create(name, val.SerializeAsString());
......@@ -82,11 +195,9 @@ public:
return m_objectStore;
}
private:
bool m_nameSet;
std::string m_name;
Backend & m_objectStore;
std::auto_ptr<Backend::ScopedLock> m_writeLock;
protected:
static const serializers::ObjectType typeId;
C m_payload;
};
}}
\ No newline at end of file
......@@ -23,7 +23,7 @@ public:
RecallJob(const std::string & name, Agent & agent):
ObjectOps<serializers::RecallJob>(agent.objectStore(), name){
serializers::RecallJob rjs;
updateFromObjectStore(rjs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rjs, agent.getFreeContext());
}
void remove() {
......@@ -32,20 +32,20 @@ public:
std::string source(Agent & agent) {
serializers::RecallJob rjs;
updateFromObjectStore(rjs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rjs, agent.getFreeContext());
return rjs.source();
}
std::string destination(Agent & agent) {
serializers::RecallJob rjs;
updateFromObjectStore(rjs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rjs, agent.getFreeContext());
return rjs.destination();
}
std::string owner(Agent & agent) {
serializers::RecallJob rjs;
updateFromObjectStore(rjs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rjs, agent.getFreeContext());
return rjs.owner();
}
};
......
......@@ -12,7 +12,7 @@ public:
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
serializers::Register rs;
updateFromObjectStore(rs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rs, agent.getFreeContext());
}
void addElement (std::string name, ContextHandle & context) {
......@@ -44,7 +44,7 @@ public:
std::list<std::string> getElements(Agent & agent) {
serializers::Register rs;
updateFromObjectStore(rs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rs, agent.getFreeContext());
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
ret.push_back(rs.elements(i));
......@@ -54,7 +54,7 @@ public:
std::string dump(const std::string & title, Agent & agent) {
serializers::Register rs;
updateFromObjectStore(rs, agent.getFreeContext());
getPayloadFromObjectStoreAutoLock(rs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Register " << title << " dump start" << std::endl
<< "Array size=" << rs.elements_size() << std::endl;
......
#include "RootEntry.hpp"
#include "Agent.hpp"
#include <cxxabi.h>
// Initializer.
void cta::objectstore::RootEntry::init(Backend & os) {
// check existence of root entry before creating it. We expect read to fail.
try {
os.read(s_rootEntryName);
throw cta::exception::Exception("In RootEntry::init: root entry already exists");
} catch (std::exception&) {
}
serializers::RootEntry res;
os.create(s_rootEntryName, res.SerializeAsString());
}
// construtor, when the backend store exists.
// Checks the existence and correctness of the root entry
cta::objectstore::RootEntry::RootEntry(Agent & agent):
ObjectOps<serializers::RootEntry>(agent.objectStore(), s_rootEntryName) {
// Check that the root entry is readable.
serializers::RootEntry res;
updateFromObjectStore(res);
}
cta::objectstore::RootEntry::RootEntry(Backend & os):
ObjectOps<serializers::RootEntry>(os, s_rootEntryName) {}
// Get the name of the agent register (or exception if not available)
std::string cta::objectstore::RootEntry::getAgentRegister(Agent & agent) {
// Check if the agent register exists
serializers::RootEntry res;
updateFromObjectStore(res);
std::string cta::objectstore::RootEntry::getAgentRegister() {
// Check that the fetch was done
if (!m_payloadInterpreted)
throw ObjectOpsBase::NotFetched("In RootEntry::getAgentRegister: object not yet fetched")
// If the registry is defined, return it, job done.
if (res.agentregister().size())
return res.agentregister();
if (m_payload.agentregister().size())
return m_payload.agentregister();
throw NotAllocatedEx("In RootEntry::getAgentRegister: agentRegister not yet allocated");
}
......@@ -75,7 +59,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetAgentRegister(Agent & agen
std::string cta::objectstore::RootEntry::getJobPool(Agent & agent) {
// Check if the job pool exists
serializers::RootEntry res;
updateFromObjectStore(res);
getPayloadFromObjectStoreAutoLock(res);
// If the registry is defined, return it, job done.
if (res.jobpool().size())
return res.jobpool();
......@@ -125,7 +109,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetJobPool(Agent & agent) {
std::string cta::objectstore::RootEntry::dump (Agent & agent) {
std::stringstream ret;
serializers::RootEntry res;
updateFromObjectStore(res);
getPayloadFromObjectStoreAutoLock(res);
ret << "<<<< Root entry dump start" << std::endl;
if (res.has_agentregister()) ret << "agentRegister=" << res.agentregister() << std::endl;
ret << "agentRegister Intent Log size=" << res.agentregisterintentlog_size() << std::endl;
......
......@@ -8,14 +8,10 @@
namespace cta { namespace objectstore {
class RootEntry: private ObjectOps<serializers::RootEntry> {
class RootEntry: public ObjectOps<serializers::RootEntry> {
public:
// Initializer.
static void init(Backend & os);
// construtor, when the backend store exists.
// Checks the existence and correctness of the root entry
RootEntry(Agent & agent);
// construtor
RootEntry(Backend & os);
class NotAllocatedEx: public cta::exception::Exception {
public:
......@@ -23,19 +19,19 @@ public:
};
// Get the name of the agent register (or exception if not available)
std::string getAgentRegister(Agent & agent);
std::string getAgentRegister();
// Get the name of a (possibly freshly created) agent register
std::string allocateOrGetAgentRegister(Agent & agent);
// Get the name of the JobPool (or exception if not available)
std::string getJobPool(Agent & agent);
std::string getJobPool();
// Get the name of a (possibly freshly created) job pool
std::string allocateOrGetJobPool(Agent & agent);
// Dump the root entry
std::string dump (Agent & agent);
std::string dump ();