Commit 45bbb7f7 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

adding new objectstore queues for tapes and tapepools

parent 94ef8d87
......@@ -42,6 +42,8 @@ set (CTAProtoDependants objectstore/Agent.hpp
objectstore/SchedulerGlobalLock.hpp
objectstore/Tape.hpp
objectstore/TapePool.hpp
objectstore/TapeQueue.hpp
objectstore/TapePoolQueue.hpp
objectstore/UserIdentity.hpp)
SET_SOURCE_FILES_PROPERTIES(${CTAProtoHeaders} PROPERTIES HEADER_FILE_ONLY TRUE)
......@@ -57,6 +59,8 @@ add_library (ctaobjectstore SHARED
AgentWatchdog.cpp
TapePool.cpp
Tape.cpp
TapePoolQueue.cpp
TapeQueue.cpp
ArchiveToFileRequest.cpp
ArchiveRequest.cpp
RetrieveToFileRequest.cpp
......
......@@ -20,6 +20,8 @@
#include "AgentRegister.hpp"
#include "Agent.hpp"
#include "TapePool.hpp"
#include "TapePoolQueue.hpp"
#include "TapeQueue.hpp"
#include "DriveRegister.hpp"
#include "GenericObject.hpp"
#include "SchedulerGlobalLock.hpp"
......@@ -572,6 +574,111 @@ auto cta::objectstore::RootEntry::dumpTapePools() -> std::list<TapePoolDump> {
return ret;
}
// =============================================================================
// ========== Tape pool queues manipulations =========================================
// =============================================================================
// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
bool operator==(const std::string &tp,
const cta::objectstore::serializers::TapePoolQueuePointer & tpp) {
return tpp.name() == tp;
}
}
std::string cta::objectstore::RootEntry::addOrGetTapePoolQueueAndCommit(const std::string& tapePool, Agent& agent) {
checkPayloadWritable();
// Check the tape pool does not already exist
try {
return serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool).address();
} catch (serializers::NotFound &) {}
// Insert the tape pool, then its pointer, with agent intent log update
// First generate the intent. We expect the agent to be passed locked.
std::string tapePoolQueueAddress = agent.nextId("tapePoolQueue");
// TODO Do we expect the agent to be passed locked or not: to be clarified.
ScopedExclusiveLock agl(agent);
agent.fetch();
agent.addToOwnership(tapePoolQueueAddress);
agent.commit();
// Then create the tape pool object
TapePool tp(tapePoolQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
tp.initialize(tapePool);
tp.setOwner(agent.getAddressIfSet());
tp.setBackupOwner("root");
tp.insert();
ScopedExclusiveLock tpl(tp);
// Now move the tape pool's ownership to the root entry
auto * tpp = m_payload.mutable_tapepoolqueuepointers()->Add();
tpp->set_address(tapePoolQueueAddress);
// We must commit here to ensure the tape pool object is referenced.
commit();
// Now update the tape pool's ownership.
tp.setOwner(getAddressIfSet());
tp.setBackupOwner(getAddressIfSet());
tp.commit();
// ... and clean up the agent
agent.removeFromOwnership(tapePoolQueueAddress);
agent.commit();
return tapePoolQueueAddress;
}
void cta::objectstore::RootEntry::removeTapePoolQueueAndCommit(const std::string& tapePool) {
checkPayloadWritable();
// find the address of the tape pool object
try {
auto tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
// Open the tape pool object
TapePoolQueue tp (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
ScopedExclusiveLock tpl(tp);
tp.fetch();
// Verify this is the tapepool we're looking for.
if (tp.getName() != tapePool) {
std::stringstream err;
err << "Unexpected tape pool name found in object pointed to for tape pool: "
<< tapePool << " found: " << tp.getName();
throw WrongTapePoolQueue(err.str());
}
// Check the tape pool is empty
if (!tp.isEmpty()) {
throw TapePoolQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to "
"remove a non-empty tape pool");
}
// We can delete the pool
tp.remove();
// ... and remove it from our entry
serializers::removeOccurences(m_payload.mutable_tapepoolqueuepointers(), tapePool);
// We commit for safety and symmetry with the add operation
commit();
} catch (serializers::NotFound &) {
// No such tape pool. Nothing to to.
throw NoSuchTapePoolQueue("In RootEntry::removeTapePoolQueueAndCommit: trying to remove non-existing tape pool");
}
}
std::string cta::objectstore::RootEntry::getTapePoolQueueAddress(const std::string& tapePool) {
checkPayloadReadable();
try {
auto & tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
return tpp.address();
} catch (serializers::NotFound &) {
throw NotAllocated("In RootEntry::getTapePoolQueueAddress: tape pool not allocated");
}
}
auto cta::objectstore::RootEntry::dumpTapePoolQueues() -> std::list<TapePoolQueueDump> {
checkPayloadReadable();
std::list<TapePoolQueueDump> ret;
auto & tpl = m_payload.tapepoolqueuepointers();
for (auto i = tpl.begin(); i!=tpl.end(); i++) {
ret.push_back(TapePoolQueueDump());
ret.back().address = i->address();
ret.back().tapePool = i->name();
}
return ret;
}
// =============================================================================
// ================ Drive register manipulation ================================
// =============================================================================
......
......@@ -155,6 +155,24 @@ public:
};
std::list<TapePoolDump> dumpTapePools();
// TapePoolQueue Manipulations =====================================================
CTA_GENERATE_EXCEPTION_CLASS(TapePoolQueueNotEmpty);
CTA_GENERATE_EXCEPTION_CLASS(WrongTapePoolQueue);
/** This function implicitly creates the tape pool structure and updates
* the pointer to it. It needs to implicitly commit the object to the store. */
std::string addOrGetTapePoolQueueAndCommit(const std::string & tapePool, Agent & agent);
/** This function implicitly deletes the tape pool structure.
* Fails if it not empty*/
CTA_GENERATE_EXCEPTION_CLASS(NoSuchTapePoolQueue);
void removeTapePoolQueueAndCommit(const std::string & tapePool);
std::string getTapePoolQueueAddress(const std::string & tapePool);
class TapePoolQueueDump {
public:
std::string tapePool;
std::string address;
};
std::list<TapePoolQueueDump> dumpTapePoolQueues();
// Drive register manipulations ==============================================
CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty);
std::string getDriveRegisterAddress();
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "TapePoolQueue.hpp"
#include "GenericObject.hpp"
#include "ProtocolBuffersAlgorithms.hpp"
#include "CreationLog.hpp"
#include "Tape.hpp"
#include "RootEntry.hpp"
#include <json-c/json.h>
cta::objectstore::TapePoolQueue::TapePoolQueue(const std::string& address, Backend& os):
ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>(os, address) { }
cta::objectstore::TapePoolQueue::TapePoolQueue(GenericObject& go):
ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>(go.objectStore()) {
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
std::string cta::objectstore::TapePoolQueue::dump() {
checkPayloadReadable();
std::stringstream ret;
ret << "TapePoolQueue" << std::endl;
struct json_object * jo = json_object_new_object();
json_object_object_add(jo, "name", json_object_new_string(m_payload.tapepoolname().c_str()));
json_object_object_add(jo, "ArchiveJobsTotalSize", json_object_new_int64(m_payload.archivejobstotalsize()));
json_object_object_add(jo, "oldestJobCreationTime", json_object_new_int64(m_payload.oldestjobcreationtime()));
{
json_object * array = json_object_new_array();
for (auto i = m_payload.pendingarchivejobs().begin(); i!=m_payload.pendingarchivejobs().end(); i++) {
json_object * jot = json_object_new_object();
json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
json_object_object_add(jot, "size", json_object_new_int64(i->size()));
json_object_array_add(array, jot);
}
json_object_object_add(jo, "pendingarchivejobs", array);
}
{
json_object * array = json_object_new_array();
for (auto i = m_payload.orphanedarchivejobsnscreation().begin(); i!=m_payload.orphanedarchivejobsnscreation().end(); i++) {
json_object * jot = json_object_new_object();
json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
json_object_object_add(jot, "size", json_object_new_int64(i->size()));
json_object_array_add(array, jot);
}
json_object_object_add(jo, "orphanedarchivejobsnscreation", array);
}
{
json_object * array = json_object_new_array();
for (auto i = m_payload.orphanedarchivejobsnsdeletion().begin(); i!=m_payload.orphanedarchivejobsnsdeletion().end(); i++) {
json_object * jot = json_object_new_object();
json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
json_object_object_add(jot, "size", json_object_new_int64(i->size()));
json_object_array_add(array, jot);
}
json_object_object_add(jo, "orphanedarchivejobsnsdeletion", array);
}
ret << json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY) << std::endl;
json_object_put(jo);
return ret.str();
}
void cta::objectstore::TapePoolQueue::initialize(const std::string& name) {
// Setup underlying object
ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>::initialize();
// Setup the object so it's valid
m_payload.set_tapepoolname(name);
// set the archive jobs counter to zero
m_payload.set_archivejobstotalsize(0);
m_payload.set_oldestjobcreationtime(0);
// This object is good to go (to storage)
m_payloadInterpreted = true;
}
bool cta::objectstore::TapePoolQueue::isEmpty() {
checkPayloadReadable();
// Check we have no archive jobs pending
if (m_payload.pendingarchivejobs_size()
|| m_payload.orphanedarchivejobsnscreation_size()
|| m_payload.orphanedarchivejobsnsdeletion_size())
return false;
// If we made it to here, it seems the pool is indeed empty.
return true;
}
void cta::objectstore::TapePoolQueue::garbageCollect(const std::string &presumedOwner) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the tape pool creation failed. We have nothing to do.
if (presumedOwner != m_header.owner())
return;
// If the owner is still the agent, there are 2 possibilities
// 1) The tape pool is referenced in the root entry, and then nothing is needed
// besides setting the tape pool's owner to the root entry's address in
// order to enable its usage. Before that, it was considered as a dangling
// pointer.
{
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
auto tpd=re.dumpTapePoolQueues();
for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) {
if (tp->address == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
commit();
return;
}
}
}
// 2) The tape pool is not referenced by the root entry. It is then effectively
// not accessible and should be discarded.
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty TapePoolQueue: internal error"));
}
remove();
}
void cta::objectstore::TapePoolQueue::setName(const std::string& name) {
checkPayloadWritable();
m_payload.set_tapepoolname(name);
}
std::string cta::objectstore::TapePoolQueue::getName() {
checkPayloadReadable();
return m_payload.tapepoolname();
}
void cta::objectstore::TapePoolQueue::addJob(const ArchiveRequest::JobDump& job,
const std::string & archiveToFileAddress, const std::string & path,
uint64_t size, uint64_t priority, time_t startTime) {
checkPayloadWritable();
// The tape pool gets the highest priority of its jobs
if (m_payload.pendingarchivejobs_size()) {
if ((uint64_t)startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + size);
} else {
m_payload.set_archivejobstotalsize(size);
m_payload.set_oldestjobcreationtime(startTime);
}
auto * j = m_payload.add_pendingarchivejobs();
j->set_address(archiveToFileAddress);
j->set_size(size);
j->set_path(path);
j->set_copynb(job.copyNb);
}
auto cta::objectstore::TapePoolQueue::getJobsSummary() -> JobsSummary {
checkPayloadReadable();
JobsSummary ret;
ret.files = m_payload.pendingarchivejobs_size();
ret.bytes = m_payload.archivejobstotalsize();
ret.oldestJobStartTime = m_payload.oldestjobcreationtime();
return ret;
}
bool cta::objectstore::TapePoolQueue::addJobIfNecessary(
const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress,
const std::string & path, uint64_t size) {
checkPayloadWritable();
auto & jl=m_payload.pendingarchivejobs();
for (auto j=jl.begin(); j!= jl.end(); j++) {
if (j->address() == archiveToFileAddress)
return false;
}
auto * j = m_payload.add_pendingarchivejobs();
j->set_address(archiveToFileAddress);
j->set_size(size);
j->set_path(path);
j->set_copynb(job.copyNb);
return true;
}
void cta::objectstore::TapePoolQueue::removeJob(const std::string& archiveToFileAddress) {
checkPayloadWritable();
auto * jl=m_payload.mutable_pendingarchivejobs();
bool found = false;
do {
found = false;
// Push the found entry all the way to the end.
for (size_t i=0; i<(size_t)jl->size(); i++) {
if (jl->Get(i).address() == archiveToFileAddress) {
found = true;
while (i+1 < (size_t)jl->size()) {
jl->SwapElements(i, i+1);
i++;
}
break;
}
}
// and remove it
if (found)
jl->RemoveLast();
} while (found);
}
auto cta::objectstore::TapePoolQueue::dumpJobs() -> std::list<JobDump> {
checkPayloadReadable();
std::list<JobDump> ret;
auto & jl=m_payload.pendingarchivejobs();
for (auto j=jl.begin(); j!=jl.end(); j++) {
ret.push_back(JobDump());
ret.back().address = j->address();
ret.back().size = j->size();
ret.back().copyNb = j->copynb();
}
return ret;
}
bool cta::objectstore::TapePoolQueue::addOrphanedJobPendingNsCreation(
const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress,
const std::string & path,
uint64_t size) {
checkPayloadWritable();
auto & jl=m_payload.orphanedarchivejobsnscreation();
for (auto j=jl.begin(); j!= jl.end(); j++) {
if (j->address() == archiveToFileAddress)
return false;
}
auto * j = m_payload.add_orphanedarchivejobsnscreation();
j->set_address(archiveToFileAddress);
j->set_size(size);
j->set_path(path);
j->set_copynb(job.copyNb);
return true;
}
bool cta::objectstore::TapePoolQueue::addOrphanedJobPendingNsDeletion(
const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress,
const std::string & path, uint64_t size) {
checkPayloadWritable();
auto & jl=m_payload.orphanedarchivejobsnsdeletion();
for (auto j=jl.begin(); j!= jl.end(); j++) {
if (j->address() == archiveToFileAddress)
return false;
}
auto * j = m_payload.add_orphanedarchivejobsnsdeletion();
j->set_address(archiveToFileAddress);
j->set_size(size);
j->set_path(path);
return true;
}
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "Backend.hpp"
#include "ObjectOps.hpp"
#include <string>
#include "objectstore/cta.pb.h"
#include "common/CreationLog.hpp"
#include "common/MountControl.hpp"
#include "ArchiveToFileRequest.hpp"
#include "ArchiveRequest.hpp"
#include "CreationLog.hpp"
#include "Agent.hpp"
#include "common/archiveNS/Tape.hpp"
namespace cta { namespace objectstore {
class GenericObject;
class TapePoolQueue: public ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t> {
public:
// Constructor
TapePoolQueue(const std::string & address, Backend & os);
// Upgrader form generic object
TapePoolQueue(GenericObject & go);
// In memory initialiser
void initialize(const std::string & name);
// Set/get name
void setName(const std::string & name);
std::string getName();
// Archive jobs management ===================================================
void addJob(const ArchiveRequest::JobDump & job,
const std::string & archiveToFileAddress, const std::string & path,
uint64_t size, uint64_t priority, time_t startTime);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addJobIfNecessary(const ArchiveRequest::JobDump & job,
const std::string & archiveToFileAddress,
const std::string & path, uint64_t size);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addOrphanedJobPendingNsCreation(const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress, const std::string & path,
uint64_t size);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addOrphanedJobPendingNsDeletion(const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress,
const std::string & path, uint64_t size);
struct JobsSummary {
uint64_t files;
uint64_t bytes;
time_t oldestJobStartTime;
uint64_t priority;
};
JobsSummary getJobsSummary();
void removeJob(const std::string &archiveToFileAddress);
class JobDump {
public:
uint64_t size;
std::string address;
uint16_t copyNb;
};
std::list<JobDump> dumpJobs();
// Check that the tape pool is empty (of both tapes and jobs)
bool isEmpty();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
// Garbage collection
void garbageCollect(const std::string &presumedOwner);
std::string dump();
};
}}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "TapeQueue.hpp"
#include "GenericObject.hpp"
#include "CreationLog.hpp"
#include <json-c/json.h>
cta::objectstore::TapeQueue::TapeQueue(const std::string& address, Backend& os):
ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>(os, address) { }
cta::objectstore::TapeQueue::TapeQueue(GenericObject& go):
ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>(go.objectStore()){
// Here we transplant the generic object into the new object
go.transplantHeader(*this);
// And interpret the header.
getPayloadFromHeader();
}
void cta::objectstore::TapeQueue::initialize(const std::string &name,
const std::string &logicallibrary, const cta::CreationLog & creationLog) {
ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>::initialize();
// Set the reguired fields
m_payload.set_oldestjobtime(0);
m_payload.set_retrievejobstotalsize(0);
m_payloadInterpreted = true;
}
bool cta::objectstore::TapeQueue::isEmpty() {
checkPayloadReadable();
return !m_payload.retrievejobs_size();
}
void cta::objectstore::TapeQueue::removeIfEmpty() {
checkPayloadWritable();
if (!isEmpty()) {
throw NotEmpty("In TapeQueue::removeIfEmpty: trying to remove an tape with retrieves queued");
}
remove();
}
std::string cta::objectstore::TapeQueue::getVid() {
checkPayloadReadable();
return m_payload.vid();
}
std::string cta::objectstore::TapeQueue::dump() {
checkPayloadReadable();
std::stringstream ret;
ret << "TapePool" << std::endl;
struct json_object * jo = json_object_new_object();
json_object_object_add(jo, "vid", json_object_new_string(m_payload.vid().c_str()));
json_object_object_add(jo, "retrievejobstotalsize", json_object_new_int64(m_payload.retrievejobstotalsize()));
json_object_object_add(jo, "oldestjobtime", json_object_new_int64(m_payload.oldestjobtime()));