From bb0b10afbbe898816bb150f79b52410826e86495 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Mon, 2 Mar 2015 12:09:47 +0100
Subject: [PATCH] Added helper functions for transmission of ownership.
 Implemented the self registration and unregistration functions for the agent.
 Improved the safeguards and imlpemented more initialize() function for
 different object types.

---
 objectstore/Agent.cpp                | 41 +++++++++++++
 objectstore/Agent.hpp                | 87 +++++++++++++++++++++++++++-
 objectstore/AgentRegister.cpp        |  6 ++
 objectstore/AgentRegister.hpp        |  1 +
 objectstore/GarbageCollectorTest.cpp |  9 +++
 objectstore/ObjectOps.hpp            | 74 ++++++++++++++++++-----
 objectstore/RootEntry.cpp            |  8 +++
 objectstore/RootEntry.hpp            |  3 +
 8 files changed, 214 insertions(+), 15 deletions(-)

diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp
index cb2503ba8e..950d557b22 100644
--- a/objectstore/Agent.cpp
+++ b/objectstore/Agent.cpp
@@ -38,6 +38,47 @@ void cta::objectstore::Agent::generateName(const std::string & typeName) {
   setName(aid.str());
 }
 
+void cta::objectstore::Agent::insertAndRegisterSelf() {
+  // We suppose initialize was already called, and that the agent name
+  // is set.
+  // We need to get hold of the agent register, which we suppose is available
+  RootEntry re(m_objectStore);
+  ScopedSharedLock reLock(re);
+  re.fetch();
+  AgentRegister ar(re.getAgentRegister(), m_objectStore);
+  reLock.release();
+  // Then we should first create a pointer to our agent
+  ScopedExclusiveLock arLock(ar);
+  ar.fetch();
+  ar.addAgent(getNameIfSet());
+  ar.commit();
+  // Set the agent register as owner and backup owner
+  setBackupOwner(ar.getNameIfSet());
+  setOwner(ar.getNameIfSet());
+  // Create the agent
+  insert();
+  // And release the agent register's lock
+  arLock.release();
+}
+
+void cta::objectstore::Agent::deleteAndUnregisterSelf() {
+  // First delete ourselves
+  remove();
+  // Then we remove the dangling pointer about ourselves in the agent register.
+  // We need to get hold of the agent register, which we suppose is available
+  RootEntry re(m_objectStore);
+  ScopedSharedLock reLock(re);
+  re.fetch();
+  AgentRegister ar(re.getAgentRegister(), m_objectStore);
+  reLock.release();
+  // Then we should first create a pointer to our agent
+  ScopedExclusiveLock arLock(ar);
+  ar.fetch();
+  ar.removeAgent(getNameIfSet());
+  ar.commit();
+  arLock.release();
+}
+
 /*void cta::objectstore::Agent::create() {
   if (!m_setupDone)
     throw SetupNotDone("In Agent::create(): setup() not yet done");
diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp
index 9cd3655462..37461dfd6e 100644
--- a/objectstore/Agent.hpp
+++ b/objectstore/Agent.hpp
@@ -27,9 +27,9 @@ public:
   
   std::string nextId(const std::string & childType);
   
-  void registerSelf();
+  void insertAndRegisterSelf();
   
-  void unregisterSelf();
+  void deleteAndUnregisterSelf();
   
  /* class ScopedIntent {
   public:
@@ -93,6 +93,89 @@ public:
   
   uint64_t getHeartbeatCount();
   
+  
+  /**
+   * Helper function to transfer ownership of the next valid head object of a
+   * container to the agent.
+   * The object is returned locked on the lock passed as reference.
+   * @param container
+   * @param object
+   */
+  template <class Cont, class Obj>
+  void popFromContainer (Cont & container, Obj & object, ScopedExclusiveLock & objLock) {
+    // Lock the container for write first.
+    ScopedExclusiveLock contLock(container);
+    while(true) {
+      // Check there is an object to pop.
+      // This throws an exception if nothing's available, we just let it through
+      std::string nextObjName = container.peek();
+      // Check that the object exists, is of the right type (implicit), etc...
+      // Set the name of the object
+      object.setName(nextObjName);
+      // Validate that the object exists.
+      // If not, it is a dangling pointer. Pop and continue
+      if (!object.exists()) {
+        container.pop();
+        continue;
+      }
+      // Try to lock the object. Exception will be let through
+      objLock.lock(object);
+      // Fetch the object. Exception will be let through.
+      object.fetch();
+      // Check that the container owns the object. If not, it is a dangling pointer
+      if (container.getNameIfSet() != object.getOwner()) {
+        objLock.release();
+        container.pop();
+        continue;
+      }
+      // If we get here, then we can proceed with the ownership transfer
+      // First, add a pointer to the object on the agent
+      ScopedExclusiveLock agentLock(*this);
+      fetch();
+      addToOwnership(nextObjName);
+      commit();
+      agentLock.release();
+      // Then make the pointer agent's ownership official
+      object.setOwner(getNameIfSet());
+      // The container should be the backup owner, so let's make sure!
+      object.setBackupOwner(container.getNameIfSet());
+      // Commit the object
+      object.commit();
+      // Job done.
+      return;
+    }
+  }
+  
+  class AgentDoesNotOwnObject: public cta::exception::Exception {
+  public:
+    AgentDoesNotOwnObject(const std::string & context): cta::exception::Exception(context) {}
+  };
+  
+  template <class Cont, class Obj>
+  void pushToContainer (Cont & container, Obj & object) {
+    // Lock the object for write
+    ScopedExclusiveLock objLock(object);
+    object.fetch();
+    // Check that the object is indeed ours
+    if (object.getOwner() != getNameIfSet())
+      throw AgentDoesNotOwnObject("In Agent::pushToContainer: agent is not the owner of the object");
+    // Lock the container for write
+    ScopedExclusiveLock contLock(container);
+    // Add a pointer to the object in the container
+    container.fetch();
+    container.push(object.getNameIfSet());
+    container.commit();
+    // Note: we retain the lock on the container until the ownership if official
+    // in the object. Otherwise, there would be a race condition, and pointer could
+    // be lost.
+    object.setOwner(container.getNameIfSet());
+    object.setBackupOwner(container.getNameIfSet());
+    object.commit();
+    objLock.release();
+    contLock.release();
+  }
+
+  
 private:
   uint64_t m_nextId;
 };
