Commit 095cfa3c authored by Eric Cano's avatar Eric Cano
Browse files

Re-implemented the updating of the agent ownership.

parent 089113f6
......@@ -181,6 +181,21 @@ std::list<std::string>
return ret;
}
std::set<std::string> cta::objectstore::Agent::getOwnershipSet() {
checkPayloadReadable();
std::set<std::string> ret;
for (const auto &oo: m_payload.ownedobjects())
ret.insert(oo);
return ret;
}
void cta::objectstore::Agent::resetOwnership(const std::set<std::string>& ownershipSet) {
checkPayloadWritable();
m_payload.mutable_ownedobjects()->Clear();
for (const auto &oo: ownershipSet)
*m_payload.mutable_ownedobjects()->Add() = oo;
}
size_t cta::objectstore::Agent::getOwnershipListSize() {
checkPayloadReadable();
return m_payload.ownedobjects_size();
......
......@@ -117,6 +117,10 @@ private:
public:
std::list<std::string> getOwnershipList();
std::set<std::string> getOwnershipSet();
void resetOwnership(const std::set<std::string>& ownershipSet);
size_t getOwnershipListSize();
std::string dump();
......
......@@ -184,19 +184,37 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
double agentFetchTime = t.secs(utils::Timer::resetCounter);
size_t agentOwnershipSizeBefore = ag.getOwnershipListSize();
size_t operationsCount = q->queue.size() + 1;
bool ownershipModification = false;
// First, determine if any action is an ownership modification
if (action->op == AgentOperation::Add || action->op == AgentOperation::Remove)
ownershipModification = true;
if (!ownershipModification) {
for (auto &a: q->queue) {
if (a->op == AgentOperation::Add || a->op == AgentOperation::Remove) {
ownershipModification = true;
break;
}
}
}
std::set<std::string> ownershipSet;
// If necessary, we will dump the ownership list into a set, manipulate it in memory,
// and then recreate it.
if (ownershipModification) ownershipSet = ag.getOwnershipSet();
// First we apply our own modification
appyAction(*action, ag, lc);
appyAction(*action, ag, ownershipSet, lc);
// Then those of other threads
for (auto a: q->queue) {
threading::MutexLocker ml(a->mutex);
appyAction(*a, ag, lc);
appyAction(*a, ag, ownershipSet, lc);
}
// Record the new ownership if needed.
if (ownershipModification) ag.resetOwnership(ownershipSet);
size_t agentOwnershipSizeAfter = ag.getOwnershipListSize();
double agentUpdateTime = t.secs(utils::Timer::resetCounter);
// and commit
ag.commit();
double agentCommitTime = t.secs(utils::Timer::resetCounter);
{
if (ownershipModification) {
log::ScopedParamContainer params(lc);
params.add("agentOwnershipSizeBefore", agentOwnershipSizeBefore)
.add("agentOwnershipSizeAfter", agentOwnershipSizeAfter)
......@@ -233,11 +251,12 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
}
}
void AgentReference::appyAction(Action& action, objectstore::Agent& agent, log::LogContext &lc) {
void AgentReference::appyAction(Action& action, objectstore::Agent& agent,
std::set<std::string> & ownershipSet, log::LogContext &lc) {
switch (action.op) {
case AgentOperation::Add:
{
agent.addToOwnership(action.objectAddress);
ownershipSet.insert(action.objectAddress);
log::ScopedParamContainer params(lc);
params.add("ownedObject", action.objectAddress);
lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership.");
......@@ -245,7 +264,7 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent, log::
}
case AgentOperation::Remove:
{
agent.removeFromOwnership(action.objectAddress);
ownershipSet.erase(action.objectAddress);
log::ScopedParamContainer params(lc);
params.add("ownedObject", action.objectAddress);
lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership.");
......
......@@ -28,6 +28,7 @@
#include <string>
#include <future>
#include <list>
#include <set>
namespace cta { namespace objectstore {
......@@ -142,10 +143,12 @@ private:
/**
* Helper function applying the action to the already fetched agent.
* Ownership operations are done on the pre-extracted ownershipSet.
* @param action
* @param agent
*/
void appyAction(Action &action, objectstore::Agent & agent, log::LogContext &lc);
void appyAction(Action& action, objectstore::Agent& agent,
std::set<std::string> & ownershipSet, log::LogContext &lc);
/**
* The global function actually doing the job: creates a queue if needed, add
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment