Commit bc169107 authored by Eric Cano's avatar Eric Cano
Browse files

Improved implementation of agent.

Added support and unit test for object creating calls in RootEntry (DriveRegister, TapePool, AgentRegister)
parent 20b5b67d
......@@ -88,7 +88,13 @@ void cta::objectstore::Agent::insertAndRegisterSelf() {
arLock.release();
}
void cta::objectstore::Agent::deleteAndUnregisterSelf() {
void cta::objectstore::Agent::removeAndUnregisterSelf() {
// Check that we own the proper lock
checkPayloadWritable();
// Check that we are not empty
if (!isEmpty()) {
throw AgentStillOwnsObjects("In Agent::deleteAndUnregisterSelf: agent still owns objects");
}
// First delete ourselves
remove();
// Then we remove the dangling pointer about ourselves in the agent register.
......@@ -106,6 +112,14 @@ void cta::objectstore::Agent::deleteAndUnregisterSelf() {
arLock.release();
}
bool cta::objectstore::Agent::isEmpty() {
checkPayloadReadable();
if (m_payload.ownedobjects_size())
return false;
return true;
}
/*void cta::objectstore::Agent::create() {
if (!m_setupDone)
throw SetupNotDone("In Agent::create(): setup() not yet done");
......
......@@ -37,6 +37,7 @@ namespace cta { namespace objectstore {
class Agent: public ObjectOps<serializers::Agent> {
public:
CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects);
Agent(Backend & os);
Agent(const std::string & name, Backend & os);
......@@ -49,7 +50,9 @@ public:
void insertAndRegisterSelf();
void deleteAndUnregisterSelf();
void removeAndUnregisterSelf();
bool isEmpty();
/* class ScopedIntent {
public:
......
......@@ -18,6 +18,7 @@ add_library (CTAObjectStore SHARED
AgentRegister.cpp
AgentWatchdog.cpp
TapePool.cpp
DriveRegister.cpp
#AdminUsersList.cpp
BackendVFS.cpp
BackendRados.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DriveRegister.hpp"
#include "ProtcolBuffersAlgorithms.hpp"
cta::objectstore::DriveRegister::DriveRegister(const std::string & address, Backend & os):
ObjectOps<serializers::DriveRegister>(os, address) { }
void cta::objectstore::DriveRegister::initialize() {
// Setup underlying object
ObjectOps<serializers::DriveRegister>::initialize();
m_payloadInterpreted = true;
}
void cta::objectstore::DriveRegister::addDrive(std::string driveName) {
checkPayloadWritable();
// Check that we are not trying to duplicate a drive
try {
serializers::findElement(m_payload.drivenames(), driveName);
throw DuplicateEntry("In DriveRegister::addDrive: entry already exists");
} catch (serializers::NotFound &) {}
m_payload.add_drivenames(driveName);
}
bool cta::objectstore::DriveRegister::isEmpty() {
checkPayloadReadable();
if (m_payload.drivenames_size())
return false;
return true;
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include <list>
namespace cta { namespace objectstore {
class Backend;
class Agent;
class DriveRegister: public ObjectOps<serializers::DriveRegister> {
CTA_GENERATE_EXCEPTION_CLASS(DuplicateEntry);
public:
DriveRegister(const std::string & address, Backend & os);
void initialize();
bool isEmpty();
void addDrive (std::string name);
void removeDrive (const std::string & name);
std::list<std::string> getDrives();
std::string dump();
};
}}
\ No newline at end of file
......@@ -208,7 +208,7 @@ void GarbageCollector::checkHeartbeats() {
}
}
// We now processed all the owned objects. We can delete the agent's entry
agent.deleteAndUnregisterSelf();
agent.removeAndUnregisterSelf();
}
......
......@@ -28,6 +28,7 @@ namespace cta { namespace objectstore {
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(AgentRegister);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Agent);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(TapePool);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(DriveRegister);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(JobPool);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RecallFIFO);
// MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(MigrationFIFO);
......
......@@ -140,22 +140,18 @@ public:
}
void setOwner(const std::string & owner) {
checkHeaderWritable();
m_header.set_owner(owner);
}
std::string getOwner() {
checkHeaderReadable();
return m_header.owner();
}
void setBackupOwner(const std::string & owner) {
checkHeaderWritable();
m_header.set_backupowner(owner);
}
std::string getBackupOwner() {
checkHeaderReadable();
return m_header.backupowner();
}
......
......@@ -20,6 +20,7 @@
#include "AgentRegister.hpp"
#include "Agent.hpp"
#include "TapePool.hpp"
#include "DriveRegister.hpp"
#include <cxxabi.h>
#include "ProtcolBuffersAlgorithms.hpp"
......@@ -36,6 +37,31 @@ void cta::objectstore::RootEntry::initialize() {
m_payloadInterpreted = true;
}
bool cta::objectstore::RootEntry::isEmpty() {
checkPayloadReadable();
if (m_payload.storageclasses_size())
return false;
if (m_payload.tapepoolpointers_size())
return false;
if (m_payload.driveregisterpointer().address().size())
return false;
if (m_payload.agentregisterintent().size())
return false;
if (m_payload.agentregisterpointer().address().size())
return false;
return true;
}
void cta::objectstore::RootEntry::removeIfEmpty() {
checkPayloadWritable();
if (!isEmpty()) {
throw NotEmpty("In RootEntry::removeIfEmpty(): root entry not empty");
}
remove();
}
// =============================================================================
// ================ Admin Hosts manipulations ==================================
// =============================================================================
......@@ -349,17 +375,15 @@ namespace {
}
}
void cta::objectstore::RootEntry::addTapePoolAndCommit(const std::string& tapePool,
const CreationLog& log, Agent& agent) {
checkHeaderWritable();
std::string cta::objectstore::RootEntry::addOrGetTapePoolAndCommit(const std::string& tapePool,
Agent& agent, const CreationLog& log) {
checkPayloadWritable();
// Check the tape pool does not already exist
try {
serializers::findElement(m_payload.tapepoolpointers(), tapePool);
throw DuplicateEntry("In RootEntry::addTapePool: trying to create duplicate entry");
return serializers::findElement(m_payload.tapepoolpointers(), tapePool).address();
} catch (serializers::NotFound &) {}
// Insert the tape pool, then its pointer, with agent intent log update
// First generate the intent
ScopedExclusiveLock al(agent);
// First generate the intent. We expect the agent to be passed locked.
std::string tapePoolAddress = agent.nextId("tapePool");
agent.fetch();
agent.addToOwnership(tapePoolAddress);
......@@ -375,6 +399,7 @@ void cta::objectstore::RootEntry::addTapePoolAndCommit(const std::string& tapePo
auto * tpp = m_payload.mutable_tapepoolpointers()->Add();
tpp->set_address(tapePoolAddress);
tpp->set_name(tapePool);
log.serialize(*tpp->mutable_log());
// We must commit here to ensure the tape pool object is referenced.
commit();
// Now update the tape pool's ownership.
......@@ -384,16 +409,16 @@ void cta::objectstore::RootEntry::addTapePoolAndCommit(const std::string& tapePo
// ... and clean up the agent
agent.removeFromOwnership(tapePoolAddress);
agent.commit();
return tapePoolAddress;
}
void cta::objectstore::RootEntry::removeTapePoolAndCommit(const std::string& tapePool,
Agent& agent) {
void cta::objectstore::RootEntry::removeTapePoolAndCommit(const std::string& tapePool) {
checkPayloadWritable();
// find the address of the tape pool object
try {
auto tpp = serializers::findElement(m_payload.tapepoolpointers(), tapePool);
// Open the tape pool object
TapePool tp (tpp.name(), ObjectOps<serializers::RootEntry>::m_objectStore);
TapePool tp (tpp.address(), ObjectOps<serializers::RootEntry>::m_objectStore);
ScopedExclusiveLock tpl(tp);
tp.fetch();
// Verify this is the tapepool we're looking for.
......@@ -422,8 +447,12 @@ void cta::objectstore::RootEntry::removeTapePoolAndCommit(const std::string& tap
std::string cta::objectstore::RootEntry::getTapePoolAddress(const std::string& tapePool) {
checkPayloadReadable();
try {
auto & tpp = serializers::findElement(m_payload.tapepoolpointers(), tapePool);
return tpp.address();
} catch (serializers::NotFound &) {
throw NotAllocated("In RootEntry::getTapePoolAddress: tape pool not allocated");
}
}
auto cta::objectstore::RootEntry::dumpTapePool() -> std::list<TapePoolDump> {
......@@ -439,6 +468,81 @@ auto cta::objectstore::RootEntry::dumpTapePool() -> std::list<TapePoolDump> {
return ret;
}
// =============================================================================
// ================ Drive register manipulation ================================
// =============================================================================
std::string cta::objectstore::RootEntry::addOrGetDriveRegisterPointerAndCommit(
Agent & agent, const CreationLog & log) {
checkPayloadWritable();
// Check if the drive register exists
try {
return getDriveRegisterAddress();
} catch (NotAllocated &) {
// decide on the object's name and add to agent's intent. We expect the
// agent to be passed locked.
std::string drAddress (agent.nextId("agentRegister"));
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(drAddress);
agent.commit();
// Then create the drive register object
DriveRegister dr(drAddress, m_objectStore);
dr.initialize();
// There is no garbage collection for a drive register: if it is not
// plugged to the root entry, it does not exist.
dr.setOwner("");
dr.setBackupOwner("");
dr.insert();
// Take a lock on drive registry
ScopedExclusiveLock drLock(dr);
// Move drive registry ownership to the root entry
auto * mdrp = m_payload.mutable_driveregisterpointer();
mdrp->set_address(drAddress);
log.serialize(*mdrp->mutable_log());
commit();
// Record completion in drive registry
dr.setOwner(getAddressIfSet());
dr.setBackupOwner(getAddressIfSet());
dr.commit();
//... and clean up the agent
agent.removeFromOwnership(drAddress);
agent.commit();
return drAddress;
}
}
void cta::objectstore::RootEntry::removeDriveRegisterAndCommit() {
checkPayloadWritable();
// Get the address of the drive register (nothing to do if there is none)
if (!m_payload.driveregisterpointer().address().size())
return;
std::string drAddr = m_payload.driveregisterpointer().address();
DriveRegister dr(drAddr, ObjectOps<serializers::RootEntry>::m_objectStore);
ScopedExclusiveLock drl(dr);
dr.fetch();
// Check the drive register is empty
if (!dr.isEmpty()) {
throw DriveRegisterNotEmpty("In RootEntry::removeDriveRegisterAndCommit: "
"trying to remove a non-empty drive register");
}
// we can delete the drive register
dr.remove();
// And update the root entry
m_payload.mutable_driveregisterpointer()->set_address("");
// We commit for safety and symmetry with the add operation
commit();
}
std::string cta::objectstore::RootEntry::getDriveRegisterAddress() {
checkPayloadWritable();
if (m_payload.driveregisterpointer().address().size()) {
return m_payload.driveregisterpointer().address();
}
throw NotAllocated("In RootEntry::getDriveRegisterAddress: drive register not allocated");
}
// =============================================================================
// ================ Agent register manipulation ================================
// =============================================================================
......@@ -497,7 +601,7 @@ std::string cta::objectstore::RootEntry::addOrGetAgentRegisterPointerAndCommit(A
}
}
void cta::objectstore::RootEntry::removeAgentRegister() {
void cta::objectstore::RootEntry::removeAgentRegisterAndCommit() {
checkPayloadWritable();
// Check that we do have an agent register set. Cleanup a potential intent as
// well
......@@ -513,14 +617,21 @@ void cta::objectstore::RootEntry::removeAgentRegister() {
"a non-empty intended agent register. Internal error.");
}
iar.remove();
m_payload.set_agentregisterintent("");
commit();
}
if (m_payload.agentregisterpointer().address().size()) {
AgentRegister ar(m_payload.agentregisterpointer().address(),
ObjectOps<serializers::RootEntry>::m_objectStore);
ScopedExclusiveLock arl(ar);
ar.fetch();
if (!ar.isEmpty()) {
throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: the agent "
"register is not empty. Cannot remove.");
}
ar.remove();
m_payload.mutable_agentregisterpointer()->set_address("");
commit();
}
}
......
......@@ -36,10 +36,17 @@ public:
RootEntry(Backend & os);
CTA_GENERATE_EXCEPTION_CLASS(NotAllocated);
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
// In memory initialiser
void initialize();
// Emptyness checker
bool isEmpty();
// Safe remover
void removeIfEmpty();
// Manipulations of AdminHosts ===============================================
void addAdminHost(const std::string & hostname, const CreationLog & log);
void removeAdminHost(const std::string & hostname);
......@@ -111,11 +118,11 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(WrongTapePool);
/** This function implicitly creates the tape pool structure and updates
* the pointer to it. It needs to implicitly commit the object to the store. */
void addTapePoolAndCommit(const std::string & tapePool, const CreationLog & log,
Agent & agent);
std::string addOrGetTapePoolAndCommit(const std::string & tapePool, Agent & agent,
const CreationLog & log);
/** This function implicitly deletes the tape pool structure.
* Fails if it not empty*/
void removeTapePoolAndCommit(const std::string & tapePool, Agent & agent);
void removeTapePoolAndCommit(const std::string & tapePool);
std::string getTapePoolAddress(const std::string & tapePool);
class TapePoolDump {
public:
......@@ -126,9 +133,10 @@ public:
std::list<TapePoolDump> dumpTapePool();
// Drive register manipulations ==============================================
std::string getDriveRegisterPointer();
std::string addOrGetDriveRegisterPointer(const CreationLog & log, Agent & agent);
std::string removeDriveRegister();
CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty);
std::string getDriveRegisterAddress();
std::string addOrGetDriveRegisterPointerAndCommit(Agent & agent, const CreationLog & log);
void removeDriveRegisterAndCommit();
// Agent register manipulations ==============================================
CTA_GENERATE_EXCEPTION_CLASS(AgentRegisterNotEmpty);
......@@ -139,7 +147,7 @@ public:
* object name generation, but not yet tracking. */
std::string addOrGetAgentRegisterPointerAndCommit(Agent & agent,
const CreationLog & log);
void removeAgentRegister();
void removeAgentRegisterAndCommit();
private:
void addIntendedAgentRegistry(const std::string & address);
......
......@@ -62,7 +62,9 @@ TEST(RootEntry, BasicAccess) {
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.remove();
re.fetch();
re.removeAgentRegisterAndCommit();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
......@@ -127,7 +129,8 @@ TEST(RootEntry, AdminHosts) {
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.remove();
re.fetch();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
......@@ -204,7 +207,8 @@ TEST(RootEntry, AdminUsers) {
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.remove();
re.fetch();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
......@@ -338,10 +342,20 @@ TEST(RootEntry, StorageClassesAndArchivalRoutes) {
//re.dumpTapePool();
});
}
{
// remove the remaining storage classes
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeStorageClass("class1");
re.removeStorageClass("class3");
re.commit();
}
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.remove();
re.fetch();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
......@@ -404,13 +418,124 @@ TEST(RootEntry, Libraries) {
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.remove();
re.fetch();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
TEST (RootEntry, TapePools) {}
TEST (RootEntry, TapePools) {
cta::objectstore::BackendVFS be;
cta::objectstore::CreationLog cl(99, "dummyUser", 99, "dummyGroup",
"unittesthost", time(NULL), "Creation of unit test agent register");
cta::objectstore::Agent ag(be);
ag.initialize();
ag.generateName("UnitTests");
{
// Try to create the root entry and allocate the agent register
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
cta::objectstore::ScopedExclusiveLock rel(re);
re.addOrGetAgentRegisterPointerAndCommit(ag, cl);
}
ag.insertAndRegisterSelf();
cta::objectstore::ScopedExclusiveLock agl(ag);
std::string tpAddr1, tpAddr2;
{
// Create the tape pools
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
ASSERT_THROW(re.getTapePoolAddress("tapePool1"),
cta::objectstore::RootEntry::NotAllocated);
tpAddr1 = re.addOrGetTapePoolAndCommit("tapePool1", ag, cl);
// Check that we car read it
cta::objectstore::TapePool tp(tpAddr1, be);
cta::objectstore::ScopedSharedLock tpl(tp);
ASSERT_NO_THROW(tp.fetch());
}
{
// Create another pool
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
tpAddr2 = re.addOrGetTapePoolAndCommit("tapePool2", ag, cl);
ASSERT_TRUE(be.exists(tpAddr2));
}
{
// Remove the other pool
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeTapePoolAndCommit("tapePool2");
ASSERT_FALSE(be.exists(tpAddr2));
}
// Unregister the agent
ag.removeAndUnregisterSelf();
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeAgentRegisterAndCommit();
re.removeTapePoolAndCommit("tapePool1");
ASSERT_FALSE(be.exists(tpAddr1));
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
TEST (RootEntry, DriveRegister) {}
TEST (RootEntry, DriveRegister) {
cta::objectstore::BackendVFS be;
{
// Try to create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
}
cta::objectstore::CreationLog cl(99, "dummyUser", 99, "dummyGroup",
"unittesthost", time(NULL), "Creation of unit test agent register");
cta::objectstore::Agent ag(be);
ag.initialize();
ag.generateName("UnitTests");
{
// Try to create the root entry and allocate the agent register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
re.addOrGetAgentRegisterPointerAndCommit(ag, cl);
}
ag.insertAndRegisterSelf();
std::string driveRegisterAddress;
{
// create the drive register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
ASSERT_THROW(re.getDriveRegisterAddress(),
cta::objectstore::RootEntry::NotAllocated);