Skip to content
Snippets Groups Projects
Commit 5379e60a authored by Eric Cano's avatar Eric Cano
Browse files

Extended shared access to Agent ownership to all cases.

The older styled add/remove by bulk are now sharing the access to object with heartbeat.
They should be more efficient. They are used for popping from queue and reporting jobs are done,
in the tape server.
parent 500645a1
No related branches found
No related tags found
No related merge requests found
......@@ -74,49 +74,27 @@ std::string AgentReference::nextId(const std::string& childType) {
}
void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress));
std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress, std::list<std::string>()));
queueAndExecuteAction(a, backend);
}
void AgentReference::addBatchToOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) {
objectstore::Agent ag(m_agentAddress, backend);
log::LogContext lc(m_logger);
log::ScopedParamContainer params(lc);
params.add("agentObject", m_agentAddress);
objectstore::ScopedExclusiveLock agl(ag);
ag.fetch();
for (const auto & oa: objectAdresses) {
ag.addToOwnership(oa);
log::ScopedParamContainer params(lc);
params.add("ownedObject", oa);
lc.log(log::DEBUG, "In AgentReference::addBatchToOwnership(): added object to ownership.");
}
ag.commit();
std::shared_ptr<Action> a (new Action(AgentOperation::AddBatch, "", objectAdresses));
queueAndExecuteAction(a, backend);
}
void AgentReference::removeFromOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress));
std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress, std::list<std::string>()));
queueAndExecuteAction(a, backend);
}
void AgentReference::removeBatchFromOwnership(const std::list<std::string>& objectAdresses, objectstore::Backend& backend) {
objectstore::Agent ag(m_agentAddress, backend);
log::LogContext lc(m_logger);
log::ScopedParamContainer params(lc);
params.add("agentObject", m_agentAddress);
objectstore::ScopedExclusiveLock agl(ag);
ag.fetch();
for (const auto & oa: objectAdresses) {
ag.removeFromOwnership(oa);
log::ScopedParamContainer params(lc);
params.add("ownedObject", oa);
lc.log(log::DEBUG, "In AgentReference::removeBatchFromOwnership(): removed object from ownership.");
}
ag.commit();
std::shared_ptr<Action> a (new Action(AgentOperation::RemoveBatch, "", objectAdresses));
queueAndExecuteAction(a, backend);
}
void AgentReference::bumpHeatbeat(objectstore::Backend& backend) {
std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, ""));
std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, "", std::list<std::string>()));
queueAndExecuteAction(a, backend);
}
......@@ -262,6 +240,17 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent,
lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership.");
break;
}
case AgentOperation::AddBatch:
{
for (const auto & oa: action.objectAddressSet) {
ownershipSet.insert(oa);
log::ScopedParamContainer params(lc);
params.add("ownedObject", oa);
lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership (by batch).");
}
break;
}
case AgentOperation::Remove:
{
ownershipSet.erase(action.objectAddress);
......@@ -270,6 +259,16 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent,
lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership.");
break;
}
case AgentOperation::RemoveBatch:
{
for (const auto & oa: action.objectAddressSet) {
ownershipSet.erase(oa);
log::ScopedParamContainer params(lc);
params.add("ownedObject", oa);
lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership (by batch).");
}
break;
}
case AgentOperation::Heartbeat:
agent.bumpHeartbeat();
break;
......
......@@ -110,6 +110,8 @@ private:
enum class AgentOperation: char {
Add,
Remove,
AddBatch,
RemoveBatch,
Heartbeat
};
......@@ -117,9 +119,11 @@ private:
* An operation with its parameter and promise
*/
struct Action {
Action(AgentOperation op, const std::string & objectAddress): op(op), objectAddress(objectAddress) {}
Action(AgentOperation op, const std::string & objectAddress, const std::list<std::string> & objectAddressSet):
op(op), objectAddress(objectAddress), objectAddressSet(objectAddressSet) {}
AgentOperation op;
const std::string & objectAddress;
const std::list<std::string> & objectAddressSet;
std::promise<void> promise;
/***
* A mutex ensuring the object will not be released before the promise's result
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment