Commit 76dd3bfb authored by Eric Cano's avatar Eric Cano
Browse files

Clarified the intent log existence and path.

The intent log exists in the root entry only for the agent register creation. Then the agent register has an agent creation intent log
as well. And finally for all other usages, the agent's intent log shall be used.

This is a working compilation checkpoint. Some intent logs still need to be removed from the schema.
parent 3b89d80e
......@@ -7,6 +7,20 @@ project(cta)
set(CMAKE_C_FLAGS_RELWITHDEBINFO "-g")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-g")
IF(NOT CMAKE_BUILD_TYPE STREQUAL "")
# If the user specifies -DCMAKE_BUILD_TYPE on the command line, take their definition and dump it in the cache
message(STATUS "Setting build type to ${CMAKE_BUILD_TYPE} as requested.")
SET(CMAKE_BUILD_TYPE ${CMAKE_BUILD_TYPE} CACHE STRING "Choose the type of build.")
ELSE()
# log choosen default (RelWithDebInfo) and set it
message(STATUS "Setting build type to 'RelWithDebInfo' as none was specified.")
message (STATUS "Override with -DCMAKE_BUILD_TYPE:STRING=Debug")
set(CMAKE_BUILD_TYPE RelWithDebInfo CACHE STRING "Choose the type of build." FORCE)
# Set the possible values of build type for cmake-gui
# this command is not yet available in SLC6's cmake 2.6
# set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
ENDIF(NOT CMAKE_BUILD_TYPE STREQUAL "")
set(CMAKE_DISABLE_SOURCE_CHANGES ON)
set(CMAKE_DISABLE_IN_SOURCE_BUILD ON)
list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake)
......
#pragma once
#include "ObjectStores.hpp"
#include "ObjectStoreChoice.hpp"
#include "ObjectOps.hpp"
#include "ContextHandle.hpp"
#include "objectstore/cta.pb.h"
class Agent {
/**
* Class containing agent information and managing the update of the
* agent's persitent representation in the object store.
* This object also manages the object id generator, and keeps track of
* a ContextHandles
* In all the agent is the case class for all actions.
* It handles (in the base class):
*/
class Agent: protected ObjectOps<cta::objectstore::Agent> {
public:
Agent(ObjectStore & os): m_objectStore(os) {};
~Agent() {
for (size_t i=0; i < c_handleCount; i++) {
m_contexts[i].release();
}
}
void act() {
Agent(ObjectStore & os);
Agent(ObjectStore & os, const std::string & typeName);
void setup(const std::string & typeName);
class SetupNotDone: public cta::exception::Exception {
public:
SetupNotDone(const std::string & w): cta::exception::Exception(w) {}
};
class CreationNotDone: public cta::exception::Exception {
public:
CreationNotDone(const std::string & w): cta::exception::Exception(w) {}
};
void create();
std::string type();
std::string name();
~Agent();
std::string nextId(const std::string & childType);
ContextHandleImplementation<myOS> & getFreeContext();
void addToIntend (std::string container, std::string name, std::string typeName);
void removeFromIntent (std::string container, std::string name, std::string typeName);
void addToOwnership(std::string name, std::string typeName);
void removeFromOwnership(std::string name, std::string typeName);
class intentEntry {
public:
intentEntry(const std::string & c,
const std::string & n,
const std::string & t):container(c), name(n), typeName(t) {}
std::string container;
std::string name;
std::string typeName;
};
}
class ownershipEntry {
public:
ownershipEntry(const std::string & n,
const std::string & t):name(n), typeName(t) {}
std::string name;
std::string typeName;
};
std::list<intentEntry> getIntentLog();
std::list<ownershipEntry> getOwnershipLog();
ObjectStore & objectStore();
private:
ObjectStore & m_objectStore;
std::string m_typeName;
bool m_setupDone;
bool m_creationDone;
uint64_t m_nextId;
static const size_t c_handleCount = 100;
ContextHandleImplementation<myOS> m_contexts[c_handleCount];
ContextHandleImplementation<myOS> getFreeContext() {
for (size_t i=0; i < c_handleCount; i++) {
if (!m_contexts[i].isSet())
return m_contexts[i];
}
throw cta::exception::Exception("Could not find free context slot");
}
};
\ No newline at end of file
#pragma once
#include <sstream>
#include <string>
#include <sys/syscall.h>
#include <ctime>
class AgentId {
public:
AgentId(std::string agentType): m_nextId(0) {
std::stringstream aid;
// Get time
time_t now = time(0);
struct tm localNow;
localtime_r(&now, &localNow);
// Get hostname
char host[200];
cta::exception::Errnum::throwOnMinusOne(gethostname(host, sizeof(host)),
"In AgentId::AgentId: failed to gethostname");
// gettid is a safe system call (never fails)
aid << agentType << "-" << host << "-" << syscall(SYS_gettid) << "-"
<< 1900 + localNow.tm_year
<< std::setfill('0') << std::setw(2) << 1 + localNow.tm_mon
<< localNow.tm_mday << "-"
<< localNow.tm_hour << ":"
<< localNow.tm_min << ":"
<< localNow.tm_sec;
m_agentId = aid.str();
}
std::string name() {
return m_agentId;
}
std::string nextId() {
std::stringstream id;
id << m_agentId << "-" << m_nextId++;
return id.str();
}
private:
std::string m_agentId;
uint64_t m_nextId;
};
\ No newline at end of file
......@@ -12,6 +12,9 @@ PROTOBUF_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp
${CTAProtoSources}
RootEntry.cpp
Agent.cpp
AgentRegister.cpp
exception/Backtrace.cpp
exception/Errnum.cpp
exception/Exception.cpp
......
......@@ -2,23 +2,76 @@
#include "ObjectOps.hpp"
#include "FIFO.hpp"
#include "Agent.hpp"
class JobPool: private ObjectOps<cta::objectstore::jobPool> {
public:
JobPool(ObjectStore & os, const std::string & name, ContextHandle & context):
ObjectOps<cta::objectstore::jobPool>(os, name) {
JobPool(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::jobPool>(agent.objectStore(), name) {
cta::objectstore::jobPool jps;
updateFromObjectStore(jps, context);
updateFromObjectStore(jps, agent.getFreeContext());
}
void PostRecallJob (const std::string & string, ContextHandle & context) {
void PostRecallJob (const std::string & string, ContextHandle & context, const std::string & MigrationFIFOId) {
cta::objectstore::jobPool jps;
lockExclusiveAndRead(jps, context);
}
class NotAllocatedEx: public cta::exception::Exception {
public:
NotAllocatedEx(const std::string & context): cta::exception::Exception(context) {}
};
private:
std::string getRecallFIFO (Agent & agent) {
// Check if the recall FIFO exists
cta::objectstore::jobPool res;
updateFromObjectStore(res, agent.getFreeContext());
// If the registry is defined, return it, job done.
if (res.recall().size())
return res.recall();
throw NotAllocatedEx("In RootEntry::getJobPool: jobPool not yet allocated");
}
// Get the name of a (possibly freshly created) recall FIFO
std::string allocateOrGetJobPool(Agent & agent) {
// Check if the job pool exists
try {
return getRecallFIFO(agent);
} catch (NotAllocatedEx &) {
// If we get here, the job pool is not created yet, so we have to do it:
// lock the entry again, for writing
cta::objectstore::jobPool res;
ContextHandle ctx = agent.getFreeContext();
lockExclusiveAndRead(res, ctx);
// If the registry is already defined, somebody was faster. We're done.
if (res.recall().size()) {
unlock(ctx);
return res.recall();
}
// We will really create the register
// decide on the object's name
std::string FIFOName (agent.nextId("recallFIFO-"));
// Record the FIFO in the intent log
agent.addToIntend(selfName(), FIFOName, "recallFIFO");
// The potential object can now be garbage collected if we die from here.
// Create the object, then lock. The name should be unique, so no race.
cta::objectstore::jobPool jps;
jps.set_migration("");
jps.set_recall("");
writeChild(FIFOName, jps);
// If we lived that far, we can update the jop pool to point to the FIFO
res.set_recall(FIFOName);
agent.removeFromIntent(selfName(), FIFOName, "recallFIFO");
write(res);
// release the lock, and return the register name
unlock(ctx);
return FIFOName;
}
}
// The following functions are hidden from the user in order to provide
// higher level functionnality
//std::string get
//std::string allocateOrGetMigrationFIFO
};
\ No newline at end of file
......@@ -5,10 +5,23 @@
template <class C>
class ObjectOps {
public:
ObjectOps(ObjectStore & os, const std::string & name):m_name(name),
ObjectOps(ObjectStore & os, const std::string & name):m_nameSet(true), m_name(name),
m_objectStore(os) {}
ObjectOps(ObjectStore & os): m_nameSet(false), m_objectStore(os) {}
class NameNotSet: public cta::exception::Exception {
public:
NameNotSet(const std::string & w): cta::exception::Exception(w) {}
};
void setName(const std::string & name) {
m_name = name;
m_nameSet = true;
}
void updateFromObjectStore (C & val, ContextHandle & context) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.lockShared(m_name, context);
std::string reStr = m_objectStore.read(m_name);
m_objectStore.unlock(m_name, context);
......@@ -16,27 +29,50 @@ public:
}
void lockExclusiveAndRead (C & val, ContextHandle & context) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.lockExclusive(m_name, context);
// Re-read to get latest version (lock upgrade could be useful here)
std::string reStr = m_objectStore.read(m_name);
cta::objectstore::RootEntry res;
val.ParseFromString(reStr);
}
void write (C & val) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
m_objectStore.atomicOverwrite(m_name, val.SerializeAsString());
}
void unlock (ContextHandle & context) {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
// release the lock, and return the register name
m_objectStore.unlock(m_name, context);
}
void remove () {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::remove: name not set");
// remove the object
m_objectStore.remove(m_name);
}
template <class C2>
void writeChild (const std::string & name, C2 & val) {
m_objectStore.atomicOverwrite(name, val.SerializeAsString());
}
void removeOther(const std::string & name) {
m_objectStore.remove(name);
}
std::string selfName() {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
return m_name;
}
ObjectStore & objectStore() {
return m_objectStore;
}
private:
bool m_nameSet;
std::string m_name;
ObjectStore & m_objectStore;
};
\ No newline at end of file
......@@ -10,6 +10,8 @@
#include <sys/syscall.h>
#include <fstream>
#include <rados/librados.hpp>
#include "exception/Exception.hpp"
#include "exception/Errnum.hpp"
class ContextHandle {
public:
......@@ -30,6 +32,7 @@ public:
virtual void lockShared(std::string name, ContextHandle & context) = 0;
virtual void lockExclusive(std::string name, ContextHandle & context) = 0;
virtual void unlock(std::string name, ContextHandle & context) = 0;
virtual void remove(std::string name) = 0;
virtual std::string path() { return ""; }
virtual std::string user() { return ""; }
virtual std::string pool() { return ""; }
......@@ -93,6 +96,7 @@ public:
::rename(tempPath.c_str(), targetPath.c_str()),
"In ObjectStoreVFS::atomicOverwrite, failed to rename the file");
}
virtual std::string read(std::string name) {
std::string path = m_root+"/" + name;
std::string ret;
......@@ -110,6 +114,11 @@ public:
return ret;
}
virtual void remove(std::string name) {
std::string path = m_root+"/" + name;
cta::exception::Errnum::throwOnNonZero(unlink(name.c_str()));
}
void lockHelper(std::string name, ContextHandle & context, int type) {
std::string path = m_root + "/" + name + ".lock";
context.set(::open(path.c_str(), O_CREAT, S_IRWXU));
......@@ -209,8 +218,11 @@ public:
bl.copy(0, size, ret);
return ret;
}
virtual void remove(std::string name) {
cta::exception::Errnum::throwOnNegative(m_radosCtx.remove(name));
}
virtual void lockExclusive(std::string name, ContextHandle & context) {
// Build a unique client name: host:thread
char buff[200];
......
......@@ -6,14 +6,14 @@
class ObjectStrucutreDumper {
public:
std::string dump(ObjectStore & os, ContextHandle & context) {
std::string dump(Agent & agent) {
std::stringstream ret;
ret << "<< Structure dump start" << std::endl;
RootEntry re(os, context);
ret << re.dump(context);
RootEntry re(agent);
ret << re.dump(agent);
try {
Register ar(os, re.getAgentRegister(context), context);
ret << ar.dump("root->agentRegister", context);
Register ar(re.getAgentRegister(agent), agent);
ret << ar.dump("root->agentRegister", agent);
} catch (RootEntry::NotAllocatedEx &) {}
ret << ">> Structure dump end" << std::endl;
return ret.str();
......
......@@ -5,12 +5,12 @@
class Register: private ObjectOps<cta::objectstore::Register> {
public:
Register(ObjectStore & os, const std::string & name, ContextHandle & context):
ObjectOps<cta::objectstore::Register>(os, name) {
Register(const std::string & name, Agent & agent):
ObjectOps<cta::objectstore::Register>(agent.objectStore(), name) {
// Check that the entry is present and readable (depending on implementation
// of object store, locking might or might not succeed)
cta::objectstore::Register rs;
updateFromObjectStore(rs, context);
updateFromObjectStore(rs, agent.getFreeContext());
}
void addElement (std::string name, ContextHandle & context) {
......@@ -40,9 +40,9 @@ public:
unlock(context);
}
std::list<std::string> getElements(ContextHandle & context) {
std::list<std::string> getElements(Agent & agent) {
cta::objectstore::Register rs;
updateFromObjectStore(rs, context);
updateFromObjectStore(rs, agent.getFreeContext());
std::list<std::string> ret;
for (int i=0; i<rs.elements_size(); i++) {
ret.push_back(rs.elements(i));
......@@ -50,9 +50,9 @@ public:
return ret;
}
std::string dump(const std::string & title, ContextHandle & context) {
std::string dump(const std::string & title, Agent & agent) {
cta::objectstore::Register rs;
updateFromObjectStore(rs, context);
updateFromObjectStore(rs, agent.getFreeContext());
std::stringstream ret;
ret<< "<<<< Register " << title << " dump start" << std::endl
<< "Array size=" << rs.elements_size() << std::endl;
......
#pragma once
#include <google/protobuf/text_format.h>
#include "objectstore/cta.pb.h"
#include "ObjectStores.hpp"
#include "ObjectOps.hpp"
#include "Agent.hpp"
class RootEntry: private ObjectOps<cta::objectstore::RootEntry> {
public:
// Initializer.
static void init(ObjectStore & os) {
// check existence of root entry before creating it. We expect read to fail.
try {
os.read(s_rootEntryName);
throw cta::exception::Exception("In RootEntry::init: root entry already exists");
} catch (...) {}
cta::objectstore::RootEntry res;
os.atomicOverwrite(s_rootEntryName, res.SerializeAsString());
}
static void init(ObjectStore & os);
// construtor, when the backend store exists.
// Checks the existence and correctness of the root entry
RootEntry(ObjectStore & os, ContextHandle & context):
ObjectOps<cta::objectstore::RootEntry>(os, s_rootEntryName) {
// Check that the root entry is readable.
cta::objectstore::RootEntry res;
updateFromObjectStore(res, context);
}
RootEntry(Agent & agent);
class NotAllocatedEx: public cta::exception::Exception {
public:
NotAllocatedEx(const std::string & context): cta::exception::Exception(context) {}
NotAllocatedEx(const std::string & w): cta::exception::Exception(w) {}
};
// Get the name of the agent register (or exception if not available)
std::string getAgentRegister(ContextHandle & context) {
// Check if the agent register exists
cta::objectstore::RootEntry res;
updateFromObjectStore(res, context);
// If the registry is defined, return it, job done.
if (res.agentregister().size())
return res.agentregister();
throw NotAllocatedEx("In RootEntry::getAgentRegister: agentRegister not yet allocated");
}
std::string getAgentRegister(Agent & agent);
// Get the name of a (possibly freshly created) agent register
std::string allocateOrGetAgentRegister(ContextHandle & context, std::string agentRegistryId) {
// Check if the agent register exists
try {
return getAgentRegister(context);
} catch (NotAllocatedEx &) {
// If we get here, the agent register is not created yet, so we have to do it:
// lock the entry again, for writing
cta::objectstore::RootEntry res;
lockExclusiveAndRead(res, context);
// If the registry is already defined, somebody was faster. We're done.
if (res.agentregister().size()) {
unlock(context);
return res.agentregister();
}
// We will really create the register
// decide on the object's name
std::string arName ("agentRegister-");
arName += agentRegistryId;
// Record the agent in the intent log
res.add_agentregisterintentlog(arName);
// Commit the intents
write(res);
// The potential object can now be garbage collected if we die from here.
// Create the object, then lock. The name should be unique, so no race.
cta::objectstore::Register ars;
writeChild(arName, ars);
// If we lived that far, we can update the root entry to point to our
// new agent register, and remove the name from the intent log.
res.set_agentregister(arName);
res.mutable_agentregisterintentlog()->RemoveLast();
write(res);
// release the lock, and return the register name
unlock(context);
return arName;
}
}
std::string allocateOrGetAgentRegister(Agent & agent);
// Get the name of the JobPool (or exception if not available)
std::string getJobPool(ContextHandle & context) {
// Check if the job pool exists
cta::objectstore::RootEntry res;
updateFromObjectStore(res, context);
// If the registry is defined, return it, job done.
if (res.jobpool().size())
return res.jobpool();
throw NotAllocatedEx("In RootEntry::getJobPool: jobPool not yet allocated");
}
std::string getJobPool(Agent & agent);
// Get the name of a (possibly freshly created) job pool
std::string allocateOrGetJobPool(ContextHandle & context, std::string jobPoolId) {
// Check if the job pool exists
try {
return getJobPool(context);
} catch (NotAllocatedEx &) {
// If we get here, the job pool is not created yet, so we have to do it:
// lock the entry again, for writing
cta::objectstore::RootEntry res;
lockExclusiveAndRead(res, context);
// If the registry is already defined, somebody was faster. We're done.
if (res.jobpool().size()) {
unlock(context);
return res.jobpool();
}
// We will really create the register
// decide on the object's name
std::string jpName ("jobPool-");
jpName += jobPoolId;
// Record the agent in the intent log
res.add_jobpoolintentlog(jpName);
// Commit the intents
write(res);
// The potential object can now be garbage collected if we die from here.
// Create the object, then lock. The name should be unique, so no race.
cta::objectstore::jobPool jps;
jps.set_migration("");
jps.set_recall("");
writeChild(jpName, jps);
// If we lived that far, we can update the root entry to point to our
// new agent register, and remove the name from the intent log.
res.set_jobpool(jpName);
res.mutable_jobpoolintentlog()->RemoveLast();
write(res);
// release the lock, and return the register name
unlock(context);
return jpName;
}
}
std::string allocateOrGetJobPool(Agent & agent);
// Dump the root entry
std::string dump (ContextHandle & context) {
std::stringstream ret;
cta::objectstore::RootEntry res;
updateFromObjectStore(res, context);
ret << "<<<< Root entry dump start" << std::endl;
if (res.has_agentregister()) ret << "agentRegister=" << res.agentregister() << std::endl;
for (int i=0; i<res.agentregisterintentlog_size(); i++) {
ret << "agentRegisterIntentLog=" << res.agentregisterintentlog(i) << std::endl;
}
if (res.has_jobpool()) ret << "jobPool=" << res.jobpool() << std::endl;
for (int i=0; i<res.jobpoolintentlog_size(); i++) {
ret << "jobPoolIntentLog=" << res.jobpoolintentlog(i) << std::endl;