diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index 950d557b22178d18602fc13dca96d974acece77b..53f955309b6408984b8d0d5f321b06435c8f39e7 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -16,6 +16,14 @@ cta::objectstore::Agent::Agent(Backend & os): cta::objectstore::Agent::Agent(const std::string & name, Backend & os): ObjectOps<serializers::Agent>(os, name), m_nextId(0) {} +void cta::objectstore::Agent::initialize() { + ObjectOps<serializers::Agent>::initialize(); + m_payload.set_heartbeat(0); + m_payload.set_timeout_us(60*1000*1000); + m_payload.set_description(""); + m_payloadInterpreted = true; +} + void cta::objectstore::Agent::generateName(const std::string & typeName) { std::stringstream aid; // Get time diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index 37461dfd6ea0c0c0feb3b1cdfd113538bf0dff98..34b44158e3ef83aee4110d5ab47548d2fb59a2fa 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -23,6 +23,8 @@ public: Agent(const std::string & name, Backend & os); + void initialize(); + void generateName(const std::string & typeName); std::string nextId(const std::string & childType); @@ -116,6 +118,7 @@ public: // If not, it is a dangling pointer. Pop and continue if (!object.exists()) { container.pop(); + container.commit(); continue; } // Try to lock the object. Exception will be let through @@ -126,6 +129,7 @@ public: if (container.getNameIfSet() != object.getOwner()) { objLock.release(); container.pop(); + container.commit(); continue; } // If we get here, then we can proceed with the ownership transfer @@ -141,7 +145,9 @@ public: object.setBackupOwner(container.getNameIfSet()); // Commit the object object.commit(); - // Job done. + // And remove the now dangling pointer from the container + container.pop(); + container.commit(); return; } } diff --git a/objectstore/BackendTest.cpp b/objectstore/BackendTest.cpp index 0a73e3983c4429ae7b912cae41a059c997000fad..7f8b1062fdc6e4321b54853b844ace45952419a6 100644 --- a/objectstore/BackendTest.cpp +++ b/objectstore/BackendTest.cpp @@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) { } cta::objectstore::BackendVFS osVFS; -#define TEST_RADOS 1 +#define TEST_RADOS 0 #if TEST_RADOS cta::objectstore::BackendRados osRados("tapetest", "tapetest"); INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados)); diff --git a/objectstore/FIFO.cpp b/objectstore/FIFO.cpp index 757a9234cdedd8103c2d028945ba0cfddd94e086..c5e80dbffb78f855aaec3fa3fde822fc1650c72a 100644 --- a/objectstore/FIFO.cpp +++ b/objectstore/FIFO.cpp @@ -60,7 +60,8 @@ void cta::objectstore::FIFO::push(std::string name) { void cta::objectstore::FIFO::pushIfNotPresent(std::string name) { checkPayloadWritable(); try { - serializers::findString(m_payload.mutable_name(), name); + serializers::findStringFrom(m_payload.mutable_name(), m_payload.readpointer(), + name); } catch (serializers::NotFound &) { m_payload.add_name(name); } diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp index b4efbbbaf4bf0120f6724893e40ec9927ea88adb..0b7ad72074a130b7f50d0ac5a9b86dac18c00ebd 100644 --- a/objectstore/FIFO.hpp +++ b/objectstore/FIFO.hpp @@ -9,6 +9,7 @@ namespace cta { namespace objectstore { class FIFO: public ObjectOps<serializers::FIFO> { public: + FIFO(Backend & os): ObjectOps<serializers::FIFO>(os) {} FIFO(const std::string & name, Backend & os): ObjectOps<serializers::FIFO>(os, name) {} diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index b13259e9f47be6433fd3bc5874cf0866972032ea..754f13ec97a5af80b5aee4cff4e399df64788c6c 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -16,6 +16,10 @@ GarbageCollector::GarbageCollector(Backend & os, Agent & agent): ScopedSharedLock arLock(m_agentRegister); m_agentRegister.fetch(); } + +void GarbageCollector::setTimeout(double timeout) { + m_timeout = timeout; +} void GarbageCollector::runOnePass() { // Bump our own heart beat @@ -60,8 +64,8 @@ void GarbageCollector::aquireTargets() { std::list<std::string>::const_iterator c = candidatesList.begin(); // We can now take ownership of new agents, up to our max... // and we don't monitor ourselves! - while (m_watchedAgents.size() < c_maxWatchedAgentsPerGC - && c!=candidatesList.end()) { + for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC + && c!=candidatesList.end(); c++) { // We don't monitor ourselves if (*c != m_ourAgent.getNameIfSet()) { // So we have a candidate we might want to monitor @@ -97,9 +101,16 @@ void GarbageCollector::aquireTargets() { // (we hold an exclusive lock all along) m_agentRegister.trackAgent(ag.getNameIfSet()); m_agentRegister.commit(); - // Agent is now officially ours, let's track it - m_watchedAgents[ag.getNameIfSet()] = - new AgentWatchdog(ag.getNameIfSet(), m_objectStore); + // Agent is officially our, we can remove it from the untracked agent's + // list + m_agentRegister.trackAgent(ag.getNameIfSet()); + // Agent is now officially ours, let's track it. We have the release the + // lock to the agent before constructing the watchdog, which builds + // its own agent objects (and need to lock the object store representation) + std::string agentName = ag.getNameIfSet(); + agLock.release(); + m_watchedAgents[agentName] = + new AgentWatchdog(agentName, m_objectStore); m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout); } } @@ -157,8 +168,9 @@ void GarbageCollector::checkHeartbeats() { } ScopedSharedLock gContLock(gContainter); gContainter.fetch(); + serializers::ObjectType containerType = gContainter.type(); gContLock.release(); - switch(gContainter.type()) { + switch(containerType) { case serializers::FIFO_t: { FIFO fifo(go.getBackupOwner(), m_objectStore); ScopedExclusiveLock ffLock(fifo); @@ -170,19 +182,15 @@ void GarbageCollector::checkHeartbeats() { go.commit(); ffLock.release(); goLock.release(); + break; } default: { throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!"); } } - // We now processed all the owned objects. We can delete the agent's entry - agent.remove(); - // And remove the (dangling) pointers to it - ScopedExclusiveLock arLock(m_agentRegister); - m_agentRegister.fetch(); - m_agentRegister.removeAgent(name); - m_agentRegister.commit(); } + // We now processed all the owned objects. We can delete the agent's entry + agent.deleteAndUnregisterSelf(); } diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index d9de5fe0ee35f1bab4593e0efbcb23d118c53dcf..d5bf4ef1c54b71a006091aa06d47d5b7f098f349 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -21,21 +21,19 @@ TEST(GarbageCollector, BasicFuctionnality) { cta::objectstore::Agent agA(be), agB(be); agA.initialize(); agA.generateName("unitTestAgentA"); - agA.insert(); agA.insertAndRegisterSelf(); agB.initialize(); agB.generateName("unitTestAgentB"); - agB.insert(); agB.insertAndRegisterSelf(); // Create target FIFO std::string fifoName = agent.nextId("FIFO"); std::list<std::string> expectedData; - { - // Try to create the FIFO entry - cta::objectstore::FIFO ff(fifoName,be); - ff.initialize(); - ff.insert(); - } + // Try to create the FIFO entry + cta::objectstore::FIFO ff(fifoName,be); + ff.initialize(); + ff.insert(); + // And lock it for later + cta::objectstore::ScopedExclusiveLock ffLock; { for (int i=0; i<100; i++) { // We create FIFOs here, but any object can do. @@ -45,7 +43,7 @@ TEST(GarbageCollector, BasicFuctionnality) { cta::objectstore::FIFO centralFifo(fifoName, be); cta::objectstore::ScopedExclusiveLock lock(centralFifo); centralFifo.fetch(); - expectedData.push_back(agent.nextId("TestData")); + expectedData.push_back(newFIFO.getNameIfSet()); centralFifo.push(expectedData.back()); centralFifo.commit(); lock.release(); @@ -56,9 +54,30 @@ TEST(GarbageCollector, BasicFuctionnality) { newFIFO.insert(); } } + ffLock.lock(ff); + ff.fetch(); + ASSERT_EQ(100, ff.size()); + ffLock.release(); for (int i=0; i<10; i++) { - + cta::objectstore::ScopedExclusiveLock objALock, objBLock; + cta::objectstore::FIFO objA(be), objB(be); + agA.popFromContainer(ff, objA, objALock); + agB.popFromContainer(ff, objB, objBLock); } - // TODO: take ownership of FIFO contents in agA and agB, and then garbage collect. - // The FIFO should get all its objects back. + ffLock.lock(ff); + ff.fetch(); + ASSERT_EQ(80, ff.size()); + ffLock.release(); + // Create the garbage colletor and run it twice. + cta::objectstore::Agent gcAgent(be); + gcAgent.initialize(); + gcAgent.generateName("unitTestGarbageCollector"); + gcAgent.insertAndRegisterSelf(); + cta::objectstore::GarbageCollector gc(be, gcAgent); + gc.setTimeout(0); + gc.runOnePass(); + gc.runOnePass(); + ffLock.lock(ff); + ff.fetch(); + ASSERT_EQ(100, ff.size()); } \ No newline at end of file diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index 78b7a4aefdc1d4ab0dfb32575d53da21f009cdab..66a555c2c9ca15dc59b4f78082b66b9972a2fbf2 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -8,7 +8,7 @@ namespace cta { namespace objectstore { class GenericObject: public ObjectOps<serializers::GenericObject> { public: GenericObject(const std::string & name, Backend & os): - ObjectOps<serializers::GenericObject>(os) {}; + ObjectOps<serializers::GenericObject>(os, name) {}; class ForbiddenOperation: public cta::exception::Exception { public: diff --git a/objectstore/ProtcolBuffersAlgorithms.hpp b/objectstore/ProtcolBuffersAlgorithms.hpp index 3993dfba6b9d32c6abda7de2c4125a9964fd5a80..1e4f89521fef03e6a4d4e3a3a9c683981c91c99c 100644 --- a/objectstore/ProtcolBuffersAlgorithms.hpp +++ b/objectstore/ProtcolBuffersAlgorithms.hpp @@ -15,4 +15,7 @@ class NotFound: public cta::exception::Exception { size_t findString(::google::protobuf::RepeatedPtrField< ::std::string>* field, const std::string & value); +size_t findStringFrom(::google::protobuf::RepeatedPtrField< ::std::string>* field, + size_t fromIndex, const std::string & value); + }}} \ No newline at end of file diff --git a/objectstore/ProtocolBuffersAlgorithms.cpp b/objectstore/ProtocolBuffersAlgorithms.cpp index 9117b8a1cf1e791f50137ef9092e901c2ddf07fa..42a4915083f2369e0befb1c7f1dc3b870d4656b8 100644 --- a/objectstore/ProtocolBuffersAlgorithms.cpp +++ b/objectstore/ProtocolBuffersAlgorithms.cpp @@ -25,4 +25,15 @@ size_t cta::objectstore::serializers::findString( } } throw NotFound("In cta::objectstore::serializers::findString: string not found"); +} + +size_t cta::objectstore::serializers::findStringFrom( + ::google::protobuf::RepeatedPtrField< ::std::string>* field, size_t fromIndex, + const std::string& value) { + for (size_t i=fromIndex; i<(size_t)field->size(); i++) { + if (value == field->Get(i)) { + return i; + } + } + throw NotFound("In cta::objectstore::serializers::findString: string not found"); } \ No newline at end of file