diff --git a/objectstore/AgentRegister.cpp b/objectstore/AgentRegister.cpp
index 44d5eddb70..31faf8825c 100644
--- a/objectstore/AgentRegister.cpp
+++ b/objectstore/AgentRegister.cpp
@@ -7,6 +7,12 @@ ObjectOps<serializers::AgentRegister>(os) {}
 cta::objectstore::AgentRegister::AgentRegister(const std::string & name, Backend & os):
 ObjectOps<serializers::AgentRegister>(os, name) {}
 
+void cta::objectstore::AgentRegister::initialize() {
+  ObjectOps<serializers::AgentRegister>::initialize();
+  // There is nothing to do for the payload.
+  m_payloadInterpreted = true;
+}
+
 void cta::objectstore::AgentRegister::addAgent (std::string name) {
   checkPayloadWritable();
   m_payload.add_agents(name);
diff --git a/objectstore/AgentRegister.hpp b/objectstore/AgentRegister.hpp
index 30154b0085..0779aa0242 100644
--- a/objectstore/AgentRegister.hpp
+++ b/objectstore/AgentRegister.hpp
@@ -12,6 +12,7 @@ class AgentRegister: public ObjectOps<serializers::AgentRegister> {
 public:
   AgentRegister(Backend & os);
   AgentRegister(const std::string & name, Backend & os);
+  void initialize();
   void addAgent (std::string name);
   void removeAgent (const std::string  & name);
   void trackAgent (std::string name);
diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp
index 6a5f933fca..d9de5fe0ee 100644
--- a/objectstore/GarbageCollectorTest.cpp
+++ b/objectstore/GarbageCollectorTest.cpp
@@ -19,8 +19,14 @@ TEST(GarbageCollector, BasicFuctionnality) {
   re.allocateOrGetAgentRegister(agent);
   // Create 2 agents, A and B and register them
   cta::objectstore::Agent agA(be), agB(be);
+  agA.initialize();
   agA.generateName("unitTestAgentA");
+  agA.insert();
+  agA.insertAndRegisterSelf();
+  agB.initialize();
   agB.generateName("unitTestAgentB");
+  agB.insert();
+  agB.insertAndRegisterSelf();
   // Create target FIFO
   std::string fifoName = agent.nextId("FIFO");
   std::list<std::string> expectedData;
@@ -49,6 +55,9 @@ TEST(GarbageCollector, BasicFuctionnality) {
       newFIFO.setBackupOwner(fifoName);
       newFIFO.insert();
     }
+  }
+  for (int i=0; i<10; i++) {
+    
   }
   // TODO: take ownership of FIFO contents in agA and agB, and then garbage collect.
   // The FIFO should get all its objects back.
diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp
index 663a46541a..4b5ef8b1ee 100644
--- a/objectstore/ObjectOps.hpp
+++ b/objectstore/ObjectOps.hpp
@@ -157,33 +157,78 @@ protected:
 class ScopedLock {
 public:
   void release() {
-    m_lock.reset(NULL);
-    m_objectOps.m_locksCount--;
-    m_objectOps.m_locksForWriteCount--;
+    checkLocked();
+    releaseIfNeeded();
   }
+  
   virtual ~ScopedLock() {
-    release();
+    releaseIfNeeded();
   }
+  class AlreadyLocked: public cta::exception::Exception {
+  public:
+    AlreadyLocked(const std::string & w): cta::exception::Exception(w) {}
+  };
+  
+  class NotLocked: public cta::exception::Exception {
+  public:
+    NotLocked(const std::string & w): cta::exception::Exception(w) {}
+  };
+  
 protected:
-  ScopedLock(ObjectOpsBase & oo): m_objectOps(oo) {}
+  ScopedLock(): m_objectOps(NULL), m_locked(false) {}
   std::auto_ptr<Backend::ScopedLock> m_lock;
-  ObjectOpsBase & m_objectOps;
+  ObjectOpsBase * m_objectOps;
+  bool m_locked;
+  void checkNotLocked() {
+    if (m_locked)
+      throw AlreadyLocked("In ScopedLock::checkNotLocked: trying to lock an already locked lock");
+  }
+  void checkLocked() {
+    if (!m_locked)
+      throw NotLocked("In ScopedLock::checkLocked: trying to unlock an unlocked lock");
+  }
+  virtual void releaseIfNeeded() {
+    if(!m_locked) return;
+    m_lock.reset(NULL);
+    m_objectOps->m_locksCount--;
+    m_locked = false;
+  }
 };
   
 class ScopedSharedLock: public ScopedLock {
 public:
-  ScopedSharedLock(ObjectOpsBase & oo): ScopedLock(oo) {
-    m_lock.reset(m_objectOps.m_objectStore.lockShared(m_objectOps.getNameIfSet()));
-    m_objectOps.m_locksCount++;
+  ScopedSharedLock() {}
+  ScopedSharedLock(ObjectOpsBase & oo) {
+    lock(oo);
+  }
+  void lock(ObjectOpsBase & oo) {
+    checkNotLocked();
+    m_objectOps  = & oo;
+    m_lock.reset(m_objectOps->m_objectStore.lockShared(m_objectOps->getNameIfSet()));
+    m_objectOps->m_locksCount++;
+    m_locked = true;
   }
 };
 
 class ScopedExclusiveLock: public ScopedLock {
 public:
-  ScopedExclusiveLock(ObjectOpsBase & oo): ScopedLock(oo) {
-    m_lock.reset(m_objectOps.m_objectStore.lockExclusive(m_objectOps.getNameIfSet()));
-    m_objectOps.m_locksCount++;
-    m_objectOps.m_locksForWriteCount++;
+  ScopedExclusiveLock() {}
+  ScopedExclusiveLock(ObjectOpsBase & oo) {
+    lock(oo);
+  }
+  void lock(ObjectOpsBase & oo) {
+    checkNotLocked();
+    m_objectOps = &oo;
+    m_lock.reset(m_objectOps->m_objectStore.lockExclusive(m_objectOps->getNameIfSet()));
+    m_objectOps->m_locksCount++;
+    m_objectOps->m_locksForWriteCount++;
+    m_locked = true;
+  }
+protected:
+  void releaseIfNeeded() {
+    if (!m_locked) return;
+    ScopedLock::releaseIfNeeded();
+    m_objectOps->m_locksForWriteCount--;
   }
 };
 
@@ -254,6 +299,9 @@ public:
     // Check that we are not dealing with an existing object
     if (m_existingObject)
       throw NotNewObject("In ObjectOps::insert: trying to insert an already exitsting object");
+    // Check that the object is ready in memory
+    if (!m_headerInterpreted || !m_payloadInterpreted)
+      throw NotInitialized("In ObjectOps::insert: trying to insert an uninitialized object");
     // Push the payload into the header and write the object
     // We don't require locking here, as the object does not exist
     // yet in the object store (and this is ensured by the )
diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp
index ee38016b20..0265a7c80a 100644
--- a/objectstore/RootEntry.cpp
+++ b/objectstore/RootEntry.cpp
@@ -10,6 +10,14 @@
 cta::objectstore::RootEntry::RootEntry(Backend & os):
   ObjectOps<serializers::RootEntry>(os, s_rootEntryName) {}
 
+// Initialiser. This uses the base object's initialiser and sets the defaults 
+// of payload.
+void cta::objectstore::RootEntry::initialize() {
+  ObjectOps<serializers::RootEntry>::initialize();
+  // There is nothing to do for the payload.
+  m_payloadInterpreted = true;
+}
+
 // Get the name of the agent register (or exception if not available)
 std::string cta::objectstore::RootEntry::getAgentRegister() {
   // Check that the fetch was done
diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp
index 0cc86b4e5d..1ac27450ab 100644
--- a/objectstore/RootEntry.hpp
+++ b/objectstore/RootEntry.hpp
@@ -19,6 +19,9 @@ public:
     NotAllocatedEx(const std::string & w): cta::exception::Exception(w) {}
   };
   
+  // In memory initialiser
+  void initialize();
+  
   // Get the name of the agent register (or exception if not available)
   std::string getAgentRegister();
   
-- 
GitLab