diff --git a/objectstore/Action.hpp b/objectstore/Action.hpp index 00ef8b0bcc5864174ea1409922cb1e283ef3d6cb..8699f7f3830a7572917f302a6594529475b98dc1 100644 --- a/objectstore/Action.hpp +++ b/objectstore/Action.hpp @@ -1,10 +1,160 @@ #pragma once +#include "Agent.hpp" +#include "RootEntry.hpp" +#include "RecallJob.hpp" +#include "JobPool.hpp" +#include "FIFO.hpp" +#include "AgentRegister.hpp" +#include <unistd.h> + namespace cta { namespace objectstore { class Action { public: - void execute() {} + virtual void execute() {} + virtual ~Action() {} +}; + +class JobPoster: public Action { +public: + JobPoster (Agent & agent, int number, int objective): + m_agent(agent), m_number(number), m_objective(objective), m_achieved(0){} + virtual void execute() { + std::stringstream name; + name << "JobPoster-" << m_number; + m_agent.setup(name.str()); + m_agent.create(); + RootEntry re(m_agent); + JobPool jp(re.getJobPool(m_agent), m_agent); + std::string fifoName = jp.getRecallFIFO(m_agent); + FIFO fifo(fifoName, m_agent); + std::cout << name << " starting"; + while (m_achieved < m_objective) { + std::stringstream src, dst; + src << "S-" << m_number << "-" << m_achieved; + dst << "D-" << m_number << "-" << m_achieved; + // This create the recall job and return the name. It is + // automatically added to the creation intent log of m_agent. + std::string rjName = RecallJob::create(fifoName, src.str(), dst.str(), m_agent); + // Post the job + fifo.push(rjName, m_agent); + // We can now release it from the intent log + m_agent.removeFromIntent(fifoName, rjName, "RecallJob"); + std::cout << rjName << " created" << std::endl; + m_achieved++; + } + std::cout << name << " complete" << std::endl; + } +private: + Agent & m_agent; + int m_number; + int m_objective; + int m_achieved; +}; + +class Recaller: public Action { +public: + Recaller (Agent & agent, int number): m_agent(agent), m_number(number) {} + virtual void execute() { + std::stringstream name; + name << "RecallerAction-" << m_number; + m_agent.setup(name.str()); + m_agent.create(); + RootEntry re(m_agent); + JobPool jp(re.getJobPool(m_agent), m_agent); + FIFO fifo(jp.getRecallFIFO(m_agent), m_agent); + std::cout << name << " starting"; + while (true) { + try { + // Pop a job from the FIFO + FIFO::Transaction tr = fifo.startTransaction(m_agent); + std::string rjName = tr.peek(); + m_agent.addToOwnership(tr.peek(), "RecallJob"); + tr.popAndUnlock(); + RecallJob rj(rjName, m_agent); + // Sleep on it for a while + usleep(100 * 1000); + // Log the deletion + std::cout << "RecallJob " << rj.source(m_agent) << " => " + << rj.destination(m_agent) << " is done" << std::endl; + rj.remove(); + } catch (FIFO::FIFOEmpty &) { break; } + std::cout << name << "complete: FIFO empty" << std::endl; + } + } + +private: + Agent & m_agent; + int m_number; +}; + +class GarbageCollector: public Action { +public: + GarbageCollector (Agent & agent): m_agent(agent) {} + virtual void execute() { + std::stringstream name; + name << "GarbageCollector"; + m_agent.setup(name.str()); + m_agent.create(); + RootEntry re(m_agent); + AgentRegister ar(re.getAgentRegister(m_agent), m_agent); + std::cout << name << " starting"; + utils::Timer noAgentTimer; + std::map<std::string, AgentWatchdog *> watchdogs; + while (true) { + // Get the list of current agents + std::list<std::string> agentNames = ar.getElements(m_agent); + // If no one is running, go away after a delay + if(!agentNames.size()) { + if (noAgentTimer.secs() > 1.0) + break; + } else { + noAgentTimer.reset(); + } + // On a first pass, trim the watchdog list of now-gone agents + for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin(); + i != watchdogs.end();) { + if (std::find(agentNames.begin(), agentNames.end(), i->first) == agentNames.end()) { + delete i->second; + // The post-increment returns the iterator to erase, + // but sets i to the next value, which will remain valid. + watchdogs.erase(i++); + } else { + i++; + } + } + // On a second pass, check that we are acquaint with all processes + for (std::list<std::string>::iterator i=agentNames.begin(); + i != agentNames.end(); i++) { + if(watchdogs.find(*i) == watchdogs.end()) { + watchdogs[*i] = new AgentWatchdog(*i, m_agent); + } + } + // And now check the heartbeats of the agents + for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin(); + i != watchdogs.end();) { + if (!i->second->checkAlive(m_agent)) { + collectGarbage(i->first); + delete i->second; + // The post-increment returns the iterator to erase, + // but sets i to the next value, which will remain valid. + watchdogs.erase(i++); + } + } + } + } + +private: + Agent & m_agent; + void collectGarbage(const std::string & agentName) { + // When collecting the garbage of an agent, we have to iterate through its + // intended and owned objects, validate that they are still owned by the dead + // agent, and re-post them to the container where they should be (and ownership) + // is re-set to the container. + Agent ag(agentName, m_agent); + std::list<Agent::intentEntry> intendedObjects = ag.getIntentLog(); + } }; }} diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index 0eca49395a6e90a4a26d599de4ea8754989e0731..11d9fdef11e4e89dd314636eeec8c5f3a373b874 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -7,22 +7,22 @@ #include <ctime> cta::objectstore::Agent::Agent(ObjectStore & os): - ObjectOps<cta::objectstore::serializers::Agent>(os), + ObjectOps<serializers::Agent>(os), m_nextId(0), m_setupDone(false), m_creationDone(false), m_observerVersion(false) {}; cta::objectstore::Agent::Agent(ObjectStore & os, const std::string & typeName): - ObjectOps<cta::objectstore::serializers::Agent>(os), + ObjectOps<serializers::Agent>(os), m_nextId(0), m_setupDone(false), m_creationDone(false), m_observerVersion(false) { setup(typeName); } // Passive constructor, used for looking at existing agent records cta::objectstore::Agent::Agent(const std::string & name, Agent & agent): - ObjectOps<cta::objectstore::serializers::Agent>(agent.objectStore(), name), + ObjectOps<serializers::Agent>(agent.objectStore(), name), m_nextId(0), m_setupDone(true), m_creationDone(true), m_observerVersion(true) { // check the presence of the entry - cta::objectstore::serializers::Agent as; + serializers::Agent as; updateFromObjectStore(as, agent.getFreeContext()); } @@ -45,7 +45,7 @@ void cta::objectstore::Agent::setup(const std::string & typeName) { << std::setw(2) << localNow.tm_hour << ":" << std::setw(2) << localNow.tm_min << ":" << std::setw(2) << localNow.tm_sec; - ObjectOps<cta::objectstore::serializers::Agent>::setName(aid.str()); + ObjectOps<serializers::Agent>::setName(aid.str()); m_setupDone = true; } @@ -55,7 +55,7 @@ void cta::objectstore::Agent::create() { RootEntry re(*this); AgentRegister ar(re.allocateOrGetAgentRegister(*this), *this); ar.addIntendedElement(selfName(), *this); - cta::objectstore::serializers::Agent as; + serializers::Agent as; as.set_name(selfName()); writeChild(selfName(), as); ar.upgradeIntendedElementToActual(selfName(), *this); @@ -110,10 +110,10 @@ cta::objectstore::ContextHandleImplementation<myOS> & void cta::objectstore::Agent::addToIntend (std::string container, std::string name, std::string typeName) { if (!m_creationDone) throw CreationNotDone("In Agent::addToIntend(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; ContextHandle & ctx = getFreeContext(); lockExclusiveAndRead(as, ctx); - cta::objectstore::serializers::ObjectCreationIntent * oca = + serializers::ObjectCreationIntent * oca = as.mutable_creationintent()->Add(); oca->set_container(container); oca->set_name(name); @@ -125,7 +125,7 @@ void cta::objectstore::Agent::addToIntend (std::string container, std::string na void cta::objectstore::Agent::removeFromIntent (std::string container, std::string name, std::string typeName) { if (!m_creationDone) throw CreationNotDone("In Agent::removeFromIntent(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; ContextHandle & ctx = getFreeContext(); lockExclusiveAndRead(as, ctx); bool found; @@ -149,10 +149,10 @@ void cta::objectstore::Agent::removeFromIntent (std::string container, std::stri void cta::objectstore::Agent::addToOwnership(std::string name, std::string typeName) { if (!m_creationDone) throw CreationNotDone("In Agent::addToOwnership(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; ContextHandle & ctx = getFreeContext(); lockExclusiveAndRead(as, ctx); - cta::objectstore::serializers::ObjectOwnershipIntent * ooi = + serializers::ObjectOwnershipIntent * ooi = as.mutable_ownershipintent()->Add(); ooi->set_name(name); ooi->set_type(typeName); @@ -163,7 +163,7 @@ void cta::objectstore::Agent::addToOwnership(std::string name, std::string typeN void cta::objectstore::Agent::removeFromOwnership(std::string name, std::string typeName) { if (!m_creationDone) throw CreationNotDone("In Agent::removeFromOwnership(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; ContextHandle & ctx = getFreeContext(); lockExclusiveAndRead(as, ctx); bool found; @@ -187,7 +187,7 @@ std::list<cta::objectstore::Agent::intentEntry> cta::objectstore::Agent::getIntentLog() { if (!m_creationDone) throw CreationNotDone("In Agent::getIntentLog(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; updateFromObjectStore(as, getFreeContext()); std::list<intentEntry> ret; for (int i=0; i<as.creationintent_size(); i++) { @@ -202,7 +202,7 @@ std::list<cta::objectstore::Agent::ownershipEntry> cta::objectstore::Agent::getOwnershipLog() { if (!m_creationDone) throw CreationNotDone("In Agent::getOwnershipLog(): creation() not yet done"); - cta::objectstore::serializers::Agent as; + serializers::Agent as; updateFromObjectStore(as, getFreeContext()); std::list<ownershipEntry> ret; for (int i=0; i<as.creationintent_size(); i++) { @@ -213,11 +213,11 @@ std::list<cta::objectstore::Agent::ownershipEntry> } cta::objectstore::ObjectStore & cta::objectstore::Agent::objectStore() { - return ObjectOps<cta::objectstore::serializers::Agent>::objectStore(); + return ObjectOps<serializers::Agent>::objectStore(); } std::string cta::objectstore::Agent::dump(Agent & agent) { - cta::objectstore::serializers::Agent as; + serializers::Agent as; updateFromObjectStore(as, agent.getFreeContext()); std::stringstream ret; ret<< "<<<< Agent " << selfName() << " dump start" << std::endl @@ -235,4 +235,30 @@ std::string cta::objectstore::Agent::dump(Agent & agent) { } ret<< ">>>> Agent " << selfName() << " dump end" << std::endl; return ret.str(); -} \ No newline at end of file +} + +void cta::objectstore::Agent::heartbeat(Agent& agent) { + ContextHandle & context = agent.getFreeContext(); + serializers::Agent as; + lockExclusiveAndRead(as, context); +} + +uint64_t cta::objectstore::Agent::getHeartbeatCount(Agent& agent) { + serializers::Agent as; + updateFromObjectStore(as, agent.getFreeContext()); +} + + +cta::objectstore::AgentWatchdog::AgentWatchdog(const std::string& agentName, Agent& agent): +m_agentVisitor(agentName, agent) { + m_hearbeatCounter = m_agentVisitor.getHeartbeatCount(agent); +} + +bool cta::objectstore::AgentWatchdog::checkAlive(Agent& agent) { + uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent); + if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1) + return false; + m_hearbeatCounter = newHeartBeatCount; + return true; +} + diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index 95883c7a440b68f8dc1c2e11bdee1b2cbd395402..789e24746a4ff3ebe614961ac76a48a753229e00 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -4,6 +4,7 @@ #include "ObjectOps.hpp" #include "ContextHandle.hpp" #include "objectstore/cta.pb.h" +#include "utils/Timer.hpp" namespace cta { namespace objectstore { @@ -16,7 +17,7 @@ namespace cta { namespace objectstore { * It handles (in the base class): */ -class Agent: protected ObjectOps<cta::objectstore::serializers::Agent> { +class Agent: protected ObjectOps<serializers::Agent> { public: Agent(ObjectStore & os); @@ -40,6 +41,7 @@ public: public: ObserverOnly(const std::string & w): cta::exception::Exception(w) {} }; + void create(); @@ -87,6 +89,10 @@ public: std::string dump(Agent & agent); + void heartbeat(Agent & agent); + + uint64_t getHeartbeatCount(Agent & agent); + private: std::string m_typeName; bool m_setupDone; @@ -97,4 +103,14 @@ private: ContextHandleImplementation<myOS> m_contexts[c_handleCount]; }; +class AgentWatchdog { +public: + AgentWatchdog(const std::string & agentName, Agent & agent); + bool checkAlive(Agent & agent); +private: + cta::utils::Timer m_timer; + Agent m_agentVisitor; + uint64_t m_hearbeatCounter; +}; + }} \ No newline at end of file diff --git a/objectstore/AgentRegister.cpp b/objectstore/AgentRegister.cpp index ae9ad9f7f28d14051412dbc085a4adc753740347..0e58757cf774be343e4da060c1ffd5adbf289ca9 100644 --- a/objectstore/AgentRegister.cpp +++ b/objectstore/AgentRegister.cpp @@ -1,15 +1,15 @@ #include "AgentRegister.hpp" cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Agent & agent): -ObjectOps<cta::objectstore::serializers::AgentRegister>(agent.objectStore(), name) { +ObjectOps<serializers::AgentRegister>(agent.objectStore(), name) { // Check that the entry is present and readable (depending on implementation // of object store, locking might or might not succeed) - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; updateFromObjectStore(rs, agent.getFreeContext()); } void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(rs, context); rs.add_elements(name); @@ -18,7 +18,7 @@ void cta::objectstore::AgentRegister::addElement (std::string name, Agent & agen } void cta::objectstore::AgentRegister::removeElement (const std::string & name, Agent & agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(rs, context); bool found; @@ -38,7 +38,7 @@ void cta::objectstore::AgentRegister::removeElement (const std::string & name, } void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent& agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(rs, context); rs.add_intendedelements(name); @@ -47,7 +47,7 @@ void cta::objectstore::AgentRegister::addIntendedElement(std::string name, Agent } void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string name, Agent& agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(rs, context); bool found; @@ -69,7 +69,7 @@ void cta::objectstore::AgentRegister::upgradeIntendedElementToActual(std::string void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& name, Agent& agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(rs, context); bool found; @@ -91,7 +91,7 @@ void cta::objectstore::AgentRegister::removeIntendedElement(const std::string& n std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; updateFromObjectStore(rs, agent.getFreeContext()); std::list<std::string> ret; for (int i=0; i<rs.elements_size(); i++) { @@ -101,7 +101,7 @@ std::list<std::string> cta::objectstore::AgentRegister::getElements(Agent & agen } std::string cta::objectstore::AgentRegister::dump(Agent & agent) { - cta::objectstore::serializers::AgentRegister rs; + serializers::AgentRegister rs; updateFromObjectStore(rs, agent.getFreeContext()); std::stringstream ret; ret<< "<<<< AgentRegister " << selfName() << " dump start" << std::endl diff --git a/objectstore/AgentRegister.hpp b/objectstore/AgentRegister.hpp index 04ffe44c17c632a2d8452169331755407e4d7eb5..a87d205fc6e43d2022abe440f1ccd33ff6ee29cc 100644 --- a/objectstore/AgentRegister.hpp +++ b/objectstore/AgentRegister.hpp @@ -6,7 +6,7 @@ namespace cta { namespace objectstore { -class AgentRegister: private ObjectOps<cta::objectstore::serializers::AgentRegister> { +class AgentRegister: private ObjectOps<serializers::AgentRegister> { public: AgentRegister(const std::string & name, Agent & agent); void addElement (std::string name, Agent & agent); diff --git a/objectstore/AgentVisitor.hpp b/objectstore/AgentVisitor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..9430ecad163630f3531327af036791cc561d88bf --- /dev/null +++ b/objectstore/AgentVisitor.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include "ObjectOps.hpp" +#include "objectstore/cta.pb.h" + +namespace cta { namespace objectstore { + + + class AgentVisitor: private ObjectOps<serializers::Agent> { + + }; + +}} \ No newline at end of file diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index f9682ccb933d46a388f64279aa30c8970909680a..ad20fe2ccf2fe13228f3af9d88718eb2890c7a59 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -21,7 +21,8 @@ add_executable (tapeResourceMangerTest tapeResourceManagerTest.cpp exception/strerror_r_wrapper.cpp threading/ChildProcess.cpp threading/Mutex.cpp - threading/Threading.cpp) + threading/Threading.cpp + utils/Timer.cpp) target_link_libraries(tapeResourceMangerTest protobuf diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp index 36a999c1c135a02f4859b5dd1213a5963bfcc0b1..8a3d48035e35a579eaaac4e7f2e8231d923fc17f 100644 --- a/objectstore/FIFO.hpp +++ b/objectstore/FIFO.hpp @@ -6,11 +6,11 @@ namespace cta { namespace objectstore { -class FIFO: private ObjectOps<cta::objectstore::serializers::FIFO> { +class FIFO: private ObjectOps<serializers::FIFO> { public: FIFO(const std::string & name, Agent & agent): - ObjectOps<cta::objectstore::serializers::FIFO>(agent.objectStore(), name) { - cta::objectstore::serializers::FIFO fs; + ObjectOps<serializers::FIFO>(agent.objectStore(), name) { + serializers::FIFO fs; updateFromObjectStore(fs, agent.getFreeContext()); } @@ -20,6 +20,11 @@ private: } public: + class FIFOEmpty: public cta::exception::Exception { + public: + FIFOEmpty(const std::string & context): cta::exception::Exception(context) {} + }; + friend class Transaction; class Transaction { public: @@ -28,15 +33,26 @@ public: m_fifo.lock(m_ctx); } + ~Transaction() { + try { + if(!m_writeDone) + m_fifo.unlock(m_ctx); + } catch (...) {} + } + std::string peek() { if (m_writeDone) throw cta::exception::Exception("In FIFO::Transaction::peek: write already occurred"); + if (m_fifo.m_currentState.readpointer() >= m_fifo.m_currentState.name_size()) + throw FIFOEmpty("In FIFO::Transaction::peek: FIFO empty"); return m_fifo.m_currentState.name(m_fifo.m_currentState.readpointer()); } void popAndUnlock() { if (m_writeDone) throw cta::exception::Exception("In FIFO::Transaction::popAndUnlock: write already occurred"); + if (m_fifo.m_currentState.readpointer() >= m_fifo.m_currentState.name_size()) + throw FIFOEmpty("In FIFO::Transaction::popAndUnlock: FIFO empty"); m_fifo.m_currentState.set_readpointer(m_fifo.m_currentState.readpointer()+1); if (m_fifo.m_currentState.readpointer() > 100) { m_fifo.compactCurrentState(); @@ -56,7 +72,7 @@ public: } void push(std::string name, Agent & agent) { - cta::objectstore::serializers::FIFO fs; + serializers::FIFO fs; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(fs, context); fs.add_name(name); @@ -65,7 +81,7 @@ public: } std::string dump(Agent & agent) { - cta::objectstore::serializers::FIFO fs; + serializers::FIFO fs; updateFromObjectStore(fs, agent.getFreeContext()); std::stringstream ret; ret<< "<<<< FIFO dump start" << std::endl @@ -80,7 +96,7 @@ public: } private: - cta::objectstore::serializers::FIFO m_currentState; + serializers::FIFO m_currentState; void compactCurrentState() { uint64_t oldReadPointer = m_currentState.readpointer(); diff --git a/objectstore/JobPool.hpp b/objectstore/JobPool.hpp index 1fc8205dda6f3403b703bb3da48fb4f62ae09425..e63fd967a87825fbe8b7e6cf14c8d32b60f7d30f 100644 --- a/objectstore/JobPool.hpp +++ b/objectstore/JobPool.hpp @@ -6,11 +6,11 @@ namespace cta { namespace objectstore { -class JobPool: private ObjectOps<cta::objectstore::serializers::JobPool> { +class JobPool: private ObjectOps<serializers::JobPool> { public: JobPool(const std::string & name, Agent & agent): - ObjectOps<cta::objectstore::serializers::JobPool>(agent.objectStore(), name) { - cta::objectstore::serializers::JobPool jps; + ObjectOps<serializers::JobPool>(agent.objectStore(), name) { + serializers::JobPool jps; updateFromObjectStore(jps, agent.getFreeContext()); } @@ -25,7 +25,7 @@ public: }; std::string dump(Agent & agent) { - cta::objectstore::serializers::JobPool jps; + serializers::JobPool jps; updateFromObjectStore(jps, agent.getFreeContext()); std::stringstream ret; ret << "<<<< JobPool " << selfName() << " dump start" << std::endl @@ -37,7 +37,7 @@ public: std::string getRecallFIFO (Agent & agent) { // Check if the recall FIFO exists - cta::objectstore::serializers::JobPool res; + serializers::JobPool res; updateFromObjectStore(res, agent.getFreeContext()); // If the registry is defined, return it, job done. if (res.recall().size()) @@ -53,7 +53,7 @@ public: } catch (NotAllocatedEx &) { // If we get here, the job pool is not created yet, so we have to do it: // lock the entry again, for writing - cta::objectstore::serializers::JobPool res; + serializers::JobPool res; ContextHandle ctx = agent.getFreeContext(); lockExclusiveAndRead(res, ctx); // If the registry is already defined, somebody was faster. We're done. @@ -68,7 +68,7 @@ public: agent.addToIntend(selfName(), FIFOName, "recallFIFO"); // The potential object can now be garbage collected if we die from here. // Create the object, then lock. The name should be unique, so no race. - cta::objectstore::serializers::JobPool jps; + serializers::JobPool jps; jps.set_migration(""); jps.set_recall(""); writeChild(FIFOName, jps); diff --git a/objectstore/RecallJob.hpp b/objectstore/RecallJob.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5b8d355b187728afdca5221adbd4211ff2fe001d --- /dev/null +++ b/objectstore/RecallJob.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include "ObjectOps.hpp" +#include "objectstore/cta.pb.h" + +namespace cta { namespace objectstore { + +class RecallJob: private ObjectOps<serializers::RecallJob> { +public: + static std::string create(const std::string & container, + const std::string & source, const std::string & destination, Agent & agent) { + serializers::RecallJob rjs; + rjs.set_owner(container); + rjs.set_source(source); + rjs.set_destination(destination); + std::string ret = agent.nextId("RecallJob"); + agent.addToIntend(container, ret, "RecallJob"); + agent.objectStore().atomicOverwrite(ret, rjs.SerializeAsString()); + } + + RecallJob(const std::string & name, Agent & agent): + ObjectOps<serializers::RecallJob>(agent.objectStore(), name){ + serializers::RecallJob rjs; + updateFromObjectStore(rjs, agent.getFreeContext()); + } + + void remove() { + ObjectOps<serializers::RecallJob>::remove(); + } + + std::string source(Agent & agent) { + serializers::RecallJob rjs; + updateFromObjectStore(rjs, agent.getFreeContext()); + return rjs.source(); + } + + std::string destination(Agent & agent) { + serializers::RecallJob rjs; + updateFromObjectStore(rjs, agent.getFreeContext()); + return rjs.destination(); + } +}; + +}} \ No newline at end of file diff --git a/objectstore/Register.hpp b/objectstore/Register.hpp index c6bec23cc86358882d7bbe6151d8713acf064717..28fd154c1f48b9b5ced0acd2002bcf0c5c97be0d 100644 --- a/objectstore/Register.hpp +++ b/objectstore/Register.hpp @@ -1,22 +1,22 @@ #pragma once #include "ObjectOps.hpp" -#include <algorithm> +#include "objectstore/cta.pb.h" namespace cta { namespace objectstore { -class Register: private ObjectOps<cta::objectstore::serializers::Register> { +class Register: private ObjectOps<serializers::Register> { public: Register(const std::string & name, Agent & agent): - ObjectOps<cta::objectstore::serializers::Register>(agent.objectStore(), name) { + ObjectOps<serializers::Register>(agent.objectStore(), name) { // Check that the entry is present and readable (depending on implementation // of object store, locking might or might not succeed) - cta::objectstore::serializers::Register rs; + serializers::Register rs; updateFromObjectStore(rs, agent.getFreeContext()); } void addElement (std::string name, ContextHandle & context) { - cta::objectstore::serializers::Register rs; + serializers::Register rs; lockExclusiveAndRead(rs, context); rs.add_elements(name); write(rs); @@ -24,7 +24,7 @@ public: } void removeElement (const std::string & name, ContextHandle & context) { - cta::objectstore::serializers::Register rs; + serializers::Register rs; lockExclusiveAndRead(rs, context); bool found; do { @@ -43,7 +43,7 @@ public: } std::list<std::string> getElements(Agent & agent) { - cta::objectstore::serializers::Register rs; + serializers::Register rs; updateFromObjectStore(rs, agent.getFreeContext()); std::list<std::string> ret; for (int i=0; i<rs.elements_size(); i++) { @@ -53,7 +53,7 @@ public: } std::string dump(const std::string & title, Agent & agent) { - cta::objectstore::serializers::Register rs; + serializers::Register rs; updateFromObjectStore(rs, agent.getFreeContext()); std::stringstream ret; ret<< "<<<< Register " << title << " dump start" << std::endl diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 7825d2944040b24bbf661d04b7b2199f051657a2..7ad9a24caf711032374b1df1f31bbf9ba9c39b30 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -9,22 +9,22 @@ void cta::objectstore::RootEntry::init(ObjectStore & os) { os.read(s_rootEntryName); throw cta::exception::Exception("In RootEntry::init: root entry already exists"); } catch (...) {} - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; os.atomicOverwrite(s_rootEntryName, res.SerializeAsString()); } // construtor, when the backend store exists. // Checks the existence and correctness of the root entry cta::objectstore::RootEntry::RootEntry(Agent & agent): - ObjectOps<cta::objectstore::serializers::RootEntry>(agent.objectStore(), s_rootEntryName) { + ObjectOps<serializers::RootEntry>(agent.objectStore(), s_rootEntryName) { // Check that the root entry is readable. - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; updateFromObjectStore(res, agent.getFreeContext()); } // Get the name of the agent register (or exception if not available) std::string cta::objectstore::RootEntry::getAgentRegister(Agent & agent) { // Check if the agent register exists - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; updateFromObjectStore(res, agent.getFreeContext()); // If the registry is defined, return it, job done. if (res.agentregister().size()) @@ -40,7 +40,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetAgentRegister(Agent & agen } catch (NotAllocatedEx &) { // If we get here, the agent register is not created yet, so we have to do it: // lock the entry again, for writing - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(res, context); // If the registry is already defined, somebody was faster. We're done. @@ -57,7 +57,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetAgentRegister(Agent & agen write(res); // The potential object can now be garbage collected if we die from here. // Create the object, then lock. The name should be unique, so no race. - cta::objectstore::serializers::Register ars; + serializers::Register ars; writeChild(arName, ars); // If we lived that far, we can update the root entry to point to our // new agent register, and remove the name from the intent log. @@ -73,7 +73,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetAgentRegister(Agent & agen // Get the name of the JobPool (or exception if not available) std::string cta::objectstore::RootEntry::getJobPool(Agent & agent) { // Check if the job pool exists - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; updateFromObjectStore(res, agent.getFreeContext()); // If the registry is defined, return it, job done. if (res.jobpool().size()) @@ -89,7 +89,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetJobPool(Agent & agent) { } catch (NotAllocatedEx &) { // If we get here, the job pool is not created yet, so we have to do it: // lock the entry again, for writing - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; ContextHandle & context = agent.getFreeContext(); lockExclusiveAndRead(res, context); // If the registry is already defined, somebody was faster. We're done. @@ -104,7 +104,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetJobPool(Agent & agent) { agent.addToIntend(s_rootEntryName, jpName, "jobPool"); // The potential object can now be garbage collected if we die from here. // Create the object, then lock. The name should be unique, so no race. - cta::objectstore::serializers::JobPool jps; + serializers::JobPool jps; jps.set_migration(""); jps.set_recall(""); writeChild(jpName, jps); @@ -123,7 +123,7 @@ std::string cta::objectstore::RootEntry::allocateOrGetJobPool(Agent & agent) { // Dump the root entry std::string cta::objectstore::RootEntry::dump (Agent & agent) { std::stringstream ret; - cta::objectstore::serializers::RootEntry res; + serializers::RootEntry res; updateFromObjectStore(res, agent.getFreeContext()); ret << "<<<< Root entry dump start" << std::endl; if (res.has_agentregister()) ret << "agentRegister=" << res.agentregister() << std::endl; diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 33cd623b20e90619f917cbda638d7e1ac00d1c10..5f3c8003f097b6e06afe67c8a0a1ad234f73cfa0 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -8,7 +8,7 @@ namespace cta { namespace objectstore { -class RootEntry: private ObjectOps<cta::objectstore::serializers::RootEntry> { +class RootEntry: private ObjectOps<serializers::RootEntry> { public: // Initializer. static void init(ObjectStore & os); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 515f99de841e0f62b5c909c400930ceee61791af..3482e8ea3938c5d26cd69f4c95800295dadbb0c2 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -48,6 +48,7 @@ message Agent { required string name = 1000; repeated ObjectCreationIntent creationintent = 1001; repeated ObjectOwnershipIntent ownershipintent = 1002; + required int64 heartbeatcount = 1003; } // The tape record diff --git a/objectstore/tapeResourceManagerTest.cpp b/objectstore/tapeResourceManagerTest.cpp index 22881f35b5eb895f57614244d1630a5007a86825..4e62b37756237430aa1f745a8f3a7556ee45a71e 100644 --- a/objectstore/tapeResourceManagerTest.cpp +++ b/objectstore/tapeResourceManagerTest.cpp @@ -10,6 +10,8 @@ #include "ObjectStructureDumper.hpp" #include "JobPool.hpp" #include "AgentRegister.hpp" +#include "RecallJob.hpp" +#include "Register.hpp" diff --git a/objectstore/utils/Timer.cpp b/objectstore/utils/Timer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d2c9c49006b45ebda5d806072d7f2885d103c973 --- /dev/null +++ b/objectstore/utils/Timer.cpp @@ -0,0 +1,60 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * This little class allows to easily time some piece of code + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#include "Timer.hpp" + +//------------------------------------------------------------------------------ +// constructor +//------------------------------------------------------------------------------ +cta::utils::Timer::Timer() { + reset(); +} + +//------------------------------------------------------------------------------ +// usecs +//------------------------------------------------------------------------------ +int64_t cta::utils::Timer::usecs(reset_t reset) { + timeval now; + gettimeofday(&now, 0); + int64_t ret = ((now.tv_sec * 1000000) + now.tv_usec) - + ((m_reference.tv_sec * 1000000) + m_reference.tv_usec); + if (reset == resetCounter) { + m_reference = now; + } + return ret; +} + +//------------------------------------------------------------------------------ +// secs +//------------------------------------------------------------------------------ +double cta::utils::Timer::secs(reset_t reset) { + return usecs(reset) * 0.000001; +} + +//------------------------------------------------------------------------------ +// reset +//------------------------------------------------------------------------------ +void cta::utils::Timer::reset() { + gettimeofday(&m_reference, 0); +} + diff --git a/objectstore/utils/Timer.hpp b/objectstore/utils/Timer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..7cfe36d05ab2a10767bdfd865d52d116fa9d4054 --- /dev/null +++ b/objectstore/utils/Timer.hpp @@ -0,0 +1,85 @@ +/****************************************************************************** + * + * This file is part of the Castor project. + * See http://castor.web.cern.ch/castor + * + * Copyright (C) 2003 CERN + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * This little class allows to easily time some piece of code + * + * @author Castor Dev team, castor-dev@cern.ch + *****************************************************************************/ + +#pragma once + +#include <sys/time.h> +#include <sys/types.h> + +namespace cta { +namespace utils { + +/** + * A small timing class. + * It basically remembers a reference time (by default the time of + * its construction) and gives the elapsed time since then. + * The reset method allows to reset the reference time to the current time + */ +class Timer { + +public: + + enum reset_t { + keepRunning, + resetCounter + }; + + /** + * Constructor. + */ + Timer(); + + /** + * Destructor. + */ + virtual ~Timer() {} + + /** + * Gives elapsed time in microseconds with respect to the reference time + * optionally resets the counter. + */ + int64_t usecs(reset_t reset = keepRunning); + + /** + * Gives elapsed time in seconds (with microsecond precision) + * with respect to the reference time. Optionally resets the counter. + */ + double secs(reset_t reset = keepRunning); + + /** + * Resets the Timer reference's time to the current time. + */ + void reset(); + +private: + + /** + * Reference time for this timeri + */ + timeval m_reference; + +}; // class Timer + +} // namespace utils +} // namespace cta +