diff --git a/objectstore/AgentReference.cpp b/objectstore/AgentReference.cpp index e0256a522ecbdefd500aa56ba3a08e3876c5e316..774ab765c003130c5eb7decf82abc8dd5857e839 100644 --- a/objectstore/AgentReference.cpp +++ b/objectstore/AgentReference.cpp @@ -74,49 +74,27 @@ std::string AgentReference::nextId(const std::string& childType) { } void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) { - std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress)); + std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress, std::list<std::string>())); queueAndExecuteAction(a, backend); } void AgentReference::addBatchToOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) { - objectstore::Agent ag(m_agentAddress, backend); - log::LogContext lc(m_logger); - log::ScopedParamContainer params(lc); - params.add("agentObject", m_agentAddress); - objectstore::ScopedExclusiveLock agl(ag); - ag.fetch(); - for (const auto & oa: objectAdresses) { - ag.addToOwnership(oa); - log::ScopedParamContainer params(lc); - params.add("ownedObject", oa); - lc.log(log::DEBUG, "In AgentReference::addBatchToOwnership(): added object to ownership."); - } - ag.commit(); + std::shared_ptr<Action> a (new Action(AgentOperation::AddBatch, "", objectAdresses)); + queueAndExecuteAction(a, backend); } void AgentReference::removeFromOwnership(const std::string& objectAddress, objectstore::Backend& backend) { - std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress)); + std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress, std::list<std::string>())); queueAndExecuteAction(a, backend); } void AgentReference::removeBatchFromOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) { - objectstore::Agent ag(m_agentAddress, backend); - log::LogContext lc(m_logger); - log::ScopedParamContainer params(lc); - params.add("agentObject", m_agentAddress); - objectstore::ScopedExclusiveLock agl(ag); - ag.fetch(); - for (const auto & oa: objectAdresses) { - ag.removeFromOwnership(oa); - log::ScopedParamContainer params(lc); - params.add("ownedObject", oa); - lc.log(log::DEBUG, "In AgentReference::removeBatchFromOwnership(): removed object from ownership."); - } - ag.commit(); + std::shared_ptr<Action> a (new Action(AgentOperation::RemoveBatch, "", objectAdresses)); + queueAndExecuteAction(a, backend); } void AgentReference::bumpHeatbeat(objectstore::Backend& backend) { - std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, "")); + std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, "", std::list<std::string>())); queueAndExecuteAction(a, backend); } @@ -262,6 +240,17 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent, lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership."); break; } + case AgentOperation::AddBatch: + { + for (const auto & oa: action.objectAddressSet) { + ownershipSet.insert(oa); + log::ScopedParamContainer params(lc); + params.add("ownedObject", oa); + lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership (by batch)."); + } + + break; + } case AgentOperation::Remove: { ownershipSet.erase(action.objectAddress); @@ -270,6 +259,16 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent, lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership."); break; } + case AgentOperation::RemoveBatch: + { + for (const auto & oa: action.objectAddressSet) { + ownershipSet.erase(oa); + log::ScopedParamContainer params(lc); + params.add("ownedObject", oa); + lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership (by batch)."); + } + break; + } case AgentOperation::Heartbeat: agent.bumpHeartbeat(); break; diff --git a/objectstore/AgentReference.hpp b/objectstore/AgentReference.hpp index 26dabf5b484dd9216edb8e6e9195fed73db8b3de..3b12c5f3c2f92fed6be8da39aa3682b01be45ce4 100644 --- a/objectstore/AgentReference.hpp +++ b/objectstore/AgentReference.hpp @@ -110,6 +110,8 @@ private: enum class AgentOperation: char { Add, Remove, + AddBatch, + RemoveBatch, Heartbeat }; @@ -117,9 +119,11 @@ private: * An operation with its parameter and promise */ struct Action { - Action(AgentOperation op, const std::string & objectAddress): op(op), objectAddress(objectAddress) {} + Action(AgentOperation op, const std::string & objectAddress, const std::list<std::string> & objectAddressSet): + op(op), objectAddress(objectAddress), objectAddressSet(objectAddressSet) {} AgentOperation op; const std::string & objectAddress; + const std::list<std::string> & objectAddressSet; std::promise<void> promise; /*** * A mutex ensuring the object will not be released before the promise's result