From 5379e60ac250db856b0d4bab63e9102819e44bb1 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Mon, 11 Jun 2018 16:30:30 +0200
Subject: [PATCH] Extended shared access to Agent ownership to all cases.

The older styled add/remove by bulk are now sharing the access to object with heartbeat.
They should be more efficient. They are used for popping from queue and reporting jobs are done,
in the tape server.
---
 objectstore/AgentReference.cpp | 57 +++++++++++++++++-----------------
 objectstore/AgentReference.hpp |  6 +++-
 2 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/objectstore/AgentReference.cpp b/objectstore/AgentReference.cpp
index e0256a522e..774ab765c0 100644
--- a/objectstore/AgentReference.cpp
+++ b/objectstore/AgentReference.cpp
@@ -74,49 +74,27 @@ std::string AgentReference::nextId(const std::string& childType) {
 }
 
 void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
-  std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress));
+  std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress, std::list<std::string>()));
   queueAndExecuteAction(a, backend);
 }
 
 void AgentReference::addBatchToOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) {
-  objectstore::Agent ag(m_agentAddress, backend);
-  log::LogContext lc(m_logger);
-  log::ScopedParamContainer params(lc);
-  params.add("agentObject", m_agentAddress);
-  objectstore::ScopedExclusiveLock agl(ag);
-  ag.fetch();
-  for (const auto & oa: objectAdresses) {
-    ag.addToOwnership(oa);
-    log::ScopedParamContainer params(lc);
-    params.add("ownedObject", oa);
-    lc.log(log::DEBUG, "In AgentReference::addBatchToOwnership(): added object to ownership.");
-  }
-  ag.commit();
+  std::shared_ptr<Action> a (new Action(AgentOperation::AddBatch, "", objectAdresses));
+  queueAndExecuteAction(a, backend);
 }
 
 void AgentReference::removeFromOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
-  std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress));
+  std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress, std::list<std::string>()));
   queueAndExecuteAction(a, backend);
 }
 
 void AgentReference::removeBatchFromOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) {
-  objectstore::Agent ag(m_agentAddress, backend);
-  log::LogContext lc(m_logger);
-  log::ScopedParamContainer params(lc);
-  params.add("agentObject", m_agentAddress);
-  objectstore::ScopedExclusiveLock agl(ag);
-  ag.fetch();
-  for (const auto & oa: objectAdresses) {
-    ag.removeFromOwnership(oa);
-    log::ScopedParamContainer params(lc);
-    params.add("ownedObject", oa);
-    lc.log(log::DEBUG, "In AgentReference::removeBatchFromOwnership(): removed object from ownership.");
-  }
-  ag.commit();
+  std::shared_ptr<Action> a (new Action(AgentOperation::RemoveBatch, "", objectAdresses));
+  queueAndExecuteAction(a, backend);
 }
 
 void AgentReference::bumpHeatbeat(objectstore::Backend& backend) {
-  std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, ""));
+  std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, "", std::list<std::string>()));
   queueAndExecuteAction(a, backend);
 }
 
@@ -262,6 +240,17 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent,
     lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership.");
     break;
   }
+  case AgentOperation::AddBatch:
+  {
+    for (const auto & oa: action.objectAddressSet) {
+      ownershipSet.insert(oa);
+      log::ScopedParamContainer params(lc);
+      params.add("ownedObject", oa);
+      lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership (by batch).");
+    }
+    
+    break;
+  }
   case AgentOperation::Remove:
   {
     ownershipSet.erase(action.objectAddress);
@@ -270,6 +259,16 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent,
     lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership.");
     break;
   }
+  case AgentOperation::RemoveBatch:
+  {
+    for (const auto & oa: action.objectAddressSet) {
+      ownershipSet.erase(oa);
+      log::ScopedParamContainer params(lc);
+      params.add("ownedObject", oa);
+      lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership (by batch).");
+    }
+    break;
+  }
   case AgentOperation::Heartbeat:
     agent.bumpHeartbeat();
     break;
diff --git a/objectstore/AgentReference.hpp b/objectstore/AgentReference.hpp
index 26dabf5b48..3b12c5f3c2 100644
--- a/objectstore/AgentReference.hpp
+++ b/objectstore/AgentReference.hpp
@@ -110,6 +110,8 @@ private:
   enum class AgentOperation: char {
     Add,
     Remove,
+    AddBatch,
+    RemoveBatch,
     Heartbeat
   };
   
@@ -117,9 +119,11 @@ private:
    * An operation with its parameter and promise
    */
   struct Action {
-    Action(AgentOperation op, const std::string & objectAddress): op(op), objectAddress(objectAddress) {}
+    Action(AgentOperation op, const std::string & objectAddress, const std::list<std::string> & objectAddressSet): 
+      op(op), objectAddress(objectAddress), objectAddressSet(objectAddressSet) {}
     AgentOperation op;
     const std::string & objectAddress;
+    const std::list<std::string> & objectAddressSet;
     std::promise<void> promise;
     /***
      * A mutex ensuring the object will not be released before the promise's result
-- 
GitLab