Skip to content
Snippets Groups Projects
Commit 0905874c authored by Eric Cano's avatar Eric Cano
Browse files

Added the SchdulerGlobalLock object and pluged it in the RootEntry.

parent 4a8d7f39
Branches
Tags
No related merge requests found
......@@ -27,7 +27,8 @@ add_library (CTAObjectStore
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
GenericObject.cpp
GarbageCollector.cpp)
GarbageCollector.cpp
SchedulerGlobalLock.cpp)
set(ObjectStoreUnitTests
BackendTest.cpp
......
......@@ -45,8 +45,9 @@ void cta::objectstore::DriveRegister::garbageCollect(const std::string &presumed
if (presumedOwner != m_header.owner())
return;
// If the owner is still the agent, we have 2 possibilities:
// 1) The register is referenced by the root entry. We just need to officialize
// the ownership on the reguster
// 1) The register is referenced by the root entry. We just need to officialise
// the ownership on the register (if not, we will either get the NotAllocated
// exception) or the address will be different.
{
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
......
......@@ -33,6 +33,7 @@ namespace cta { namespace objectstore {
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Tape);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(ArchiveToFileRequest);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RetrieveToFileRequest);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(SchedulerGlobalLock);
#undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID
}}
\ No newline at end of file
......@@ -21,6 +21,7 @@
#include "Agent.hpp"
#include "TapePool.hpp"
#include "DriveRegister.hpp"
#include "SchedulerGlobalLock.hpp"
#include <cxxabi.h>
#include "ProtocolBuffersAlgorithms.hpp"
......@@ -49,6 +50,8 @@ bool cta::objectstore::RootEntry::isEmpty() {
return false;
if (m_payload.agentregisterpointer().address().size())
return false;
if (m_payload.schedulerlockpointer().address().size())
return false;
return true;
}
......@@ -749,6 +752,80 @@ void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& ad
m_payload.set_agentregisterintent(address);
}
// =============================================================================
// ================ Scheduler global lock manipulation =========================
// =============================================================================
std::string cta::objectstore::RootEntry::getSchedulerGlobalLock() {
checkPayloadReadable();
// If the scheduler lock is defined, return it, job done.
if (m_payload.schedulerlockpointer().address().size())
return m_payload.schedulerlockpointer().address();
throw NotAllocated("In RootEntry::getAgentRegister: scheduler global lock not yet allocated");
}
// Get the name of a (possibly freshly created) scheduler global lock
std::string cta::objectstore::RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent,
const CreationLog & log) {
checkPayloadWritable();
// Check if the drive register exists
try {
return getSchedulerGlobalLock();
} catch (NotAllocated &) {
// decide on the object's name and add to agent's intent. We expect the
// agent to be passed locked.
std::string sglAddress (agent.nextId("schedulerGlobalLock"));
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(sglAddress);
agent.commit();
// Then create the drive register object
SchedulerGlobalLock sgl(sglAddress, m_objectStore);
sgl.initialize();
sgl.setOwner(agent.getAddressIfSet());
sgl.setBackupOwner(getAddressIfSet());
sgl.insert();
// Take a lock on scheduler global lock
ScopedExclusiveLock sglLock(sgl);
// Move drive registry ownership to the root entry
auto * msgl = m_payload.mutable_schedulerlockpointer();
msgl->set_address(sglAddress);
log.serialize(*msgl->mutable_log());
commit();
// Record completion in scheduler global lock
sgl.setOwner(getAddressIfSet());
sgl.setBackupOwner(getAddressIfSet());
sgl.commit();
//... and clean up the agent
agent.removeFromOwnership(sglAddress);
agent.commit();
return sglAddress;
}
}
void cta::objectstore::RootEntry::removeSchedulerGlobalLockAndCommit() {
checkPayloadWritable();
// Get the address of the scheduler lock (nothing to do if there is none)
if (!m_payload.schedulerlockpointer().address().size())
return;
std::string sglAddress = m_payload.schedulerlockpointer().address();
SchedulerGlobalLock sgl(sglAddress, ObjectOps<serializers::RootEntry>::m_objectStore);
ScopedExclusiveLock sgll(sgl);
sgl.fetch();
// Check the drive register is empty
if (!sgl.isEmpty()) {
throw DriveRegisterNotEmpty("In RootEntry::removeSchedulerGlobalLockAndCommit: "
"trying to remove a non-empty scheduler global lock");
}
// we can delete the drive register
sgl.remove();
// And update the root entry
m_payload.mutable_schedulerlockpointer()->set_address("");
// We commit for safety and symmetry with the add operation
commit();
}
// Dump the root entry
std::string cta::objectstore::RootEntry::dump () {
checkPayloadReadable();
......
......@@ -168,6 +168,11 @@ public:
const CreationLog & log);
void removeAgentRegisterAndCommit();
// Agent register manipulations ==============================================
std::string getSchedulerGlobalLock();
std::string addOrGetSchedulerGlobalLockAndCommit(Agent & agent, const CreationLog & log);
void removeSchedulerGlobalLockAndCommit();
private:
void addIntendedAgentRegistry(const std::string & address);
......
......@@ -587,4 +587,58 @@ TEST(ObjectStore, RootEntryAgentRegister) {
ASSERT_EQ(false, re.exists());
}
TEST (ObjectStore, RootEntrySchedulerGlobalLock) {
cta::objectstore::BackendVFS be;
{
// Try to create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
}
cta::objectstore::CreationLog cl(cta::UserIdentity(99, 99),
"unittesthost", time(NULL), "Creation of unit test scheduler global lock");
cta::objectstore::Agent ag(be);
ag.initialize();
ag.generateName("UnitTests");
{
// Try to create the root entry and allocate the agent register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
re.addOrGetAgentRegisterPointerAndCommit(ag, cl);
}
ag.insertAndRegisterSelf();
std::string schedulerGlobalLockAddress;
{
// create the drive register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
ASSERT_THROW(re.getDriveRegisterAddress(),
cta::objectstore::RootEntry::NotAllocated);
/*ASSERT_NO_THROW*/(
schedulerGlobalLockAddress = re.addOrGetSchedulerGlobalLockAndCommit(ag, cl));
ASSERT_TRUE(be.exists(schedulerGlobalLockAddress));
}
{
// delete the drive register
// create the drive register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
re.removeSchedulerGlobalLockAndCommit();
ASSERT_FALSE(be.exists(schedulerGlobalLockAddress));
}
// Unregister the agent
cta::objectstore::ScopedExclusiveLock agl(ag);
ag.removeAndUnregisterSelf();
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeAgentRegisterAndCommit();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SchedulerGlobalLock.hpp"
#include "GenericObject.hpp"
#include "RootEntry.hpp"
namespace cta { namespace objectstore {
SchedulerGlobalLock::SchedulerGlobalLock(const std::string& address, Backend& os):
ObjectOps<serializers::SchedulerGlobalLock>(os, address) { }
SchedulerGlobalLock::SchedulerGlobalLock(GenericObject& go):
ObjectOps<serializers::SchedulerGlobalLock>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
void SchedulerGlobalLock::initialize() {
// Setup underlying object
ObjectOps<serializers::SchedulerGlobalLock>::initialize();
// Setup the object
m_payload.set_nextmountid(1);
m_payloadInterpreted = 1;
}
void SchedulerGlobalLock::garbageCollect(const std::string &presumedOwner) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the drive register creation failed. We have nothing to do.
if (presumedOwner != m_header.owner())
return;
// If the owner is still the agent, we have 2 possibilities:
// 1) The register is referenced by the root entry. We just need to officialise
// the ownership on the scheduler lock(if not, we will either get the NotAllocated
// exception) or the address will be different.
{
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
try {
if (re.getSchedulerGlobalLock() == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
commit();
return;
}
} catch (RootEntry::NotAllocated &) {}
}
// 2) The tape pool is not referenced in the root entry. We can just clean it up.
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error"));
}
remove();
}
bool SchedulerGlobalLock::isEmpty() {
checkPayloadReadable();
// There is no content in the global lock.
return true;
}
}}
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class Backend;
class Agent;
class GenericObject;
class SchedulerGlobalLock: public ObjectOps<serializers::SchedulerGlobalLock> {
public:
SchedulerGlobalLock(const std::string & address, Backend & os);
SchedulerGlobalLock(GenericObject & go);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner);
bool isEmpty();
// Mount id management =======================================================
void setNextMountId(uint64_t nextId);
uint64_t getIncreaseCommitMountId();
};
}}
......@@ -11,6 +11,7 @@ enum ObjectType {
Tape_t = 5;
ArchiveToFileRequest_t = 6;
RetrieveToFileRequest_t = 7;
SchedulerGlobalLock_t = 8;
GenericObject_t = 1000;
}
......@@ -107,6 +108,12 @@ message AgentRegisterPointer {
required CreationLog log = 101;
}
// Pointer to the scheduler global lock
message SchedulerGlobalLockPointer {
required string address = 110;
required CreationLog log = 111;
}
// The root entry. This entry contains all the most static information, i.e.
// the admin handled configuration information
message RootEntry {
......@@ -118,6 +125,7 @@ message RootEntry {
optional DriveRegisterPointer driveregisterpointer = 1005;
optional AgentRegisterPointer agentregisterpointer = 1006;
optional string agentregisterintent = 1007;
optional SchedulerGlobalLockPointer schedulerlockpointer = 1008;
}
//=========== Sub-objects ======================================================
......@@ -338,5 +346,10 @@ message DriveRegister {
repeated DriveInfo drives = 7000;
}
// ------------- Scheduler global lock handling -------------------------------
message SchedulerGlobalLock {
required uint64 nextmountid = 8000;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment