diff --git a/objectstore/Action.hpp b/objectstore/Action.hpp index ebd3e65f3e9c728cf8023bd54d0f8cbeedb3c48c..e03ed60ecd14c7c3eebe0ee863d13ad5e69a0baa 100644 --- a/objectstore/Action.hpp +++ b/objectstore/Action.hpp @@ -72,21 +72,26 @@ public: // Pop a job from the FIFO FIFO::Transaction tr = fifo.startTransaction(m_agent); std::string rjName = tr.peek(); - m_agent.addToOwnership(tr.peek(), serializers::RecallJob_t); + m_agent.addToOwnership(rjName, serializers::RecallJob_t); tr.popAndUnlock(); RecallJob rj(rjName, m_agent); // Sleep on it for a while usleep(100 * 1000); // Log the deletion std::cout << "RecallJob " << rj.source(m_agent) << " => " - << rj.destination(m_agent) << " is done" << std::endl; + << rj.destination(m_agent) << " is done by recaller " + << m_number << " (tid=" << syscall(SYS_gettid) << ")" + << std::endl; rj.remove(); + m_agent.removeFromOwnership(rjName, serializers::RecallJob_t); } catch (FIFO::FIFOEmpty &) { cta::utils::Timer timeout; + bool somethingMore = false; while (timeout.secs() < 1.0) { try { if (fifo.size(m_agent) > 0) { - continue; + somethingMore = true; + break; } else { usleep (100 * 1000); } @@ -94,6 +99,7 @@ public: throw; } catch (...) {} } + if (somethingMore) continue; std::cout << name.str() << " complete: FIFO empty" << std::endl; break; } @@ -141,21 +147,30 @@ public: i++; } } - // On a second pass, check that we are acquaint with all processes + // On a second pass, check that we are acquainted with all processes for (std::list<std::string>::iterator i=agentNames.begin(); i != agentNames.end(); i++) { if(watchdogs.find(*i) == watchdogs.end()) { - watchdogs[*i] = new AgentWatchdog(*i, m_agent); + try { + watchdogs[*i] = new AgentWatchdog(*i, m_agent); + } catch ( cta::exception::Exception & ) {} } } // And now check the heartbeats of the agents for (std::map<std::string, AgentWatchdog *>::iterator i=watchdogs.begin(); i != watchdogs.end();) { - if (!i->second->checkAlive(m_agent)) { - collectGarbage(i->first); + try { + if (!i->second->checkAlive(m_agent)) { + collectGarbage(i->first); + delete i->second; + // The post-increment returns the iterator to erase, + // but sets i to the next value, which will remain valid. + watchdogs.erase(i++); + } else { + i++; + } + } catch ( cta::exception::Exception & ) { delete i->second; - // The post-increment returns the iterator to erase, - // but sets i to the next value, which will remain valid. watchdogs.erase(i++); } } @@ -169,7 +184,18 @@ private: // intended and owned objects, validate that they still exist, are owned by // the dead agent, and re-post them to the container where they should be. try { + std::cout << "In garbage collector, found a dead agent: " << agentName << std::endl; // If the agent entry does not exist anymore, we're done. + // print the recall FIFO + std::cout << "FIFO before garbage collection:" << std::endl; + try { + RootEntry re(m_agent); + JobPool jp(re.getJobPool(m_agent), m_agent); + FIFO rf(jp.getRecallFIFO(m_agent), m_agent); + std::cout << rf.dump(m_agent) << std::endl; + } catch (abi::__forced_unwind&) { + throw; + } catch (...) {} AgentVisitor ag(agentName, m_agent); std::list<AgentVisitor::intentEntry> intendedObjects = ag.getIntentLog(m_agent); for (std::list<AgentVisitor::intentEntry>::iterator i=intendedObjects.begin(); @@ -243,6 +269,16 @@ private: break; } } + // print the recall FIFO + std::cout << "FIFO before garbage collection:" << std::endl; + try { + RootEntry re(m_agent); + JobPool jp(re.getJobPool(m_agent), m_agent); + FIFO rf(jp.getRecallFIFO(m_agent), m_agent); + std::cout << rf.dump(m_agent) << std::endl; + } catch (abi::__forced_unwind&) { + throw; + } catch (...) {} } catch (abi::__forced_unwind&) { throw; } catch (...) {} diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index 187080db8791b4f2a73d8079eac44cd503cf22fe..d13c828733cb2aca763cc15612f07d21589356dd 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -66,6 +66,12 @@ std::string cta::objectstore::Agent::name() { return selfName(); } +void cta::objectstore::Agent::flushContexts() { + for(size_t i=0; i< c_handleCount; i++) { + m_contexts[i].release(); + } +} + cta::objectstore::Agent::~Agent() { for (size_t i=0; i < c_handleCount; i++) { m_contexts[i].release(); @@ -92,7 +98,7 @@ std::string cta::objectstore::Agent::nextId(const std::string & childType) { return id.str(); } -cta::objectstore::ContextHandleImplementation<myOS> & +cta::objectstore::ContextHandle & cta::objectstore::Agent::getFreeContext() { for (size_t i=0; i < c_handleCount; i++) { if (!m_contexts[i].isSet()) @@ -167,8 +173,8 @@ void cta::objectstore::Agent::removeFromOwnership(std::string name, serializers: if (name == as.mutable_ownershipintent(i)->name() && objectType == as.mutable_ownershipintent(i)->type()) { found = true; - as.mutable_creationintent()->SwapElements(i, as.ownershipintent_size()-1); - as.mutable_creationintent()->RemoveLast(); + as.mutable_ownershipintent()->SwapElements(i, as.ownershipintent_size()-1); + as.mutable_ownershipintent()->RemoveLast(); break; } } diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index 98e28c79851b3623f8173ed04c4475b4c7909a1c..9848aac2204ce27acfa6b35f0d973f3a4fc2c296 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -2,7 +2,6 @@ #include "ObjectStoreChoice.hpp" #include "ObjectOps.hpp" -#include "ContextHandle.hpp" #include "objectstore/cta.pb.h" #include "utils/Timer.hpp" @@ -47,11 +46,13 @@ public: std::string name(); + void flushContexts(); + ~Agent(); std::string nextId(const std::string & childType); - ContextHandleImplementation<myOS> & getFreeContext(); + ContextHandle & getFreeContext(); void addToIntend (std::string container, std::string name, serializers::ObjectType objectType); @@ -98,7 +99,7 @@ private: bool m_observerVersion; uint64_t m_nextId; static const size_t c_handleCount = 100; - ContextHandleImplementation<myOS> m_contexts[c_handleCount]; + ContextHandle m_contexts[c_handleCount]; }; }} \ No newline at end of file diff --git a/objectstore/AgentVisitor.hpp b/objectstore/AgentVisitor.hpp index f9d715177087535ed7d9a36f2e5816a8e7804405..4846169c3c8e2af244235b03113a82b15b6f49e6 100644 --- a/objectstore/AgentVisitor.hpp +++ b/objectstore/AgentVisitor.hpp @@ -150,7 +150,7 @@ public: bool checkAlive(Agent & agent) { uint64_t newHeartBeatCount = m_agentVisitor.getHeartbeatCount(agent); - if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 0.1) + if (newHeartBeatCount == m_hearbeatCounter && m_timer.secs() > 120) return false; m_hearbeatCounter = newHeartBeatCount; return true; diff --git a/objectstore/ContextHandle.hpp b/objectstore/ContextHandle.hpp deleted file mode 100644 index 8a474781e97a935737effdbbb16a497fc52ed0f3..0000000000000000000000000000000000000000 --- a/objectstore/ContextHandle.hpp +++ /dev/null @@ -1,40 +0,0 @@ -#pragma once - -#include "ObjectStores.hpp" - -namespace cta { namespace objectstore { - -template <class C> -class ContextHandleImplementation: public ContextHandle {}; - -template <> -class ContextHandleImplementation <ObjectStoreVFS>: public ContextHandle { -public: - ContextHandleImplementation(): m_set(false), m_fd(-1) {} - virtual void set(int fd) { m_set = true; m_fd = fd; __sync_synchronize(); } - virtual int get(int) { return m_fd; } - virtual void reset() { m_set = false; m_fd = -1; __sync_synchronize(); } - virtual bool isSet() { return m_set; } - virtual void release() { __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); } - virtual ~ContextHandleImplementation() {} -private: - bool m_set; - volatile int m_fd; -}; - - -template <> -class ContextHandleImplementation <ObjectStoreRados>: public ContextHandle { -public: - ContextHandleImplementation(): m_set(false) {} - virtual void set(int) { m_set = true; } - virtual int get(int) { return 0; } - virtual void reset() { m_set = false; } - virtual bool isSet() { return m_set; } - virtual void release() { reset(); } - virtual ~ContextHandleImplementation() {} -private: - bool m_set; -}; - -}} \ No newline at end of file diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp index c68a958603a34298c6051864c6d30ca3d9e1e926..b606a773d1a193ceebfbd8b8236a3b1f74a545e3 100644 --- a/objectstore/FIFO.hpp +++ b/objectstore/FIFO.hpp @@ -103,7 +103,8 @@ public: uint64_t size(Agent & agent) { serializers::FIFO fs; updateFromObjectStore(fs, agent.getFreeContext()); - return fs.name_size(); + uint64_t ret = fs.name_size() - fs.readpointer(); + return ret; } private: diff --git a/objectstore/ObjectStores.hpp b/objectstore/ObjectStores.hpp index 51c470455df40517f91e3faa30f77a5c2273e4e2..430d2e64b60292a3155b1da4ebe01d5a7753dd12 100644 --- a/objectstore/ObjectStores.hpp +++ b/objectstore/ObjectStores.hpp @@ -17,12 +17,18 @@ namespace cta { namespace objectstore { class ContextHandle { public: - virtual void set(int) = 0; - virtual int get(int) = 0; - virtual void reset() = 0; - virtual bool isSet() = 0; - virtual void release() = 0; - virtual ~ContextHandle() {}; + ContextHandle(): m_set(false), m_hasFd(false), m_fd(-1) {} + void set(int fd) { m_set = true; m_fd = fd; m_hasFd = true; __sync_synchronize(); } + void set() { m_set = true; } + int get(int) { return m_fd; } + void reset() { m_set = false; m_fd = -1; m_hasFd = false; __sync_synchronize(); } + bool isSet() { return m_set; } + void release() { if (!m_hasFd) return; __sync_synchronize(); if (m_set && -1 != m_fd) ::close(m_fd); reset(); } + ~ContextHandle() { release(); } +private: + bool m_set; + bool m_hasFd; + volatile int m_fd; }; class ObjectStore { @@ -117,12 +123,12 @@ public: virtual void remove(std::string name) { std::string path = m_root+"/" + name; - cta::exception::Errnum::throwOnNonZero(unlink(name.c_str())); + cta::exception::Errnum::throwOnNonZero(unlink(path.c_str())); } void lockHelper(std::string name, ContextHandle & context, int type) { - std::string path = m_root + "/" + name + ".lock"; - context.set(::open(path.c_str(), O_CREAT, S_IRWXU)); + std::string path = m_root + "/" + name; + context.set(::open(path.c_str(), O_RDONLY, S_IRWXU)); cta::exception::Errnum::throwOnMinusOne(context.get(0), "In ObjectStoreVFS::lockHelper, failed to open the lock file."); cta::exception::Errnum::throwOnMinusOne(::flock(context.get(0), LOCK_EX), @@ -245,6 +251,7 @@ public: cta::exception::Errnum::throwOnReturnedErrno(-rc, std::string("In ObjectStoreRados::lockExclusive, failed to librados::IoCtx::lock_exclusive: ")+ name + "/" + "lock" + "/" + client.str() + "//"); + context.set(); std::cout << "LockedExclusive: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl; } @@ -267,6 +274,7 @@ public: cta::exception::Errnum::throwOnReturnedErrno(-rc, std::string("In ObjectStoreRados::lockShared, failed to librados::IoCtx::lock_shared: ")+ name + "/" + "lock" + "/" + client.str() + "//"); + context.set(); std::cout << "LockedShared: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl; } @@ -282,6 +290,7 @@ public: -m_radosCtx.unlock(name, "lock", client.str()), std::string("In ObjectStoreRados::lockExclusive, failed to lock_exclusive ")+ name); + context.reset(); std::cout << "Unlocked: " << name << "/" << "lock" << "/" << client.str() << "//@" << where << std::endl; } diff --git a/objectstore/tapeResourceManagerTest.cpp b/objectstore/tapeResourceManagerTest.cpp index bd749f4d01969d4be9ef8f4098c220e712defa8f..c4d4ffb59f5f116327f32c1fa0f25758781d940d 100644 --- a/objectstore/tapeResourceManagerTest.cpp +++ b/objectstore/tapeResourceManagerTest.cpp @@ -6,7 +6,6 @@ #include "ObjectStoreChoice.hpp" #include "RootEntry.hpp" #include "Action.hpp" -#include "ContextHandle.hpp" #include "ObjectStructureDumper.hpp" #include "JobPool.hpp" #include "AgentRegister.hpp" @@ -76,7 +75,7 @@ int main(void){ // Create our own agent representation cta::objectstore::Agent self(os, "masterProcess"); self.create(); - cta::objectstore::ContextHandleImplementation<myOS> ctx; + cta::objectstore::ContextHandle ctx; // Dump the structure cta::objectstore::ObjectStrucutreDumper osd; std::cout << osd.dump(self) << std::endl; @@ -158,7 +157,11 @@ int main(void){ // recallJobs.push_back(new jobExecutor(*recallActions[i])); // recallJobs.back()->start(dc); // } - // wait for completion of all processes + // wait for FIFO to be empty + while (recallFIFO.size(self)) { + usleep (100 * 1000); + } + while (recallJobs.size()) { // Wait for completion the job recallJobs.back()->wait(); @@ -173,7 +176,9 @@ int main(void){ } injectorProcess.wait(); + gcProcess.kill(); gcProcess.wait(); + gcAgent.flushContexts(); // And see the state or affairs std::cout << osd.dump(self) << std::endl;