Commit cfe8f58b authored by Eric Cano's avatar Eric Cano
Browse files

cta/CTA#46: Adding garbage collection for object store structures:

Created a garbage collector subprocess for the drive daemon.
Created a AgentHeartbeat thread and added it to frontend, drive subprocess and GC subprocess.
Fixed logs and drive subprocess.
Renamed common/threading/Threading.[hc]pp to Thread.[hc]pp as there is only one class left in this file.
parent eb6ff4ed
......@@ -117,7 +117,7 @@ set (COMMON_LIB_SRC_FILES
threading/Mutex.cpp
threading/SocketPair.cpp
threading/System.cpp
threading/Threading.cpp
threading/Thread.cpp
threading/Semaphores.cpp
threading/SubProcess.cpp
utils/GetOptThreadSafe.cpp
......
......@@ -20,7 +20,7 @@
#pragma once
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
namespace cta {
namespace threading {
......
......@@ -21,7 +21,7 @@
* @author Castor Dev team, castor-dev@cern.ch
*****************************************************************************/
#include <gtest/gtest.h>
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/AtomicCounter.hpp"
namespace unitTests {
struct ThreadPlus : public cta::threading::Thread{
......
......@@ -20,7 +20,7 @@
#pragma once
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
namespace cta {
namespace threading {
......
......@@ -22,7 +22,7 @@
#include <exception>
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/Semaphores.hpp"
namespace cta {
......
......@@ -18,7 +18,7 @@
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Semaphores.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/exception/Errnum.hpp"
#include "common/exception/Exception.hpp"
#include <errno.h>
......
......@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Threading.hpp"
#include "Thread.hpp"
#include <errno.h>
#include <typeinfo>
#include <stdlib.h>
......
......@@ -17,7 +17,7 @@
*/
#include <gtest/gtest.h>
#include "Threading.hpp"
#include "Thread.hpp"
#include "ChildProcess.hpp"
/* This is a collection of multi process unit tests, which can (and should)
......
......@@ -17,7 +17,7 @@
*/
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/Semaphores.hpp"
#include <gtest/gtest.h>
......
......@@ -17,7 +17,7 @@
*/
#include "common/threading/MutexLocker.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/ChildProcess.hpp"
#include "common/threading/Semaphores.hpp"
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "AgentHeartbeatThread.hpp"
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// AgentHeartbeatThread::stopAndWaitThread
//------------------------------------------------------------------------------
void AgentHeartbeatThread::stopAndWaitThread() {
m_exit.set_value();
wait();
}
//------------------------------------------------------------------------------
// AgentHeartbeatThread::stopAndWaitThread
//------------------------------------------------------------------------------
void AgentHeartbeatThread::run() {
while (std::future_status::ready != m_exit.get_future().wait_for(m_heartRate)) {
m_agentReference.bumpHeatbeat(m_backend);
}
}
}} // namespace cta::objectstore
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "AgentReference.hpp"
#include "common/threading/Thread.hpp"
#include <future>
#include <chrono>
namespace cta { namespace objectstore {
/**
* Thread handler managing a heartbeat updated for the agent object representing
* the process. Just needs to be started and stopped.
*/
class AgentHeartbeatThread: private cta::threading::Thread {
public:
/**
* Constructor
* @param agentReference reference to the agent object.
*/
AgentHeartbeatThread(AgentReference & agentReference, Backend & backend):
m_backend(backend) , m_agentReference(agentReference) {}
/**
* Start the thread
*/
void startThread() { start(); }
/**
* Stop and wait for the thread (graceful shutdown)
*/
void stopAndWaitThread();
private:
/// Reference to the object store backend.
Backend & m_backend;
/// Reference to the agent
AgentReference & m_agentReference;
/// The thread's run function
void run() override;
/// A promise used for graceful exit.
std::promise<void> m_exit;
/// The heartbeat update rate.
std::chrono::seconds const m_heartRate = std::chrono::seconds(30);
};
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -79,6 +79,12 @@ void AgentReference::removeFromOwnership(const std::string& objectAddress, objec
queueAndExecuteAction(a, backend);
}
void AgentReference::bumpHeatbeat(objectstore::Backend& backend) {
Action a{AgentOperation::Heartbeat, "", std::promise<void>()};
queueAndExecuteAction(a, backend);
}
void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend& backend) {
// First, we need to determine if a queue exists or not.
// If so, we just use it, and if not, we create and serve it.
......@@ -161,6 +167,8 @@ void AgentReference::appyAction(Action& action, objectstore::Agent& agent) {
case AgentOperation::Remove:
agent.removeFromOwnership(action.objectAddress);
break;
case AgentOperation::Heartbeat:
agent.bumpHeartbeat();
default:
throw cta::exception::Exception("In AgentReference::appyAction(): unknown operation.");
}
......
......@@ -72,6 +72,12 @@ public:
* @param objectAddress
*/
void removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend);
/**
* Bumps up the heart beat of the agent. This action is queued in memory like the
* additions and removals from ownership.
*/
void bumpHeatbeat(objectstore::Backend& backend);
/**
* Gets the address of the Agent object generated on construction.
......@@ -87,7 +93,8 @@ private:
*/
enum class AgentOperation: char {
Add,
Remove
Remove,
Heartbeat
};
/**
......
......@@ -51,6 +51,7 @@ add_library (ctaobjectstore SHARED
${CTAProtoSources}
RootEntry.cpp
Agent.cpp
AgentHeartbeatThread.cpp
AgentReference.cpp
AgentRegister.cpp
AgentWatchdog.cpp
......
......@@ -39,7 +39,7 @@
#include "common/log/StringLogger.hpp"
#include "common/make_unique.hpp"
#include "common/processCap/ProcessCapDummy.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/utils/utils.hpp"
#include "mediachanger/MediaChangerFacade.hpp"
//#include "smc_struct.h"
......
......@@ -28,7 +28,7 @@
#include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
#include "castor/tape/tapeserver/file/RadosStriperPool.hpp"
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/AtomicCounter.hpp"
#include "common/log/LogContext.hpp"
#include "common/Timer.hpp"
......
......@@ -24,7 +24,7 @@
#pragma once
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/threading/AtomicCounter.hpp"
#include "common/log/LogContext.hpp"
#include "castor/tape/tapeserver/utils/suppressUnusedVariable.hpp"
......
......@@ -24,7 +24,7 @@
#pragma once
#include "common/threading/BlockingQueue.hpp"
#include "common/threading/Threading.hpp"
#include "common/threading/Thread.hpp"
#include "common/log/LogContext.hpp"
namespace castor {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment