Commit e446e939 authored by Eric Cano's avatar Eric Cano
Browse files

cta/CTA#46: Garbage collection improvements

Implemented Agent::garbageCollect()
Made AgentWatchdog fetch the timout from the agent object.
Fixed unconditional time tounter reset in AgentWatchdog::checkAlive()
Fixed missing log for garbageCollection.
Added utility (cta-objectstore-unfollow-agent) to mark agents as not followed when they get orphaned. This utility detects looping ownerships (two garbage coolectors watch each other and both die synchronously). This utility is not packaged.
Various log improvements.
parent 62f9bcb5
......@@ -108,7 +108,26 @@ bool cta::objectstore::Agent::isEmpty() {
}
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner) {
throw cta::exception::Exception("In Agent::garbageCollect(): not implemented.");
checkPayloadWritable();
// We are here limited to checking the presumed owner and mark the agent as
// untracked in the agent register in case of match, else we do nothing
if (m_header.owner() == presumedOwner) {
// We need to get hold of the agent register, which we suppose is available
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
AgentRegister ar(re.getAgentRegisterAddress(), m_objectStore);
reLock.release();
// Then we should first create a pointer to our agent
ScopedExclusiveLock arLock(ar);
ar.fetch();
ar.untrackAgent(getAddressIfSet());
ar.commit();
arLock.release();
// We now mark ourselves as owned by the agent register
setOwner(ar.getAddressIfSet());
commit();
}
}
......@@ -162,7 +181,8 @@ std::list<std::string>
void cta::objectstore::Agent::bumpHeartbeat() {
checkPayloadWritable();
m_payload.set_heartbeat(m_payload.heartbeat()+1);
auto heartbeat=m_payload.heartbeat()+1;
m_payload.set_heartbeat(heartbeat);
}
uint64_t cta::objectstore::Agent::getHeartbeatCount() {
......
......@@ -25,14 +25,21 @@ namespace cta { namespace objectstore {
class AgentWatchdog {
public:
AgentWatchdog(const std::string & name, Backend & os): m_agent(name, os),
m_heartbeatCounter(readHeartbeat()), m_timeout(5.0) {}
m_heartbeatCounter(readHeartbeat()) {
ScopedSharedLock lock(m_agent);
m_agent.fetch();
m_timeout = m_agent.getTimeout();
}
bool checkAlive() {
uint64_t newHeartBeatCount = readHeartbeat();
if (newHeartBeatCount == m_heartbeatCounter && m_timer.secs() > m_timeout)
auto timer = m_timer.secs();
if (newHeartBeatCount == m_heartbeatCounter && timer > m_timeout)
return false;
m_heartbeatCounter = newHeartBeatCount;
m_timer.reset();
if (newHeartBeatCount != m_heartbeatCounter) {
m_heartbeatCounter = newHeartBeatCount;
m_timer.reset();
}
return true;
}
......
......@@ -97,5 +97,9 @@ add_executable(cta-objectstore-dump-object cta-objectstore-dump-object.cpp)
target_link_libraries(cta-objectstore-dump-object
protobuf ctaobjectstore ctacommon)
add_executable(cta-objectstore-unfollow-agent cta-objectstore-unfollow-agent.cpp)
target_link_libraries(cta-objectstore-unfollow-agent
protobuf ctaobjectstore ctacommon)
install(TARGETS cta-objectstore-initialize cta-objectstore-list cta-objectstore-dump-object
DESTINATION usr/bin)
......@@ -163,6 +163,7 @@ void GarbageCollector::checkHeartbeats(log::LogContext & lc) {
// The agent is removed from our ownership by the calling function: we're done.
return;
}
lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): will cleanup dead agent.");
// Return all objects owned by the agent to their respective backup owners
auto ownedObjects = agent.getOwnershipList();
for (auto obj = ownedObjects.begin(); obj!= ownedObjects.end(); obj++) {
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* This program will create a VFS backend for the object store and populate
* it with the minimum elements (the root entry). The program will then print out
* the path the backend store and exit
*/
#include "BackendFactory.hpp"
#include "BackendVFS.hpp"
#include "Agent.hpp"
#include "RootEntry.hpp"
#include "AgentRegister.hpp"
#include <iostream>
#include <stdexcept>
int main(int argc, char ** argv) {
try {
std::unique_ptr<cta::objectstore::Backend> be;
if (3 == argc) {
be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release());
} else {
throw std::runtime_error("Wrong number of arguments: expected 2");
}
// If the backend is a VFS, make sure we don't delete it on exit.
// If not, nevermind.
try {
dynamic_cast<cta::objectstore::BackendVFS &>(*be).noDeleteOnExit();
} catch (std::bad_cast &){}
std::cout /* << "Object store path: " << be->getParams()->toURL()
<< " agent */<< "name=" << argv[2] << std::endl;
cta::objectstore::Agent ag(argv[2], *be);
cta::objectstore::ScopedExclusiveLock agl(ag);
ag.fetch();
// Add the agent to the list of untracked agents
cta::objectstore::RootEntry re (*be);
cta::objectstore::ScopedSharedLock rel(re);
re.fetch();
cta::objectstore::AgentRegister ar(re.getAgentRegisterAddress(), *be);
rel.release();
// Check that the agent is indeed orphaned (owned by a non-existing owner).
// Also check that he is not an active member of a cyclic ownership. To check we follow the
// ownership chain for a limited depth. If the chain comes back to this agent, we count it
// as a active cycler.
size_t depth = 50;
std::string currentOwner = ag.getOwner();
while (depth) {
// Simple orphan or owned by register: not a cycler.
if ((ar.getAddressIfSet() == currentOwner) || !be->exists(ag.getOwner())) break;
// Move to the next owner:
depth--;
cta::objectstore::Agent ag2(currentOwner, *be);
cta::objectstore::ScopedSharedLock ag2l(ag2);
ag2.fetch();
currentOwner = ag2.getOwner();
if (currentOwner == ag.getAddressIfSet()) {
std::cout << "This agent is a cycler." << std::endl;
goto cycler;
}
}
if ((ar.getAddressIfSet() != ag.getOwner()) && be->exists(ag.getOwner())) {
std::stringstream err;
err << "Agent not orphaned: owner object exists: " << ag.getOwner();
throw std::runtime_error(err.str().c_str());
}
cycler:
cta::objectstore::ScopedExclusiveLock arl(ar);
ar.fetch();
ar.untrackAgent(ag.getAddressIfSet());
ar.commit();
arl.release();
ag.setOwner(ar.getAddressIfSet());
ag.commit();
agl.release();
std::cout << "Agent is now listed as untracked." << std::endl;
} catch (std::exception & e) {
std::cerr << "Failed to untrack object: "
<< std::endl << e.what() << std::endl;
}
}
\ No newline at end of file
......@@ -240,9 +240,6 @@ SubprocessHandler::ProcessingStatus GarbageCollectorHandler::processTimeout() {
// GarbageCollectorHandler::runChild
//------------------------------------------------------------------------------
int GarbageCollectorHandler::runChild() {
// We will need a log object
cta::log::DummyLogger dl("unitTest");
cta::log::LogContext lc(dl);
// We are in the child process. It is time to open connections to the catalogue
// and object store, and run the garbage collector.
// We do not have to care for previous crashed sessions as we will garbage
......@@ -304,9 +301,9 @@ int GarbageCollectorHandler::runChild() {
pollList["0"]=m_socketPair.get();
bool receivedMessage=false;
do {
m_processManager.logContext().log(log::INFO,
m_processManager.logContext().log(log::DEBUG,
"In GarbageCollectorHandler::runChild(): About to run a GC pass.");
gc.runOnePass(lc);
gc.runOnePass(m_processManager.logContext());
try {
server::SocketPair::poll(pollList, s_pollInterval, server::SocketPair::Side::parent);
receivedMessage=true;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment