Commit d4c2c95e authored by Eric Cano's avatar Eric Cano
Browse files

Create the object store type TapePool.

Renamed the setName/GetName functions to the more appropriate getAddress/setAddress.
parent e1360a55
......@@ -62,7 +62,7 @@ void cta::objectstore::Agent::generateName(const std::string & typeName) {
<< std::setw(2) << localNow.tm_hour << ":"
<< std::setw(2) << localNow.tm_min << ":"
<< std::setw(2) << localNow.tm_sec;
setName(aid.str());
setAddress(aid.str());
}
void cta::objectstore::Agent::insertAndRegisterSelf() {
......@@ -77,11 +77,11 @@ void cta::objectstore::Agent::insertAndRegisterSelf() {
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.addAgent(getNameIfSet());
ar.addAgent(getAddressIfSet());
ar.commit();
// Set the agent register as owner and backup owner
setBackupOwner(ar.getNameIfSet());
setOwner(ar.getNameIfSet());
setBackupOwner(ar.getAddressIfSet());
setOwner(ar.getAddressIfSet());
// Create the agent
insert();
// And release the agent register's lock
......@@ -101,7 +101,7 @@ void cta::objectstore::Agent::deleteAndUnregisterSelf() {
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.removeAgent(getNameIfSet());
ar.removeAgent(getAddressIfSet());
ar.commit();
arLock.release();
}
......@@ -121,7 +121,7 @@ void cta::objectstore::Agent::deleteAndUnregisterSelf() {
std::string cta::objectstore::Agent::nextId(const std::string & childType) {
std::stringstream id;
id << childType << "-" << getNameIfSet() << "-" << m_nextId++;
id << childType << "-" << getAddressIfSet() << "-" << m_nextId++;
return id.str();
}
......
......@@ -158,7 +158,7 @@ public:
commit();
agentLock.release();
// Then make the pointer agent's ownership official
object.setOwner(getNameIfSet());
object.setOwner(getAddressIfSet());
// The container should be the backup owner, so let's make sure!
object.setBackupOwner(container.getNameIfSet());
// Commit the object
......@@ -181,7 +181,7 @@ public:
ScopedExclusiveLock objLock(object);
object.fetch();
// Check that the object is indeed ours
if (object.getOwner() != getNameIfSet())
if (object.getOwner() != getAddressIfSet())
throw AgentDoesNotOwnObject("In Agent::pushToContainer: agent is not the owner of the object");
// Lock the container for write
ScopedExclusiveLock contLock(container);
......
......@@ -85,7 +85,7 @@ std::list<std::string> cta::objectstore::AgentRegister::getUntrackedAgents() {
std::string cta::objectstore::AgentRegister::dump() {
checkPayloadReadable();
std::stringstream ret;
ret<< "<<<< AgentRegister " << getNameIfSet() << " dump start" << std::endl
ret<< "<<<< AgentRegister " << getAddressIfSet() << " dump start" << std::endl
<< "Agents array size=" << m_payload.agents_size() << std::endl;
for (int i=0; i<m_payload.agents_size(); i++) {
ret << "element[" << i << "]=" << m_payload.agents(i) << std::endl;
......@@ -94,6 +94,6 @@ std::string cta::objectstore::AgentRegister::dump() {
for (int i=0; i<m_payload.untrackedagents_size(); i++) {
ret << "intendedElement[" << i << "]=" << m_payload.untrackedagents(i) << std::endl;
}
ret<< ">>>> AgentRegister " << getNameIfSet() << " dump end" << std::endl;
ret<< ">>>> AgentRegister " << getAddressIfSet() << " dump end" << std::endl;
return ret.str();
}
\ No newline at end of file
......@@ -17,6 +17,7 @@ add_library (CTAObjectStore SHARED
Agent.cpp
AgentRegister.cpp
AgentWatchdog.cpp
TapePool.cpp
#AdminUsersList.cpp
BackendVFS.cpp
BackendRados.cpp
......
......@@ -94,14 +94,14 @@ uint64_t cta::objectstore::FIFO::size() {
std::string cta::objectstore::FIFO::dump() {
checkPayloadReadable();
std::stringstream ret;
ret << "<<<< FIFO dump start: " << getNameIfSet() << std::endl
ret << "<<<< FIFO dump start: " << getAddressIfSet() << std::endl
<< "Read pointer=" << m_payload.readpointer() << std::endl
<< "Array size=" << m_payload.name_size() << std::endl;
for (int i = m_payload.readpointer(); i < m_payload.name_size(); i++) {
ret << "name[phys=" << i << " ,log=" << i - m_payload.readpointer()
<< "]=" << m_payload.name(i) << std::endl;
}
ret << ">>>> FIFO dump end for " << getNameIfSet() << std::endl;
ret << ">>>> FIFO dump end for " << getAddressIfSet() << std::endl;
return ret.str();
}
......
......@@ -29,7 +29,7 @@ GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
m_agentRegister.setName(re.getAgentRegisterPointer());
m_agentRegister.setAddress(re.getAgentRegisterPointer());
reLock.release();
ScopedSharedLock arLock(m_agentRegister);
m_agentRegister.fetch();
......@@ -85,7 +85,7 @@ void GarbageCollector::aquireTargets() {
for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
&& c!=candidatesList.end(); c++) {
// We don't monitor ourselves
if (*c != m_ourAgent.getNameIfSet()) {
if (*c != m_ourAgent.getAddressIfSet()) {
// So we have a candidate we might want to monitor
// First, check that the agent entry exists, and that ownership
// is indeed pointing to the agent register
......@@ -100,8 +100,8 @@ void GarbageCollector::aquireTargets() {
ag.fetch();
// Check that the actual owner is the agent register.
// otherwise, it should not be listed as an agent to monitor
if (ag.getOwner() != m_agentRegister.getNameIfSet()) {
m_agentRegister.trackAgent(ag.getNameIfSet());
if (ag.getOwner() != m_agentRegister.getAddressIfSet()) {
m_agentRegister.trackAgent(ag.getAddressIfSet());
agLock.release();
continue;
}
......@@ -110,26 +110,26 @@ void GarbageCollector::aquireTargets() {
// Lock ours
ScopedExclusiveLock oaLock(m_ourAgent);
m_ourAgent.fetch();
m_ourAgent.addToOwnership(ag.getNameIfSet());
m_ourAgent.addToOwnership(ag.getAddressIfSet());
m_ourAgent.commit();
// We now have a pointer to the agent, we can make the ownership official
ag.setOwner(m_ourAgent.getNameIfSet());
ag.setOwner(m_ourAgent.getAddressIfSet());
ag.commit();
// And we can remove the now dangling pointer from the agent register
// (we hold an exclusive lock all along)
m_agentRegister.trackAgent(ag.getNameIfSet());
m_agentRegister.trackAgent(ag.getAddressIfSet());
m_agentRegister.commit();
// Agent is officially our, we can remove it from the untracked agent's
// list
m_agentRegister.trackAgent(ag.getNameIfSet());
m_agentRegister.trackAgent(ag.getAddressIfSet());
// Agent is now officially ours, let's track it. We have the release the
// lock to the agent before constructing the watchdog, which builds
// its own agent objects (and need to lock the object store representation)
std::string agentName = ag.getNameIfSet();
std::string agentName = ag.getAddressIfSet();
agLock.release();
m_watchedAgents[agentName] =
new AgentWatchdog(agentName, m_objectStore);
m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout);
m_watchedAgents[ag.getAddressIfSet()]->setTimeout(m_timeout);
}
}
// Commit all the modifications to the agent register
......@@ -160,7 +160,7 @@ void GarbageCollector::checkHeartbeats() {
Agent agent(name, m_objectStore);
ScopedExclusiveLock agLock(agent);
agent.fetch();
if (agent.getOwner() != m_ourAgent.getNameIfSet()) {
if (agent.getOwner() != m_ourAgent.getAddressIfSet()) {
throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: the ownership is not ours as expected");
}
// Return all objects owned by the agent to their respective backup owners
......
......@@ -26,7 +26,7 @@ void GenericObject::fetch() {
throw NotLocked("In ObjectOps::fetch(): object not locked");
m_existingObject = true;
// Get the header from the object store. We don't care for the type
m_header.ParseFromString(m_objectStore.read(getNameIfSet()));
m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
m_headerInterpreted = true;
}
......@@ -37,7 +37,7 @@ serializers::ObjectType GenericObject::type() {
void GenericObject::commit() {
checkHeaderWritable();
m_objectStore.atomicOverwrite(getNameIfSet(), m_header.SerializeAsString());
m_objectStore.atomicOverwrite(getAddressIfSet(), m_header.SerializeAsString());
}
void GenericObject::insert() {
......
......@@ -41,11 +41,11 @@ public:
std::string dump(Agent & agent) {
checkPayloadReadable();
std::stringstream ret;
ret << "<<<< JobPool " << getNameIfSet() << " dump start" << std::endl
ret << "<<<< JobPool " << getAddressIfSet() << " dump start" << std::endl
<< "Migration=" << m_payload.migration() << std::endl
<< "Recall=" << m_payload.recall() << std::endl
<< "RecallCounter=" << m_payload.recallcounter() << std::endl;
ret << ">>>> JobPool " << getNameIfSet() << " dump end" << std::endl;
ret << ">>>> JobPool " << getAddressIfSet() << " dump end" << std::endl;
return ret.str();
}
......
......@@ -27,6 +27,7 @@ namespace cta { namespace objectstore {
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RootEntry);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AgentRegister);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Agent);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(TapePool);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(JobPool);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallFIFO);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO);
......
......@@ -36,9 +36,9 @@ protected:
m_existingObject(false), m_locksCount(0),
m_locksForWriteCount(0) {}
class NameNotSet: public cta::exception::Exception {
class AddressNotSet: public cta::exception::Exception {
public:
NameNotSet(const std::string & w): cta::exception::Exception(w) {}
AddressNotSet(const std::string & w): cta::exception::Exception(w) {}
};
class NotLocked: public cta::exception::Exception {
......@@ -76,9 +76,9 @@ protected:
NotInitialized(const std::string & w): cta::exception::Exception(w) {}
};
class NameAlreadySet: public cta::exception::Exception {
class AddressAlreadySet: public cta::exception::Exception {
public:
NameAlreadySet(const std::string & w): cta::exception::Exception(w) {}
AddressAlreadySet(const std::string & w): cta::exception::Exception(w) {}
};
void checkHeaderWritable() {
......@@ -117,23 +117,23 @@ protected:
public:
void setName(const std::string & name) {
void setAddress(const std::string & name) {
if (m_nameSet)
throw NameAlreadySet("In ObjectOps::setName: trying to overwrite an already set name");
throw AddressAlreadySet("In ObjectOps::setName: trying to overwrite an already set name");
m_name = name;
m_nameSet = true;
}
std::string & getNameIfSet() {
std::string & getAddressIfSet() {
if (!m_nameSet) {
throw NameNotSet("In ObjectOpsBase::getNameIfSet: name not set yet");
throw AddressNotSet("In ObjectOpsBase::getNameIfSet: name not set yet");
}
return m_name;
}
void remove () {
checkWritable();
m_objectStore.remove(getNameIfSet());
m_objectStore.remove(getAddressIfSet());
m_existingObject = false;
m_headerInterpreted = false;
m_payloadInterpreted = false;
......@@ -222,7 +222,7 @@ public:
void lock(ObjectOpsBase & oo) {
checkNotLocked();
m_objectOps = & oo;
m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getNameIfSet()));
m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getAddressIfSet()));
m_objectOps->m_locksCount++;
m_locked = true;
}
......@@ -237,7 +237,7 @@ public:
void lock(ObjectOpsBase & oo) {
checkNotLocked();
m_objectOps = &oo;
m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getNameIfSet()));
m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getAddressIfSet()));
m_objectOps->m_locksCount++;
m_objectOps->m_locksForWriteCount++;
m_locked = true;
......@@ -254,7 +254,7 @@ template <class C>
class ObjectOps: public ObjectOpsBase {
protected:
ObjectOps(Backend & os, const std::string & name): ObjectOpsBase(os) {
setName(name);
setAddress(name);
}
ObjectOps(Backend & os): ObjectOpsBase(os) {}
......@@ -278,7 +278,7 @@ public:
// Serialise the payload into the header
m_header.set_payload(m_payload.SerializeAsString());
// Write the object
m_objectStore.atomicOverwrite(getNameIfSet(), m_header.SerializeAsString());
m_objectStore.atomicOverwrite(getAddressIfSet(), m_header.SerializeAsString());
}
protected:
......@@ -289,7 +289,7 @@ protected:
}
void getHeaderFromObjectStore () {
m_header.ParseFromString(m_objectStore.read(getNameIfSet()));
m_header.ParseFromString(m_objectStore.read(getAddressIfSet()));
if (m_header.type() != typeId) {
std::stringstream err;
err << "In ObjectOps::getHeaderFromObjectStore wrong object type: "
......@@ -324,12 +324,12 @@ public:
// We don't require locking here, as the object does not exist
// yet in the object store (and this is ensured by the )
m_header.set_payload(m_payload.SerializeAsString());
m_objectStore.create(getNameIfSet(), m_header.SerializeAsString());
m_objectStore.create(getAddressIfSet(), m_header.SerializeAsString());
m_existingObject = true;
}
bool exists() {
return m_objectStore.exists(getNameIfSet());
return m_objectStore.exists(getAddressIfSet());
}
private:
......@@ -343,7 +343,7 @@ private:
}
std::string selfName() {
if(!m_nameSet) throw NameNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
if(!m_nameSet) throw AddressNotSet("In ObjectOps<>::updateFromObjectStore: name not set");
return m_name;
}
......
......@@ -19,6 +19,7 @@
#include "RootEntry.hpp"
#include "AgentRegister.hpp"
#include "Agent.hpp"
#include "TapePool.hpp"
#include <cxxabi.h>
#include "ProtcolBuffersAlgorithms.hpp"
......@@ -303,7 +304,7 @@ namespace {
void cta::objectstore::RootEntry::addLibrary(const std::string& library,
const CreationLog & log) {
checkPayloadWritable();
// Check the library does not exist already
// Check the library does not exist aclready
try {
serializers::findElement(m_payload.libraries(), library);
throw DuplicateEntry("In RootEntry::addLibrary: trying to create duplicate entry");
......@@ -334,7 +335,56 @@ std::list<std::string> cta::objectstore::RootEntry::dumpLibraries() {
return ret;
}
// =============================================================================
// ========== Tape pools manipulations =========================================
// =============================================================================
// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
bool operator==(const std::string &tp,
const cta::objectstore::serializers::TapePoolPointer & tpp) {
return tpp.name() == tp;
}
}
void cta::objectstore::RootEntry::addTapePoolAndCommit(const std::string& tapePool,
const CreationLog& log, Agent& agent) {
checkHeaderWritable();
// Check the tape pool does not already exist
try {
serializers::findElement(m_payload.tapepoolpointers(), tapePool);
throw DuplicateEntry("In RootEntry::addTapePool: trying to create duplicate entry");
} catch (serializers::NotFound &) {}
// Insert the tape pool, then its pointer, with agent intent log update
// First generate the intent
ScopedExclusiveLock al(agent);
std::string tapePoolAddress = agent.nextId("tapePool");
agent.fetch();
agent.addToOwnership(tapePoolAddress);
agent.commit();
// Then create the tape pool object
TapePool tp(tapePoolAddress, ObjectOps<serializers::RootEntry>::m_objectStore);
tp.initialize(tapePool);
tp.setOwner(agent.getAddressIfSet());
tp.setBackupOwner(agent.getAddressIfSet());
tp.insert();
ScopedExclusiveLock tpl(tp);
// Now move the tape pool's ownership to the root entry
auto * tpp = m_payload.mutable_tapepoolpointers()->Add();
tpp->set_address(tapePoolAddress);
tpp->set_name(tapePool);
// We must commit here to ensure the tape pool object is referenced.
commit();
// Now update the tape pool's ownership.
tp.setOwner(getAddressIfSet());
tp.setBackupOwner(getAddressIfSet());
tp.commit();
// ... and clean up the agent
agent.removeFromOwnership(tapePoolAddress);
agent.commit();
}
......@@ -551,8 +601,8 @@ std::string cta::objectstore::RootEntry::addOrGetAgentRegisterPointer(Agent & ag
deleteIntendedAgentRegistry();
commit();
// Record completion in agent registry
ar.setOwner(getNameIfSet());
ar.setBackupOwner(getNameIfSet());
ar.setOwner(getAddressIfSet());
ar.setBackupOwner(getAddressIfSet());
ar.commit();
// And we are done. Release locks
lockPtr.reset(NULL);
......
......@@ -32,7 +32,7 @@ class Agent;
class RootEntry: public ObjectOps<serializers::RootEntry> {
public:
// construtor
// Constructor
RootEntry(Backend & os);
class NotAllocatedEx: public cta::exception::Exception {
......@@ -112,7 +112,7 @@ public:
// TapePoolManipulations =====================================================
/** This function implicitly creates the tape pool structure and updates
* the pointer to it */
void addTapePool(const std::string & tapePool, const CreationLog & log, Agent & agent);
void addTapePoolAndCommit(const std::string & tapePool, const CreationLog & log, Agent & agent);
/** This function implicitly deletes the tape pool structure.
* Fails if it not empty*/
void removeTapePool(const std::string & tapePool, Agent & agent);
......
......@@ -6,14 +6,15 @@ enum ObjectType {
RootEntry_t = 0;
AgentRegister_t = 1;
Agent_t = 2;
JobPool_t = 3;
RecallFIFO_t = 4;
MigrationFIFO_t = 5;
RecallJob_t = 6;
Counter_t = 7;
FIFO_t = 8;
AdminUsersList_t = 9;
StorageClassList_t = 10;
TapePool_t = 3;
// JobPool_t = 3;
// RecallFIFO_t = 4;
// MigrationFIFO_t = 5;
// RecallJob_t = 6;
// Counter_t = 7;
// FIFO_t = 8;
// AdminUsersList_t = 9;
// StorageClassList_t = 10;
GenericObject_t = 1000;
}
......@@ -126,6 +127,7 @@ message RootEntry {
//=========== Sub-objects ======================================================
// ------------- Agent handling ------------------------------------------------
// The agent object represents a running process. It is a payload to an object
// itself, and it can be owned by a watchdog or a global agent registry, which
......@@ -141,27 +143,12 @@ message RootEntry {
// The objects in this list can be considered for being returned to a backup
// owner.
message Agent {
required string description = 50;
required uint64 heartbeat = 51;
required uint64 timeout_us = 52;
repeated string ownedobjects = 53;
required string description = 2000;
required uint64 heartbeat = 2001;
required uint64 timeout_us = 2002;
repeated string ownedobjects = 2003;
}
////// The root entry: it points to an agent register and a job pool register.
////// There is an integrated intent log for the agent register, but the updates
////// should be very rare (besides first register creation). All other objects
////// can be covered by the agents's ownership logs.
////message RootEntry {
//// optional string agentregister = 100;
//// repeated string agentregisterintentlog = 101;
//// optional string jobpool = 102;
//// repeated string jobpoolintentlog = 103;
//// optional string adminuserslist = 104;
//// repeated string adminuserslistintentlog = 105;
//// optional string storageclasslist = 106;
//// repeated string storageclasslistintentlog = 107;
////}
////// The registers (simple name arrays)
////message Register {
//// repeated string elements = 150;
......@@ -170,9 +157,36 @@ message Agent {
// The agent register holds 2 lists:
// a full list, and a list of agents not yet watched
message AgentRegister {
repeated string agents = 160;
repeated string untrackedagents = 161;
repeated string agents = 2100;
repeated string untrackedagents = 2101;
}
// ------------- Jobs handling -------------------------------------------------
message ArchivalJobPointer {
required uint64 size = 3001;
required string address = 3002;
}
// ------------- Tape pools ---------------------------------------------------
message TapePointer {
required string name = 4000;
required string address = 4001;
required CreationLog log = 4002;
}
message TapePool {
required string name = 4100;
repeated TapePointer tapes = 4101;
repeated ArchivalJobPointer archivaljobs = 4102;
required uint64 ArchivalJobsTotalSize = 4103;
}
// ------------- Drives handling ----------------------------------------------
////
////// A basic FIFO
////// poping is done by increasing the read pointer, and from time to time
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment