Commit 8c2cf546 authored by Eric Cano's avatar Eric Cano
Browse files

Added data integrity mechanism: garbage collection signaling.

The garbage collector now sets a flag on the agent object indicating garbage collection.
The process will immediately exit when detecting garbage collection.
parent dbffbebe
......@@ -44,6 +44,7 @@ void cta::objectstore::Agent::initialize() {
m_payload.set_heartbeat(0);
m_payload.set_timeout_us(60*1000*1000);
m_payload.set_description("");
m_payload.set_being_garbage_collected(false);
m_payloadInterpreted = true;
}
......@@ -138,6 +139,16 @@ bool cta::objectstore::Agent::isEmpty() {
return true;
}
bool cta::objectstore::Agent::isBeingGarbageCollected() {
checkPayloadReadable();
return m_payload.being_garbage_collected();
}
void cta::objectstore::Agent::setBeingGarbageCollected() {
checkPayloadWritable();
m_payload.set_being_garbage_collected(true);
}
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
......
......@@ -58,6 +58,10 @@ public:
bool isEmpty();
void setBeingGarbageCollected();
bool isBeingGarbageCollected();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
......
......@@ -159,6 +159,12 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
objectstore::ScopedExclusiveLock agl(ag);
double agentLockTime = t.secs(utils::Timer::resetCounter);
ag.fetch();
if (ag.isBeingGarbageCollected()) {
log::ScopedParamContainer params(lc);
params.add("agentObject", ag.getAddressIfSet());
lc.log(log::CRIT, "In AgentReference::queueAndExecuteAction(): agent object being garbage collected. Exiting.");
::exit(EXIT_FAILURE);
}
double agentFetchTime = t.secs(utils::Timer::resetCounter);
size_t agentOwnershipSizeBefore = ag.getOwnershipListSize();
size_t operationsCount = q->queue.size() + 1;
......
......@@ -172,9 +172,10 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, std::list<l
// The agent will be removed from our ownership by the calling function: we're done.
return;
}
// Aquire ownership of the agent.
// Aquire ownership of the agent. Prevent further updates to it.
m_ourAgentReference.addToOwnership(address,m_objectStore);
agent.setOwner(m_ourAgentReference.getAgentAddress());
agent.setBeingGarbageCollected();
agent.commit();
// Update the register
ScopedExclusiveLock arl(m_agentRegister);
......
......@@ -155,6 +155,7 @@ message Agent {
required uint64 heartbeat = 2001;
required uint64 timeout_us = 2002;
repeated string ownedobjects = 2003;
optional bool being_garbage_collected = 2004 [default = false];
}
message AgentRegister {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment