From b0b0b82853f500657ba3162fe7bc20c547766b81 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Mon, 2 Mar 2015 19:22:02 +0100
Subject: [PATCH] Completed the garbage collection unit test.

---
 objectstore/Agent.cpp                     |  8 +++++
 objectstore/Agent.hpp                     |  8 ++++-
 objectstore/BackendTest.cpp               |  2 +-
 objectstore/FIFO.cpp                      |  3 +-
 objectstore/FIFO.hpp                      |  1 +
 objectstore/GarbageCollector.cpp          | 34 +++++++++++-------
 objectstore/GarbageCollectorTest.cpp      | 43 ++++++++++++++++-------
 objectstore/GenericObject.hpp             |  2 +-
 objectstore/ProtcolBuffersAlgorithms.hpp  |  3 ++
 objectstore/ProtocolBuffersAlgorithms.cpp | 11 ++++++
 10 files changed, 86 insertions(+), 29 deletions(-)

diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp
index 950d557b22..53f955309b 100644
--- a/objectstore/Agent.cpp
+++ b/objectstore/Agent.cpp
@@ -16,6 +16,14 @@ cta::objectstore::Agent::Agent(Backend & os):
 cta::objectstore::Agent::Agent(const std::string & name, Backend & os): 
   ObjectOps<serializers::Agent>(os, name), m_nextId(0) {}
 
+void cta::objectstore::Agent::initialize() {
+  ObjectOps<serializers::Agent>::initialize();
+  m_payload.set_heartbeat(0);
+  m_payload.set_timeout_us(60*1000*1000);
+  m_payload.set_description("");
+  m_payloadInterpreted = true;
+}
+
 void cta::objectstore::Agent::generateName(const std::string & typeName) {
   std::stringstream aid;
   // Get time
diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp
index 37461dfd6e..34b44158e3 100644
--- a/objectstore/Agent.hpp
+++ b/objectstore/Agent.hpp
@@ -23,6 +23,8 @@ public:
   
   Agent(const std::string & name, Backend & os);
 
+  void initialize();
+
   void generateName(const std::string & typeName);
   
   std::string nextId(const std::string & childType);
@@ -116,6 +118,7 @@ public:
       // If not, it is a dangling pointer. Pop and continue
       if (!object.exists()) {
         container.pop();
+        container.commit();
         continue;
       }
       // Try to lock the object. Exception will be let through
@@ -126,6 +129,7 @@ public:
       if (container.getNameIfSet() != object.getOwner()) {
         objLock.release();
         container.pop();
+        container.commit();
         continue;
       }
       // If we get here, then we can proceed with the ownership transfer
@@ -141,7 +145,9 @@ public:
       object.setBackupOwner(container.getNameIfSet());
       // Commit the object
       object.commit();
-      // Job done.
+      // And remove the now dangling pointer from the container
+      container.pop();
+      container.commit();
       return;
     }
   }
diff --git a/objectstore/BackendTest.cpp b/objectstore/BackendTest.cpp
index 0a73e3983c..7f8b1062fd 100644
--- a/objectstore/BackendTest.cpp
+++ b/objectstore/BackendTest.cpp
@@ -57,7 +57,7 @@ TEST_P(BackendAbstractTest, ParametersInterface) {
 }
 
 cta::objectstore::BackendVFS osVFS;
-#define TEST_RADOS 1
+#define TEST_RADOS 0
 #if TEST_RADOS
 cta::objectstore::BackendRados osRados("tapetest", "tapetest");
 INSTANTIATE_TEST_CASE_P(BackendTest, BackendAbstractTest, ::testing::Values(&osVFS, &osRados));
diff --git a/objectstore/FIFO.cpp b/objectstore/FIFO.cpp
index 757a9234cd..c5e80dbffb 100644
--- a/objectstore/FIFO.cpp
+++ b/objectstore/FIFO.cpp
@@ -60,7 +60,8 @@ void cta::objectstore::FIFO::push(std::string name) {
 void cta::objectstore::FIFO::pushIfNotPresent(std::string name) {
   checkPayloadWritable();
   try {
-    serializers::findString(m_payload.mutable_name(), name);
+    serializers::findStringFrom(m_payload.mutable_name(), m_payload.readpointer(),
+      name);
   } catch (serializers::NotFound &) {
     m_payload.add_name(name);
   }
diff --git a/objectstore/FIFO.hpp b/objectstore/FIFO.hpp
index b4efbbbaf4..0b7ad72074 100644
--- a/objectstore/FIFO.hpp
+++ b/objectstore/FIFO.hpp
@@ -9,6 +9,7 @@ namespace cta { namespace objectstore {
 
 class FIFO: public ObjectOps<serializers::FIFO> {
 public:
+  FIFO(Backend & os): ObjectOps<serializers::FIFO>(os) {}
   FIFO(const std::string & name, Backend & os):
   ObjectOps<serializers::FIFO>(os, name) {}
  
diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp
index b13259e9f4..754f13ec97 100644
--- a/objectstore/GarbageCollector.cpp
+++ b/objectstore/GarbageCollector.cpp
@@ -16,6 +16,10 @@ GarbageCollector::GarbageCollector(Backend & os, Agent & agent):
   ScopedSharedLock arLock(m_agentRegister);
   m_agentRegister.fetch();
 }
+
+void GarbageCollector::setTimeout(double timeout) {
+  m_timeout = timeout;
+}
   
 void GarbageCollector::runOnePass() {
   // Bump our own heart beat
@@ -60,8 +64,8 @@ void GarbageCollector::aquireTargets() {
   std::list<std::string>::const_iterator c = candidatesList.begin();
   // We can now take ownership of new agents, up to our max...
   // and we don't monitor ourselves!
-  while (m_watchedAgents.size() < c_maxWatchedAgentsPerGC
-         && c!=candidatesList.end()) {
+  for (;m_watchedAgents.size() < c_maxWatchedAgentsPerGC
+         && c!=candidatesList.end(); c++) {
     // We don't monitor ourselves
     if (*c != m_ourAgent.getNameIfSet()) {
       // So we have a candidate we might want to monitor
@@ -97,9 +101,16 @@ void GarbageCollector::aquireTargets() {
       // (we hold an exclusive lock all along)
       m_agentRegister.trackAgent(ag.getNameIfSet());
       m_agentRegister.commit();
-      // Agent is now officially ours, let's track it
-      m_watchedAgents[ag.getNameIfSet()] =
-        new AgentWatchdog(ag.getNameIfSet(), m_objectStore);
+      // Agent is officially our, we can remove it from the untracked agent's
+      // list
+      m_agentRegister.trackAgent(ag.getNameIfSet());
+      // Agent is now officially ours, let's track it. We have the release the 
+      // lock to the agent before constructing the watchdog, which builds
+      // its own agent objects (and need to lock the object store representation)
+      std::string agentName = ag.getNameIfSet();
+      agLock.release();
+      m_watchedAgents[agentName] =
+        new AgentWatchdog(agentName, m_objectStore);
       m_watchedAgents[ag.getNameIfSet()]->setTimeout(m_timeout);
     }
   }
@@ -157,8 +168,9 @@ void GarbageCollector::checkHeartbeats() {
      }
      ScopedSharedLock gContLock(gContainter);
      gContainter.fetch();
+     serializers::ObjectType containerType = gContainter.type();
      gContLock.release();
-     switch(gContainter.type()) {
+     switch(containerType) {
        case serializers::FIFO_t: {
          FIFO fifo(go.getBackupOwner(), m_objectStore);
          ScopedExclusiveLock ffLock(fifo);
@@ -170,19 +182,15 @@ void GarbageCollector::checkHeartbeats() {
          go.commit();
          ffLock.release();
          goLock.release();
+         break;
        }
        default: {
          throw cta::exception::Exception("In GarbageCollector::cleanupDeadAgent: unexpected container type!");
        }
      }
-     // We now processed all the owned objects. We can delete the agent's entry
-     agent.remove();
-     // And remove the (dangling) pointers to it
-     ScopedExclusiveLock arLock(m_agentRegister);
-     m_agentRegister.fetch();
-     m_agentRegister.removeAgent(name);
-     m_agentRegister.commit();
    }
+   // We now processed all the owned objects. We can delete the agent's entry
+   agent.deleteAndUnregisterSelf();
  }
 
 
diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp
index d9de5fe0ee..d5bf4ef1c5 100644
--- a/objectstore/GarbageCollectorTest.cpp
+++ b/objectstore/GarbageCollectorTest.cpp
@@ -21,21 +21,19 @@ TEST(GarbageCollector, BasicFuctionnality) {
   cta::objectstore::Agent agA(be), agB(be);
   agA.initialize();
   agA.generateName("unitTestAgentA");
-  agA.insert();
   agA.insertAndRegisterSelf();
   agB.initialize();
   agB.generateName("unitTestAgentB");
-  agB.insert();
   agB.insertAndRegisterSelf();
   // Create target FIFO
   std::string fifoName = agent.nextId("FIFO");
   std::list<std::string> expectedData;
-  { 
-    // Try to create the FIFO entry
-    cta::objectstore::FIFO ff(fifoName,be);
-    ff.initialize();
-    ff.insert();
-  }
+  // Try to create the FIFO entry
+  cta::objectstore::FIFO ff(fifoName,be);
+  ff.initialize();
+  ff.insert();
+  // And lock it for later
+  cta::objectstore::ScopedExclusiveLock ffLock;
   {
     for (int i=0; i<100; i++) {
       // We create FIFOs here, but any object can do.
@@ -45,7 +43,7 @@ TEST(GarbageCollector, BasicFuctionnality) {
       cta::objectstore::FIFO centralFifo(fifoName, be);
       cta::objectstore::ScopedExclusiveLock lock(centralFifo);
       centralFifo.fetch();
-      expectedData.push_back(agent.nextId("TestData"));
+      expectedData.push_back(newFIFO.getNameIfSet());
       centralFifo.push(expectedData.back());
       centralFifo.commit();
       lock.release();
@@ -56,9 +54,30 @@ TEST(GarbageCollector, BasicFuctionnality) {
       newFIFO.insert();
     }
   }
+  ffLock.lock(ff);
+  ff.fetch();
+  ASSERT_EQ(100, ff.size());
+  ffLock.release();
   for (int i=0; i<10; i++) {
-    
+    cta::objectstore::ScopedExclusiveLock objALock, objBLock;
+    cta::objectstore::FIFO objA(be), objB(be); 
+    agA.popFromContainer(ff, objA, objALock);
+    agB.popFromContainer(ff, objB, objBLock);
   }
-  // TODO: take ownership of FIFO contents in agA and agB, and then garbage collect.
-  // The FIFO should get all its objects back.
+  ffLock.lock(ff);
+  ff.fetch();
+  ASSERT_EQ(80, ff.size());
+  ffLock.release();
+  // Create the garbage colletor and run it twice.
+  cta::objectstore::Agent gcAgent(be);
+  gcAgent.initialize();
+  gcAgent.generateName("unitTestGarbageCollector");
+  gcAgent.insertAndRegisterSelf();
+  cta::objectstore::GarbageCollector gc(be, gcAgent);
+  gc.setTimeout(0);
+  gc.runOnePass();
+  gc.runOnePass();
+  ffLock.lock(ff);
+  ff.fetch();
+  ASSERT_EQ(100, ff.size());
 }
\ No newline at end of file
diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp
index 78b7a4aefd..66a555c2c9 100644
--- a/objectstore/GenericObject.hpp
+++ b/objectstore/GenericObject.hpp
@@ -8,7 +8,7 @@ namespace cta {  namespace objectstore {
 class GenericObject: public ObjectOps<serializers::GenericObject> {
 public:
   GenericObject(const std::string & name, Backend & os):
-    ObjectOps<serializers::GenericObject>(os) {};
+    ObjectOps<serializers::GenericObject>(os, name) {};
     
   class ForbiddenOperation: public cta::exception::Exception {
   public:
diff --git a/objectstore/ProtcolBuffersAlgorithms.hpp b/objectstore/ProtcolBuffersAlgorithms.hpp
index 3993dfba6b..1e4f89521f 100644
--- a/objectstore/ProtcolBuffersAlgorithms.hpp
+++ b/objectstore/ProtcolBuffersAlgorithms.hpp
@@ -15,4 +15,7 @@ class NotFound: public cta::exception::Exception {
 size_t findString(::google::protobuf::RepeatedPtrField< ::std::string>* field, 
   const std::string & value);
 
+size_t findStringFrom(::google::protobuf::RepeatedPtrField< ::std::string>* field,
+  size_t fromIndex, const std::string & value);
+
 }}}
\ No newline at end of file
diff --git a/objectstore/ProtocolBuffersAlgorithms.cpp b/objectstore/ProtocolBuffersAlgorithms.cpp
index 9117b8a1cf..42a4915083 100644
--- a/objectstore/ProtocolBuffersAlgorithms.cpp
+++ b/objectstore/ProtocolBuffersAlgorithms.cpp
@@ -25,4 +25,15 @@ size_t cta::objectstore::serializers::findString(
     }
   }
   throw NotFound("In cta::objectstore::serializers::findString: string not found");
+}
+
+size_t cta::objectstore::serializers::findStringFrom(
+  ::google::protobuf::RepeatedPtrField< ::std::string>* field, size_t fromIndex,
+  const std::string& value) {
+  for (size_t i=fromIndex; i<(size_t)field->size(); i++) {
+    if (value == field->Get(i)) {
+      return i;
+    }
+  }
+  throw NotFound("In cta::objectstore::serializers::findString: string not found");
 }
\ No newline at end of file
-- 
GitLab