From b9e4973c6831a00c1472db0d72b2be3e70c842e8 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Mon, 13 May 2019 17:20:27 +0200 Subject: [PATCH] Added data integrity mechanism: garbage collection signaling. The garbage collector now sets a flag on the agent object indicating garbage collection. The process will immediately exit when detecting garbage collection. --- objectstore/Agent.cpp | 11 +++++++++++ objectstore/Agent.hpp | 4 ++++ objectstore/AgentReference.cpp | 6 ++++++ objectstore/GarbageCollector.cpp | 3 ++- objectstore/cta.proto | 1 + 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index a103b63dc2..cb68dcb0e3 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -44,6 +44,7 @@ void cta::objectstore::Agent::initialize() { m_payload.set_heartbeat(0); m_payload.set_timeout_us(60*1000*1000); m_payload.set_description(""); + m_payload.set_being_garbage_collected(false); m_payloadInterpreted = true; } @@ -138,6 +139,16 @@ bool cta::objectstore::Agent::isEmpty() { return true; } +bool cta::objectstore::Agent::isBeingGarbageCollected() { + checkPayloadReadable(); + return m_payload.being_garbage_collected(); +} + +void cta::objectstore::Agent::setBeingGarbageCollected() { + checkPayloadWritable(); + m_payload.set_being_garbage_collected(true); +} + void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index e3263e1f69..da25916118 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -58,6 +58,10 @@ public: bool isEmpty(); + void setBeingGarbageCollected(); + + bool isBeingGarbageCollected(); + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; diff --git a/objectstore/AgentReference.cpp b/objectstore/AgentReference.cpp index 761328ea59..ea719ed9e1 100644 --- a/objectstore/AgentReference.cpp +++ b/objectstore/AgentReference.cpp @@ -159,6 +159,12 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec objectstore::ScopedExclusiveLock agl(ag); double agentLockTime = t.secs(utils::Timer::resetCounter); ag.fetch(); + if (ag.isBeingGarbageCollected()) { + log::ScopedParamContainer params(lc); + params.add("agentObject", ag.getAddressIfSet()); + lc.log(log::CRIT, "In AgentReference::queueAndExecuteAction(): agent object being garbage collected. Exiting."); + ::exit(EXIT_FAILURE); + } double agentFetchTime = t.secs(utils::Timer::resetCounter); size_t agentOwnershipSizeBefore = ag.getOwnershipListSize(); size_t operationsCount = q->queue.size() + 1; diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 1d49ac4a9e..279a223450 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -172,9 +172,10 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, std::list<l // The agent will be removed from our ownership by the calling function: we're done. return; } - // Aquire ownership of the agent. + // Aquire ownership of the agent. Prevent further updates to it. m_ourAgentReference.addToOwnership(address,m_objectStore); agent.setOwner(m_ourAgentReference.getAgentAddress()); + agent.setBeingGarbageCollected(); agent.commit(); // Update the register ScopedExclusiveLock arl(m_agentRegister); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 1e4d465f5a..e08ac8568a 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -155,6 +155,7 @@ message Agent { required uint64 heartbeat = 2001; required uint64 timeout_us = 2002; repeated string ownedobjects = 2003; + optional bool being_garbage_collected = 2004 [default = false]; } message AgentRegister { -- GitLab