Commit adc63355 authored by Eric Cano's avatar Eric Cano
Browse files

Added the SchdulerGlobalLock object and pluged it in the RootEntry.

parent 4cfa378a
......@@ -27,7 +27,8 @@ add_library (CTAObjectStore
ObjectOps.cpp
ProtocolBuffersAlgorithms.cpp
GenericObject.cpp
GarbageCollector.cpp)
GarbageCollector.cpp
SchedulerGlobalLock.cpp)
set(ObjectStoreUnitTests
BackendTest.cpp
......
......@@ -45,8 +45,9 @@ void cta::objectstore::DriveRegister::garbageCollect(const std::string &presumed
if (presumedOwner != m_header.owner())
return;
// If the owner is still the agent, we have 2 possibilities:
// 1) The register is referenced by the root entry. We just need to officialize
// the ownership on the reguster
// 1) The register is referenced by the root entry. We just need to officialise
// the ownership on the register (if not, we will either get the NotAllocated
// exception) or the address will be different.
{
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
......
......@@ -33,6 +33,7 @@ namespace cta { namespace objectstore {
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(Tape);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(ArchiveToFileRequest);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(RetrieveToFileRequest);
MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID(SchedulerGlobalLock);
#undef MAKE_CTA_OBJECTSTORE_OBJECTOPS_TYPEID
}}
\ No newline at end of file
......@@ -21,6 +21,7 @@
#include "Agent.hpp"
#include "TapePool.hpp"
#include "DriveRegister.hpp"
#include "SchedulerGlobalLock.hpp"
#include <cxxabi.h>
#include "ProtocolBuffersAlgorithms.hpp"
......@@ -49,6 +50,8 @@ bool cta::objectstore::RootEntry::isEmpty() {
return false;
if (m_payload.agentregisterpointer().address().size())
return false;
if (m_payload.schedulerlockpointer().address().size())
return false;
return true;
}
......@@ -749,6 +752,80 @@ void cta::objectstore::RootEntry::addIntendedAgentRegistry(const std::string& ad
m_payload.set_agentregisterintent(address);
}
// =============================================================================
// ================ Scheduler global lock manipulation =========================
// =============================================================================
std::string cta::objectstore::RootEntry::getSchedulerGlobalLock() {
checkPayloadReadable();
// If the scheduler lock is defined, return it, job done.
if (m_payload.schedulerlockpointer().address().size())
return m_payload.schedulerlockpointer().address();
throw NotAllocated("In RootEntry::getAgentRegister: scheduler global lock not yet allocated");
}
// Get the name of a (possibly freshly created) scheduler global lock
std::string cta::objectstore::RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent,
const CreationLog & log) {
checkPayloadWritable();
// Check if the drive register exists
try {
return getSchedulerGlobalLock();
} catch (NotAllocated &) {
// decide on the object's name and add to agent's intent. We expect the
// agent to be passed locked.
std::string sglAddress (agent.nextId("schedulerGlobalLock"));
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(sglAddress);
agent.commit();
// Then create the drive register object
SchedulerGlobalLock sgl(sglAddress, m_objectStore);
sgl.initialize();
sgl.setOwner(agent.getAddressIfSet());
sgl.setBackupOwner(getAddressIfSet());
sgl.insert();
// Take a lock on scheduler global lock
ScopedExclusiveLock sglLock(sgl);
// Move drive registry ownership to the root entry
auto * msgl = m_payload.mutable_schedulerlockpointer();
msgl->set_address(sglAddress);
log.serialize(*msgl->mutable_log());
commit();
// Record completion in scheduler global lock
sgl.setOwner(getAddressIfSet());
sgl.setBackupOwner(getAddressIfSet());
sgl.commit();
//... and clean up the agent
agent.removeFromOwnership(sglAddress);
agent.commit();
return sglAddress;
}
}
void cta::objectstore::RootEntry::removeSchedulerGlobalLockAndCommit() {
checkPayloadWritable();
// Get the address of the scheduler lock (nothing to do if there is none)
if (!m_payload.schedulerlockpointer().address().size())
return;
std::string sglAddress = m_payload.schedulerlockpointer().address();
SchedulerGlobalLock sgl(sglAddress, ObjectOps<serializers::RootEntry>::m_objectStore);
ScopedExclusiveLock sgll(sgl);
sgl.fetch();
// Check the drive register is empty
if (!sgl.isEmpty()) {
throw DriveRegisterNotEmpty("In RootEntry::removeSchedulerGlobalLockAndCommit: "
"trying to remove a non-empty scheduler global lock");
}
// we can delete the drive register
sgl.remove();
// And update the root entry
m_payload.mutable_schedulerlockpointer()->set_address("");
// We commit for safety and symmetry with the add operation
commit();
}
// Dump the root entry
std::string cta::objectstore::RootEntry::dump () {
checkPayloadReadable();
......
......@@ -168,6 +168,11 @@ public:
const CreationLog & log);
void removeAgentRegisterAndCommit();
// Agent register manipulations ==============================================
std::string getSchedulerGlobalLock();
std::string addOrGetSchedulerGlobalLockAndCommit(Agent & agent, const CreationLog & log);
void removeSchedulerGlobalLockAndCommit();
private:
void addIntendedAgentRegistry(const std::string & address);
......
......@@ -587,4 +587,58 @@ TEST(ObjectStore, RootEntryAgentRegister) {
ASSERT_EQ(false, re.exists());
}
TEST (ObjectStore, RootEntrySchedulerGlobalLock) {
cta::objectstore::BackendVFS be;
{
// Try to create the root entry
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
}
cta::objectstore::CreationLog cl(cta::UserIdentity(99, 99),
"unittesthost", time(NULL), "Creation of unit test scheduler global lock");
cta::objectstore::Agent ag(be);
ag.initialize();
ag.generateName("UnitTests");
{
// Try to create the root entry and allocate the agent register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
re.addOrGetAgentRegisterPointerAndCommit(ag, cl);
}
ag.insertAndRegisterSelf();
std::string schedulerGlobalLockAddress;
{
// create the drive register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
ASSERT_THROW(re.getDriveRegisterAddress(),
cta::objectstore::RootEntry::NotAllocated);
/*ASSERT_NO_THROW*/(
schedulerGlobalLockAddress = re.addOrGetSchedulerGlobalLockAndCommit(ag, cl));
ASSERT_TRUE(be.exists(schedulerGlobalLockAddress));
}
{
// delete the drive register
// create the drive register
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
re.removeSchedulerGlobalLockAndCommit();
ASSERT_FALSE(be.exists(schedulerGlobalLockAddress));
}
// Unregister the agent
cta::objectstore::ScopedExclusiveLock agl(ag);
ag.removeAndUnregisterSelf();
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeAgentRegisterAndCommit();
re.removeIfEmpty();
ASSERT_EQ(false, re.exists());
}
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "SchedulerGlobalLock.hpp"
#include "GenericObject.hpp"
#include "RootEntry.hpp"
namespace cta { namespace objectstore {
SchedulerGlobalLock::SchedulerGlobalLock(const std::string& address, Backend& os):
ObjectOps<serializers::SchedulerGlobalLock>(os, address) { }
SchedulerGlobalLock::SchedulerGlobalLock(GenericObject& go):
ObjectOps<serializers::SchedulerGlobalLock>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
void SchedulerGlobalLock::initialize() {
// Setup underlying object
ObjectOps<serializers::SchedulerGlobalLock>::initialize();
// Setup the object
m_payload.set_nextmountid(1);
m_payloadInterpreted = 1;
}
void SchedulerGlobalLock::garbageCollect(const std::string &presumedOwner) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the drive register creation failed. We have nothing to do.
if (presumedOwner != m_header.owner())
return;
// If the owner is still the agent, we have 2 possibilities:
// 1) The register is referenced by the root entry. We just need to officialise
// the ownership on the scheduler lock(if not, we will either get the NotAllocated
// exception) or the address will be different.
{
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
try {
if (re.getSchedulerGlobalLock() == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
commit();
return;
}
} catch (RootEntry::NotAllocated &) {}
}
// 2) The tape pool is not referenced in the root entry. We can just clean it up.
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error"));
}
remove();
}
bool SchedulerGlobalLock::isEmpty() {
checkPayloadReadable();
// There is no content in the global lock.
return true;
}
}}
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class Backend;
class Agent;
class GenericObject;
class SchedulerGlobalLock: public ObjectOps<serializers::SchedulerGlobalLock> {
public:
SchedulerGlobalLock(const std::string & address, Backend & os);
SchedulerGlobalLock(GenericObject & go);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner);
bool isEmpty();
// Mount id management =======================================================
void setNextMountId(uint64_t nextId);
uint64_t getIncreaseCommitMountId();
};
}}
......@@ -11,6 +11,7 @@ enum ObjectType {
Tape_t = 5;
ArchiveToFileRequest_t = 6;
RetrieveToFileRequest_t = 7;
SchedulerGlobalLock_t = 8;
GenericObject_t = 1000;
}
......@@ -107,6 +108,12 @@ message AgentRegisterPointer {
required CreationLog log = 101;
}
// Pointer to the scheduler global lock
message SchedulerGlobalLockPointer {
required string address = 110;
required CreationLog log = 111;
}
// The root entry. This entry contains all the most static information, i.e.
// the admin handled configuration information
message RootEntry {
......@@ -118,6 +125,7 @@ message RootEntry {
optional DriveRegisterPointer driveregisterpointer = 1005;
optional AgentRegisterPointer agentregisterpointer = 1006;
optional string agentregisterintent = 1007;
optional SchedulerGlobalLockPointer schedulerlockpointer = 1008;
}
//=========== Sub-objects ======================================================
......@@ -338,5 +346,10 @@ message DriveRegister {
repeated DriveInfo drives = 7000;
}
// ------------- Scheduler global lock handling -------------------------------
message SchedulerGlobalLock {
required uint64 nextmountid = 8000;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment