Commit 235e64ef authored by Eric Cano's avatar Eric Cano
Browse files

Adapted the existing object code to the backend interface.

parent 47298db7
#include "Agent.hpp"
#include "AgentRegister.hpp"
#include "RootEntry.hpp"
#include "exception/Errnum.hpp"
#include <string>
#include <sstream>
#include <iomanip>
#include <sys/syscall.h>
#include <ctime>
#include <cxxabi.h>
cta::objectstore::Agent::Agent(ObjectStore & os):
cta::objectstore::Agent::Agent(Backend & os):
ObjectOps<serializers::Agent>(os),
m_setupDone(false), m_creationDone(false), m_observerVersion(false), m_nextId(0) {}
m_setupDone(false), m_creationDone(false), m_nextId(0) {}
cta::objectstore::Agent::Agent(ObjectStore & os, const std::string & typeName):
cta::objectstore::Agent::Agent(Backend & os, const std::string & typeName):
ObjectOps<serializers::Agent>(os),
m_setupDone(false), m_creationDone(false), m_observerVersion(false), m_nextId(0) {
m_setupDone(false), m_creationDone(false), m_nextId(0) {
setup(typeName);
}
......@@ -47,8 +49,7 @@ void cta::objectstore::Agent::create() {
AgentRegister ar(re.allocateOrGetAgentRegister(*this), *this);
ar.addIntendedElement(selfName(), *this);
serializers::Agent as;
as.set_name(selfName());
as.set_heartbeatcount(0);
as.set_heartbeat(0);
writeChild(selfName(), as);
ar.upgradeIntendedElementToActual(selfName(), *this);
m_creationDone = true;
......@@ -66,17 +67,8 @@ std::string cta::objectstore::Agent::name() {
return selfName();
}
void cta::objectstore::Agent::flushContexts() {
for(size_t i=0; i< c_handleCount; i++) {
m_contexts[i].release();
}
}
cta::objectstore::Agent::~Agent() {
for (size_t i=0; i < c_handleCount; i++) {
m_contexts[i].release();
}
if (m_creationDone && !m_observerVersion) {
if (m_creationDone) {
try {
remove();
RootEntry re(*this);
......@@ -90,142 +82,70 @@ cta::objectstore::Agent::~Agent() {
std::string cta::objectstore::Agent::nextId(const std::string & childType) {
if (!m_setupDone)
throw SetupNotDone("In Agent::nextId(): setup() not yet done");
if (m_observerVersion)
throw ObserverOnly("In Agent::nextId(): this object is observer only");
std::stringstream id;
id << childType << "-" << name() << "-" << m_nextId++;
return id.str();
}
cta::objectstore::ContextHandle &
cta::objectstore::Agent::getFreeContext() {
for (size_t i=0; i < c_handleCount; i++) {
if (!m_contexts[i].isSet())
return m_contexts[i];
}
throw cta::exception::Exception("Could not find free context slot");
}
void cta::objectstore::Agent::addToIntend (std::string container, std::string name, serializers::ObjectType objectType) {
if (!m_creationDone)
throw CreationNotDone("In Agent::addToIntend(): creation() not yet done");
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx, "Agent::addToIntend");
serializers::ObjectCreationIntent * oca =
as.mutable_creationintent()->Add();
oca->set_container(container);
oca->set_name(name);
oca->set_type(objectType);
write(as);
unlock(ctx, "Agent::addToIntend");
}
void cta::objectstore::Agent::removeFromIntent (std::string container, std::string name, serializers::ObjectType objectType) {
if (!m_creationDone)
throw CreationNotDone("In Agent::removeFromIntent(): creation() not yet done");
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx, "Agent::removeFromIntent");
bool found;
do {
found = false;
for (int i=0; i<as.mutable_creationintent()->size(); i++) {
if (container == as.mutable_creationintent(i)->container() &&
name == as.mutable_creationintent(i)->name() &&
objectType == as.mutable_creationintent(i)->type()) {
found = true;
as.mutable_creationintent()->SwapElements(i, as.creationintent_size()-1);
as.mutable_creationintent()->RemoveLast();
break;
}
}
} while (found);
write(as);
unlock(ctx, "Agent::removeFromIntent");
}
void cta::objectstore::Agent::addToOwnership(std::string name, serializers::ObjectType objectType) {
void cta::objectstore::Agent::addToOwnership(std::string name) {
if (!m_creationDone)
throw CreationNotDone("In Agent::addToOwnership(): creation() not yet done");
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx, __func__);
serializers::ObjectOwnershipIntent * ooi =
as.mutable_ownershipintent()->Add();
ooi->set_name(name);
ooi->set_type(objectType);
lockExclusiveAndRead(as);
std::string * owned = as.mutable_ownedobjects()->Add();
*owned = name;
write(as);
unlock(ctx);
unlock();
}
void cta::objectstore::Agent::removeFromOwnership(std::string name, serializers::ObjectType objectType) {
void cta::objectstore::Agent::removeFromOwnership(std::string name) {
if (!m_creationDone)
throw CreationNotDone("In Agent::removeFromOwnership(): creation() not yet done");
serializers::Agent as;
ContextHandle & ctx = getFreeContext();
lockExclusiveAndRead(as, ctx, __func__);
lockExclusiveAndRead(as);
bool found;
do {
found = false;
for (int i=0; i<as.mutable_ownershipintent()->size(); i++) {
if (name == as.mutable_ownershipintent(i)->name() &&
objectType == as.mutable_ownershipintent(i)->type()) {
for (int i=0; i<as.mutable_ownedobjects()->size(); i++) {
if (name == *as.mutable_ownedobjects(i)) {
found = true;
as.mutable_ownershipintent()->SwapElements(i, as.ownershipintent_size()-1);
as.mutable_ownershipintent()->RemoveLast();
as.mutable_ownedobjects()->SwapElements(i, as.mutable_ownedobjects()->size()-1);
as.mutable_ownedobjects()->RemoveLast();
break;
}
}
} while (found);
write(as);
unlock(ctx);
unlock();
}
std::list<cta::objectstore::Agent::intentEntry>
cta::objectstore::Agent::getIntentLog() {
if (!m_creationDone)
throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done");
serializers::Agent as;
updateFromObjectStore(as, getFreeContext());
std::list<intentEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(intentEntry(as.creationintent(i).container(),
as.creationintent(i).name(),
as.creationintent(i).type()));
}
return ret;
}
std::list<cta::objectstore::Agent::ownershipEntry>
std::list<std::string>
cta::objectstore::Agent::getOwnershipLog() {
if (!m_creationDone)
throw CreationNotDone("In Agent::getOwnershipLog(): creation() not yet done");
throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done");
serializers::Agent as;
updateFromObjectStore(as, getFreeContext());
std::list<ownershipEntry> ret;
for (int i=0; i<as.creationintent_size(); i++) {
ret.push_back(ownershipEntry(as.creationintent(i).name(),
as.creationintent(i).type()));
updateFromObjectStore(as);
std::list<std::string> ret;
for (int i=0; i<as.ownedobjects_size(); i++) {
ret.push_back(as.ownedobjects(i));
}
return ret;
}
cta::objectstore::ObjectStore & cta::objectstore::Agent::objectStore() {
cta::objectstore::Backend & cta::objectstore::Agent::objectStore() {
return ObjectOps<serializers::Agent>::objectStore();
}
void cta::objectstore::Agent::heartbeat(Agent& agent) {
ContextHandle & context = agent.getFreeContext();
serializers::Agent as;
lockExclusiveAndRead(as, context, __func__);
as.set_heartbeatcount(as.heartbeatcount()+1);
lockExclusiveAndRead(as);
as.set_heartbeat(as.heartbeat()+1);
write(as);
unlock(context);
unlock();
}
uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) {
serializers::Agent as;
updateFromObjectStore(as, agent.getFreeContext());
return as.heartbeatcount();
updateFromObjectStore(as);
return as.heartbeat();
}
#pragma once
#include "ObjectStoreChoice.hpp"
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "utils/Timer.hpp"
#include <cxxabi.h>
#include <list>
namespace cta { namespace objectstore {
......@@ -19,9 +19,9 @@ namespace cta { namespace objectstore {
class Agent: protected ObjectOps<serializers::Agent> {
public:
Agent(ObjectStore & os);
Agent(Backend & os);
Agent(ObjectStore & os, const std::string & typeName);
Agent(Backend & os, const std::string & typeName);
void setup(const std::string & typeName);
......@@ -52,18 +52,16 @@ public:
std::string nextId(const std::string & childType);
ContextHandle & getFreeContext();
class ScopedIntent {
public:
ScopedIntent(Agent & agent, std::string container, std::string name, serializers::ObjectType objectType):
m_agent(agent), m_container(container), m_name(name), m_objectType(objectType), m_present(false) {
m_agent.addToIntend(m_container, m_name, m_objectType);
m_agent.addToOwnership(m_name);
m_present = true;
}
void removeFromIntent() {
if(!m_present) return;
m_agent.removeFromIntent(m_container, m_name, m_objectType);
m_agent.removeFromOwnership(m_name);
m_present = false;
}
~ScopedIntent() {
......@@ -82,14 +80,14 @@ public:
class ScopedOwnership {
public:
ScopedOwnership(Agent & agent, std::string name, serializers::ObjectType objectType):
m_agent(agent), m_name(name), m_objectType(objectType), m_present(false) {
m_agent.addToOwnership(m_name, m_objectType);
ScopedOwnership(Agent & agent, std::string name):
m_agent(agent), m_name(name), m_present(false) {
m_agent.addToOwnership(m_name);
m_present = true;
}
void removeFromOwnership() {
if(!m_present) return;
m_agent.removeFromOwnership( m_name, m_objectType);
m_agent.removeFromOwnership( m_name);
m_present = false;
}
~ScopedOwnership() {
......@@ -101,41 +99,24 @@ public:
private:
Agent & m_agent;
std::string m_name;
serializers::ObjectType m_objectType;
bool m_present;
};
void addToIntend (std::string container, std::string name, serializers::ObjectType objectType);
void removeFromIntent (std::string container, std::string name, serializers::ObjectType objectType);
void addToOwnership(std::string name, serializers::ObjectType objectType);
void addToOwnership(std::string name);
void removeFromOwnership(std::string name, serializers::ObjectType objectType);
void removeFromOwnership(std::string name);
class intentEntry {
class ownedObject {
public:
intentEntry(const std::string & c,
const std::string & n,
serializers::ObjectType t):container(c), name(n), objectType(t) {}
ownedObject(const std::string & c,
const std::string & n):container(c), name(n) {}
std::string container;
std::string name;
serializers::ObjectType objectType;
};
class ownershipEntry {
public:
ownershipEntry(const std::string & n,
serializers::ObjectType t):name(n), objectType(t) {}
std::string name;
serializers::ObjectType objectType;
};
std::list<intentEntry> getIntentLog();
std::list<ownershipEntry> getOwnershipLog();
std::list<std::string> getOwnershipLog();
ObjectStore & objectStore();
Backend & objectStore();
std::string dump(Agent & agent);
......@@ -147,10 +128,7 @@ private:
std::string m_typeName;
bool m_setupDone;
bool m_creationDone;
bool m_observerVersion;
uint64_t m_nextId;
static const size_t c_handleCount = 100;
ContextHandle m_contexts[c_handleCount];
};
}}
\ No newline at end of file
......@@ -5,22 +5,20 @@ ObjectOps<serializers::AgentRegister>(agent.objectStore(), name) {
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
updateFromObjectStore(rs);
}
void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agent) {
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context, __func__);
lockExclusiveAndRead(rs);
rs.add_elements(name);
write(rs);
unlock(context);
unlock();
}
void cta::objectstore::AgentRegister::removeElement (const std::string & name, Agent & agent) {
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context, __func__);
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
......@@ -34,22 +32,20 @@ void cta::objectstore::AgentRegister::removeElement (const std::string & name,
}
} while (found);
write(rs);
unlock(context);
unlock();
}
void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent& agent) {
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context, __func__);
lockExclusiveAndRead(rs);
rs.add_intendedelements(name);
write(rs);
unlock(context);
unlock();
}
void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name, Agent& agent) {
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context, __func__);
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
......@@ -64,14 +60,13 @@ void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string
} while (found);
rs.add_elements(name);
write(rs);
unlock(context);
unlock();
}
void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name, Agent& agent) {
serializers::AgentRegister rs;
ContextHandle & context = agent.getFreeContext();
lockExclusiveAndRead(rs, context, __func__);
lockExclusiveAndRead(rs);
bool found;
do {
found = false;
......@@ -85,14 +80,14 @@ void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& n
}
} while (found);
write(rs);
unlock(context);
unlock();
}
std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agent) {
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
updateFromObjectStore(rs);
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
ret.push_back(rs.elements(i));
......@@ -102,7 +97,7 @@ std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agen
std::string cta::objectstore::AgentRegister::dump(Agent & agent) {
serializers::AgentRegister rs;
updateFromObjectStore(rs, agent.getFreeContext());
updateFromObjectStore(rs);
std::stringstream ret;
ret<< "<<<< AgentRegister " << selfName() << " dump start" << std::endl
<< "Array size=" << rs.elements_size() << std::endl;
......
#include "BackendAbstractTests.hpp"
#include "BackendTest.hpp"
#include "BackendVFS.hpp"
#include "BackendRados.hpp"
#include "exception/Exception.hpp"
......@@ -31,6 +31,31 @@ TEST_P(BackendAbstractTest, BasicReadWrite) {
ASSERT_EQ(false, m_os->exists(testObjectName));
}
TEST_P(BackendAbstractTest, LockingInterface) {
std::cout << "Type=" << m_os->typeName() << std::endl;
const std::string testObjectName = "testObject";
m_os->create(testObjectName, "");
{
// If we don't scope the object, the release will blow up after
// removal of the file.
std::auto_ptr<cta::objectstore::Backend::ScopedLock> lock(
m_os->lockExclusive(testObjectName));
}
{
std::auto_ptr<cta::objectstore::Backend::ScopedLock> lock(
m_os->lockExclusive(testObjectName));
lock->release();
}
m_os->remove(testObjectName);
}
TEST_P(BackendAbstractTest, ParametersInterface) {
std::cout << "Type=" << m_os->typeName() << std::endl;
std::auto_ptr<cta::objectstore::Backend::Parameters> params(
m_os->getParams());
std::cout << params->toStr() << std::endl;
}
cta::objectstore::BackendVFS osVFS;
cta::objectstore::BackendRados osRados("tapetest", "tapetest");
INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados));
#include <gtest/gtest.h>
#include "BackendVFS.hpp"
#include "exception/Exception.hpp"
namespace unitTests {
TEST(BackendVFS, BasicReadWrite) {
cta::objectstore::BackendVFS bvfs;
const std::string testValue = "1234";
const std::string testSecondValue = "1234";
const std::string testObjectName = "testObject";
// Check we can verify the absence of an object
ASSERT_EQ(false, bvfs.exists(testObjectName));
// Check that an update attempt fails on a non-existing object
ASSERT_THROW(bvfs.atomicOverwrite(testObjectName, testSecondValue), cta::exception::Exception);
// Check the creation of the obecjt
bvfs.create(testObjectName, testValue);
// Check we can validate the presence of the object
ASSERT_EQ(true, bvfs.exists(testObjectName));
// Check that we can read back after creation
ASSERT_EQ(testValue, bvfs.read(testObjectName));
bvfs.atomicOverwrite(testObjectName, testSecondValue);
// Check that an update goes through
ASSERT_EQ(testSecondValue, bvfs.read(testObjectName));
// Check that we read back the value
ASSERT_EQ(testSecondValue, bvfs.read(testObjectName));
// Check we can delete the object
ASSERT_NO_THROW(bvfs.remove(testObjectName));
// Check that the object is actually gone
ASSERT_EQ(false, bvfs.exists(testObjectName));
}
}
\ No newline at end of file
......@@ -12,9 +12,9 @@ PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_library (CTAObjectStore
${CTAProtoSources}
# RootEntry.cpp
# Agent.cpp
# AgentRegister.cpp
RootEntry.cpp
Agent.cpp
AgentRegister.cpp
BackendVFS.cpp
BackendRados.cpp
exception/Backtrace.cpp
......@@ -51,9 +51,8 @@ add_library (CTAObjectStore
# protobuf rados CTAObjectStore)
set(ObjectStoreUnitTests
BackendVFSTest.cpp
BackendAbstractTests.cpp)
# RootEntryTest.cpp)
BackendTest.cpp
RootEntryTest.cpp)
add_executable(unitTests unitTests.cpp ${ObjectStoreUnitTests})
target_link_libraries(unitTests
......
#pragma once
#include "Backend.hpp"
#include "exception/Exception.hpp"
#include <memory>
namespace cta { namespace objectstore {
template <class C>
class ObjectOps {
public:
ObjectOps(ObjectStore & os, const std::string & name):m_nameSet(true), m_name(name),
ObjectOps(Backend & os, const std::string & name):m_nameSet(true), m_name(name),
m_objectStore(os) {}
ObjectOps(ObjectStore & os): m_nameSet(false), m_objectStore(os) {}
ObjectOps(Backend & os): m_nameSet(false), m_objectStore(os) {}
class NameNotSet: public cta::exception::Exception {
public:
NameNotSet(const std::string & w): cta::exception::Exception(w) {}
};
class AlreadyLocked: public cta::exception::Exception {
public:
AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
};
void setName(const std::string & name) {
m_name = name;
m_nameSet = true;
}
void updateFromObjectStore (C & val, ContextHandle & context) {
void updateFromObjectStore (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.lockShared(m_name, context);
std::auto_ptr<Backend::ScopedLock> lock (m_objectStore.lockShared(m_name));
std::string reStr = m_objectStore.read(m_name);
m_objectStore.unlock(m_name, context);
lock->release();
val.ParseFromString(reStr);
}
void lockExclusiveAndRead (C & val, ContextHandle & context, std::string where) {
void lockExclusiveAndRead (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.lockExclusive(m_name, context, where);
if(NULL != m_writeLock.get()) throw AlreadyLocked("In ObjectOps<>::updateFromObjectStore: already lcoked for write");
m_writeLock.reset(m_objectStore.lockExclusive(m_name));
// Re-read to get latest version (lock upgrade could be useful here)
std::string reStr = m_objectStore.read(m_name);
if (reStr.size())
......@@ -44,10 +52,10 @@ public:
m_objectStore.atomicOverwrite(m_name, val.SerializeAsString());
}
void unlock (ContextHandle & context, std::string where="") {
void unlock () {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
// release the lock, and return the register name
m_objectStore.unlock(m_name, context, where);
m_writeLock.reset(NULL);
}
void remove () {
......@@ -70,14 +78,15 @@ public:
return m_name;
}