Commit e21dd9f8 authored by Eric Cano's avatar Eric Cano
Browse files

Added new queue sets in RootEntry.

We now have live (to be executed), failed and to be reported queues.
Archive jobs have all three types, retrieve jobs do not have a to be reported
queue. The RootEntry interface interface now provides the choice between the types.
All uses are currently hardcoded to LiveJobs.
parent 069da5df
......@@ -785,9 +785,9 @@ void copyString(char *const dst, const size_t dstSize, const std::string &src) {
//-----------------------------------------------------------------------------
std::string getCurrentLocalTime() {
::timeval tv;
gettimeofday(&tv, nullptr);
time_t now = (time_t)tv.tv_sec;
struct tm * localNow;
::gettimeofday(&tv, nullptr);
::time_t now = (::time_t)tv.tv_sec;
struct ::tm * localNow;
::time(&now);
localNow = ::localtime(&now);
char buff[80];
......
......@@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
auto tpd=re.dumpArchiveQueues();
auto tpd=re.dumpArchiveQueues(QueueType::LiveJobs);
for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) {
if (tp->address == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
......
......@@ -337,7 +337,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
// recreated (this will be done by helper).
ArchiveQueue aq(m_objectStore);
ScopedExclusiveLock aql;
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), lc);
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), QueueType::LiveJobs, lc);
queueObject=aq.getAddressIfSet();
ArchiveRequest::JobDump jd;
jd.copyNb = j->copynb();
......
......@@ -397,7 +397,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
// Get the archive queue and add references to the jobs in it.
ArchiveQueue aq(objectStore);
ScopedExclusiveLock aql;
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, tapepool.first, lc);
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, tapepool.first, QueueType::LiveJobs, lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary=aq.getJobsSummary();
filesBefore=jobsSummary.jobs;
......@@ -568,7 +568,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
// Get the retrieve queue and add references to the jobs to it.
RetrieveQueue rq(objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, tape.first, lc);
Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq,rql, agentReference, tape.first, QueueType::LiveJobs, lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary=rq.getJobsSummary();
filesBefore=jobsSummary.files;
......
......@@ -325,7 +325,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
re.fetch();
std::stringstream tapePoolName;
tapePoolName << "TapePool" << i;
tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, lc);
tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::QueueType::LiveJobs, lc);
cta::objectstore::ArchiveQueue aq(tpAddr[i], be);
}
// Create the various ATFR's, stopping one step further each time.
......@@ -454,7 +454,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
std::list<std::string> tapePools = { "TapePool0", "TapePool1" };
for (auto & tp: tapePools) {
// Empty queue
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp), be);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::QueueType::LiveJobs), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
std::list<std::string> ajtr;
......@@ -464,7 +464,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
aq.removeJobsAndCommit(ajtr);
aql.release();
// Remove queues from root
re.removeArchiveQueueAndCommit(tp, lc);
re.removeArchiveQueueAndCommit(tp, cta::objectstore::QueueType::LiveJobs, lc);
}
ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
......@@ -524,7 +524,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
re.fetch();
std::stringstream vid;
vid << "Tape" << i;
tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef);
tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::QueueType::LiveJobs, lc);
cta::objectstore::RetrieveQueue rq(tAddr[i], be);
}
// Create the various ATFR's, stopping one step further each time.
......@@ -637,7 +637,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
std::list<std::string> retrieveQueues = { "Tape0", "Tape1" };
for (auto & vid: retrieveQueues) {
// Empty queue
cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid), be);
cta::objectstore::RetrieveQueue rq(re.getRetrieveQueueAddress(vid, cta::objectstore::QueueType::LiveJobs), be);
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
std::list<std::string> jtrl;
......@@ -647,7 +647,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
rq.removeJobsAndCommit(jtrl);
rql.release();
// Remove queues from root
re.removeRetrieveQueueAndCommit(vid, lc);
re.removeRetrieveQueueAndCommit(vid, cta::objectstore::QueueType::LiveJobs, lc);
}
ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
......
......@@ -36,7 +36,7 @@ namespace cta { namespace objectstore {
template <>
void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference,
const std::string& tapePool, log::LogContext & lc) {
const std::string& tapePool, QueueType queueType, log::LogContext & lc) {
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
......@@ -57,13 +57,13 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
re.fetchNoLock();
rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
try {
archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool, queueType));
} catch (cta::exception::Exception & ex) {
ScopedExclusiveLock rexl(re);
rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
re.fetch();
rootRefetchTime = t.secs(utils::Timer::resetCounter);
archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference, lc));
archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference, queueType, lc));
addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
}
}
......@@ -107,7 +107,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
re.fetch();
rootRefetchTime += t.secs(utils::Timer::resetCounter);
try {
re.removeArchiveQueueAndCommit(tapePool, lc);
re.removeArchiveQueueAndCommit(tapePool, queueType, lc);
rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
......@@ -154,7 +154,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
template <>
void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQueue,
ScopedExclusiveLock& retrieveQueueLock, AgentReference& agentReference,
const std::string& vid, log::LogContext & lc) {
const std::string& vid, QueueType queueType, log::LogContext & lc) {
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
......@@ -175,13 +175,13 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
re.fetchNoLock();
rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
try {
retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid));
retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid, queueType));
} catch (cta::exception::Exception & ex) {
ScopedExclusiveLock rexl(re);
rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
re.fetch();
rootRefetchTime = t.secs(utils::Timer::resetCounter);
retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference));
retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference, queueType, lc));
addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
}
}
......@@ -225,7 +225,7 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
re.fetch();
rootRefetchTime += t.secs(utils::Timer::resetCounter);
try {
re.removeRetrieveQueueAndCommit(vid, lc);
re.removeRetrieveQueueAndCommit(vid, queueType, lc);
rootQueueDereferenceTime += t.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
......@@ -413,7 +413,7 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueS
continue;
std::string rqAddr;
try {
std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid);
std::string rqAddr = re.getRetrieveQueueAddress(tf.second.vid, QueueType::LiveJobs);
} catch (cta::exception::Exception &) {
ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
ret.back().vid=tf.second.vid;
......
......@@ -23,6 +23,7 @@
#include "common/threading/MutexLocker.hpp"
#include "catalogue/Catalogue.hpp"
#include "scheduler/OStoreDB/OStoreDB.hpp"
#include "QueueType.hpp"
#include <string>
#include <set>
#include <future>
......@@ -55,7 +56,7 @@ public:
template <class Queue>
static void getLockedAndFetchedQueue(Queue & queue,
ScopedExclusiveLock & queueLock, AgentReference & agentReference,
const std::string & tapePoolOrVid, log::LogContext & lc);
const std::string & tapePoolOrVid, QueueType queueType, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoTapeAvailableForRetrieve);
/**
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
namespace cta { namespace objectstore {
enum class QueueType { LiveJobs, FailedJobs, JobsToReport };
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -135,7 +135,7 @@ jobFound:;
// We now need to grab the queue a requeue the request.
RetrieveQueue rq(m_objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, lc);
Helpers::getLockedAndFetchedQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, QueueType::LiveJobs, lc);
// Enqueue add the job to the queue
objectstore::MountPolicySerDeser mp;
mp.deserialize(m_payload.mountpolicy());
......
......@@ -67,10 +67,14 @@ bool RootEntry::isEmpty() {
if (m_payload.has_schedulerlockpointer() &&
m_payload.schedulerlockpointer().address().size())
return false;
if (m_payload.archivequeuepointers().size())
return false;
if (m_payload.retrievequeuepointers().size())
return false;
for (auto &qt: {QueueType::LiveJobs, QueueType::FailedJobs, QueueType::JobsToReport}) {
if (archiveQueuePointers(qt).size())
return false;
}
for (auto &qt: {QueueType::LiveJobs, QueueType::FailedJobs}) {
if (retrieveQueuePointers(qt).size())
return false;
}
return true;
}
......@@ -91,6 +95,58 @@ void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference
throw ForbiddenOperation("In RootEntry::garbageCollect(): RootEntry cannot be garbage collected");
}
// =============================================================================
// ========== Queue types and helper functions =================================
// =============================================================================
const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>& RootEntry::archiveQueuePointers(QueueType queueType) {
switch(queueType) {
case QueueType::LiveJobs:
return m_payload.livearchivejobsqueuepointers();
case QueueType::JobsToReport:
return m_payload.archivejobstoreportqueuepointers();
case QueueType::FailedJobs:
return m_payload.failedarchivejobsqueuepointers();
default:
throw cta::exception::Exception("In RootEntry::archiveQueuePointers(): unknown queue type.");
}
}
::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>* RootEntry::mutableArchiveQueuePointers(QueueType queueType) {
switch(queueType) {
case QueueType::LiveJobs:
return m_payload.mutable_livearchivejobsqueuepointers();
case QueueType::JobsToReport:
return m_payload.mutable_archivejobstoreportqueuepointers();
case QueueType::FailedJobs:
return m_payload.mutable_failedarchivejobsqueuepointers();
default:
throw cta::exception::Exception("In RootEntry::mutableArchiveQueuePointers(): unknown queue type.");
}
}
const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>& RootEntry::retrieveQueuePointers(QueueType queueType) {
switch(queueType) {
case QueueType::LiveJobs:
return m_payload.liveretrievejobsqueuepointers();
case QueueType::FailedJobs:
return m_payload.failedretrievejobsqueuepointers();
default:
throw cta::exception::Exception("In RootEntry::retrieveQueuePointers(): unknown queue type.");
}
}
::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>* RootEntry::mutableRetrieveQueuePointers(QueueType queueType) {
switch(queueType) {
case QueueType::LiveJobs:
return m_payload.mutable_liveretrievejobsqueuepointers();
case QueueType::FailedJobs:
return m_payload.mutable_failedretrievejobsqueuepointers();
default:
throw cta::exception::Exception("In RootEntry::mutableRetrieveQueuePointers(): unknown queue type.");
}
}
// =============================================================================
// ========== Archive queues manipulations =====================================
// =============================================================================
......@@ -105,11 +161,14 @@ namespace {
}
}
std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef, log::LogContext & lc) {
std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef,
QueueType queueType, log::LogContext & lc) {
checkPayloadWritable();
// Check the archive queue does not already exist
try {
return serializers::findElement(m_payload.archivequeuepointers(), tapePool).address();
return serializers::findElement(archiveQueuePointers(queueType), tapePool).address();
} catch (serializers::NotFound &) {}
// Insert the archive queue, then its pointer, with agent intent log update
// First generate the intent. We expect the agent to be passed locked.
......@@ -123,7 +182,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool
aq.insert();
ScopedExclusiveLock tpl(aq);
// Now move the tape pool's ownership to the root entry
auto * tpp = m_payload.mutable_archivequeuepointers()->Add();
auto * tpp = mutableArchiveQueuePointers(queueType)->Add();
tpp->set_address(archiveQueueAddress);
tpp->set_name(tapePool);
// We must commit here to ensure the tape pool object is referenced.
......@@ -137,11 +196,11 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool
return archiveQueueAddress;
}
void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, log::LogContext & lc) {
void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, QueueType queueType, log::LogContext & lc) {
checkPayloadWritable();
// find the address of the archive queue object
try {
auto aqp = serializers::findElement(m_payload.archivequeuepointers(), tapePool);
auto aqp = serializers::findElement(archiveQueuePointers(queueType), tapePool);
// Open the tape pool object
ArchiveQueue aq (aqp.address(), m_objectStore);
ScopedExclusiveLock aql;
......@@ -181,7 +240,7 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, log::Lo
}
deleteFromRootEntry:
// ... and remove it from our entry
serializers::removeOccurences(m_payload.mutable_archivequeuepointers(), tapePool);
serializers::removeOccurences(mutableArchiveQueuePointers(queueType), tapePool);
// We commit for safety and symmetry with the add operation
commit();
{
......@@ -195,24 +254,24 @@ void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, log::Lo
}
}
void RootEntry::removeMissingArchiveQueueReference(const std::string& tapePool) {
serializers::removeOccurences(m_payload.mutable_archivequeuepointers(), tapePool);
void RootEntry::removeMissingArchiveQueueReference(const std::string& tapePool, QueueType queueType) {
serializers::removeOccurences(mutableArchiveQueuePointers(queueType), tapePool);
}
std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool) {
std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool, QueueType queueType) {
checkPayloadReadable();
try {
auto & tpp = serializers::findElement(m_payload.archivequeuepointers(), tapePool);
auto & tpp = serializers::findElement(archiveQueuePointers(queueType), tapePool);
return tpp.address();
} catch (serializers::NotFound &) {
throw NoSuchArchiveQueue("In RootEntry::getArchiveQueueAddress: archive queue not allocated");
}
}
auto RootEntry::dumpArchiveQueues() -> std::list<ArchiveQueueDump> {
auto RootEntry::dumpArchiveQueues(QueueType queueType) -> std::list<ArchiveQueueDump> {
checkPayloadReadable();
std::list<ArchiveQueueDump> ret;
auto & tpl = m_payload.archivequeuepointers();
auto & tpl = archiveQueuePointers(queueType);
for (auto i = tpl.begin(); i!=tpl.end(); i++) {
ret.push_back(ArchiveQueueDump());
ret.back().address = i->address();
......@@ -234,11 +293,12 @@ namespace {
}
}
std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef) {
std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef,
QueueType queueType, log::LogContext & lc) {
checkPayloadWritable();
// Check the retrieve queue does not already exist
try {
return serializers::findElement(m_payload.retrievequeuepointers(), vid).address();
return serializers::findElement(retrieveQueuePointers(queueType), vid).address();
} catch (serializers::NotFound &) {}
// Insert the retrieve queue, then its pointer, with agent intent log update
// First generate the intent. We expect the agent to be passed locked.
......@@ -253,7 +313,7 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag
rq.insert();
ScopedExclusiveLock tpl(rq);
// Now move the tape pool's ownership to the root entry
auto * rqp = m_payload.mutable_retrievequeuepointers()->Add();
auto * rqp = mutableRetrieveQueuePointers(queueType)->Add();
rqp->set_address(retrieveQueueAddress);
rqp->set_vid(vid);
// We must commit here to ensure the tape pool object is referenced.
......@@ -267,15 +327,15 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag
return retrieveQueueAddress;
}
void RootEntry::removeMissingRetrieveQueueReference(const std::string& vid) {
serializers::removeOccurences(m_payload.mutable_retrievequeuepointers(), vid);
void RootEntry::removeMissingRetrieveQueueReference(const std::string& vid, QueueType queueType) {
serializers::removeOccurences(mutableRetrieveQueuePointers(queueType), vid);
}
void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, log::LogContext & lc) {
void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, QueueType queueType, log::LogContext & lc) {
checkPayloadWritable();
// find the address of the retrieve queue object
try {
auto rqp=serializers::findElement(m_payload.retrievequeuepointers(), vid);
auto rqp=serializers::findElement(retrieveQueuePointers(queueType), vid);
// Open the retrieve queue object
RetrieveQueue rq(rqp.address(), m_objectStore);
ScopedExclusiveLock rql;
......@@ -315,7 +375,7 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, log::LogCon
}
deleteFromRootEntry:
// ... and remove it from our entry
serializers::removeOccurences(m_payload.mutable_retrievequeuepointers(), vid);
serializers::removeOccurences(mutableRetrieveQueuePointers(queueType), vid);
// We commit for safety and symmetry with the add operation
commit();
{
......@@ -330,20 +390,20 @@ void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, log::LogCon
}
std::string RootEntry::getRetrieveQueueAddress(const std::string& vid) {
std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, QueueType queueType) {
checkPayloadReadable();
try {
auto & rqp = serializers::findElement(m_payload.retrievequeuepointers(), vid);
auto & rqp = serializers::findElement(retrieveQueuePointers(queueType), vid);
return rqp.address();
} catch (serializers::NotFound &) {
throw NoSuchRetrieveQueue("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated");
}
}
auto RootEntry::dumpRetrieveQueues() -> std::list<RetrieveQueueDump> {
auto RootEntry::dumpRetrieveQueues(QueueType queueType) -> std::list<RetrieveQueueDump> {
checkPayloadReadable();
std::list<RetrieveQueueDump> ret;
auto & tpl = m_payload.retrievequeuepointers();
auto & tpl = retrieveQueuePointers(queueType);
for (auto i = tpl.begin(); i!=tpl.end(); i++) {
ret.push_back(RetrieveQueueDump());
ret.back().address = i->address();
......
......@@ -20,6 +20,7 @@
#include "objectstore/cta.pb.h"
#include "QueueType.hpp"
#include "Backend.hpp"
#include "ObjectOps.hpp"
#include "EntryLogSerDeser.hpp"
......@@ -58,40 +59,55 @@ public:
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
// Queue types and helper functions ==========================================
private:
const ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::ArchiveQueuePointer >&
archiveQueuePointers(QueueType queueType);
::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::ArchiveQueuePointer >*
mutableArchiveQueuePointers(QueueType queueType);
const ::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::RetrieveQueuePointer >&
retrieveQueuePointers(QueueType queueType);
::google::protobuf::RepeatedPtrField< ::cta::objectstore::serializers::RetrieveQueuePointer >*
mutableRetrieveQueuePointers(QueueType queueType);
public:
// ArchiveQueue handling ====================================================
CTA_GENERATE_EXCEPTION_CLASS(ArchiveQueueNotEmpty);
CTA_GENERATE_EXCEPTION_CLASS(WrongArchiveQueue);
/** This function implicitly creates the archive queue structure and updates
* the pointer to it. It will implicitly commit the object to the store. */
std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, AgentReference & agentRef, log::LogContext & lc);
std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, AgentReference & agentRef,
QueueType queueType, log::LogContext & lc);
/** This function implicitly deletes the tape pool structure.
* Fails if it not empty*/
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue);
void removeArchiveQueueAndCommit(const std::string & tapePool, log::LogContext & lc);
void removeArchiveQueueAndCommit(const std::string & tapePool, QueueType queueType, log::LogContext & lc);
/** This function is used in a cleanup utility. Removes unconditionally the reference to the archive queue */
void removeMissingArchiveQueueReference(const std::string & tapePool);
std::string getArchiveQueueAddress(const std::string & tapePool);
void removeMissingArchiveQueueReference(const std::string & tapePool, QueueType queueType);
void removeArchiveQueueIfAddressMatchesAndCommit(const std::string & tapePool, const std::string & archiveQueueAddress, QueueType queueType);
std::string getArchiveQueueAddress(const std::string & tapePool, QueueType queueType);
struct ArchiveQueueDump {
std::string tapePool;
std::string address;
};
std::list<ArchiveQueueDump> dumpArchiveQueues();
std::list<ArchiveQueueDump> dumpArchiveQueues(QueueType queueType);
// RetrieveQueue handling ====================================================
CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotEmpty);
/** This function implicitly creates the retrieve queue structure and updates
* the pointer to it. It will implicitly commit the object to the store. */
std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, AgentReference & agentRef);
std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, AgentReference & agentRef,
QueueType queueType, log::LogContext & lc);
/** This function is used in a cleanup utility. Removes unconditionally the reference to the retrieve queue */
void removeMissingRetrieveQueueReference(const std::string & address);
void removeMissingRetrieveQueueReference(const std::string & address, QueueType queueType);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchRetrieveQueue);
void removeRetrieveQueueAndCommit(const std::string & vid, log::LogContext & lc);
std::string getRetrieveQueueAddress(const std::string & vid);
void removeRetrieveQueueAndCommit(const std::string & vid, QueueType queueType, log::LogContext & lc);
std::string getRetrieveQueueAddress(const std::string & vid, QueueType queueType);
struct RetrieveQueueDump {
std::string vid;
std::string address;
};
std::list<RetrieveQueueDump> dumpRetrieveQueues();
std::list<RetrieveQueueDump> dumpRetrieveQueues(QueueType queueType);
// Drive register manipulations ==============================================
CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty);
......
......@@ -97,9 +97,9 @@ TEST (ObjectStore, RootEntryArchiveQueues) {
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
ASSERT_THROW(re.getArchiveQueueAddress("tapePool1"),
ASSERT_THROW(re.getArchiveQueueAddress("tapePool1", cta::objectstore::QueueType::LiveJobs),
cta::objectstore::RootEntry::NoSuchArchiveQueue);
tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, lc);
tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr, cta::objectstore::QueueType::LiveJobs, lc);
// Check that we car read it
cta::objectstore::ArchiveQueue aq(tpAddr1, be);
cta::objectstore::ScopedSharedLock aql(aq);
......@@ -110,7 +110,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) {
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, lc);
tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr, cta::objectstore::QueueType::LiveJobs, lc);
ASSERT_TRUE(be.exists(tpAddr2));
}
{
......@@ -118,7 +118,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) {
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeArchiveQueueAndCommit("tapePool2", lc);
re.removeArchiveQueueAndCommit("tapePool2", cta::objectstore::QueueType::LiveJobs, lc);
ASSERT_FALSE(be.exists(tpAddr2));
}
// Unregister the agent
......@@ -129,7 +129,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) {
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeAgentRegisterAndCommit(lc);
re.removeArchiveQueueAndCommit("tapePool1", lc);
re.removeArchiveQueueAndCommit("tapePool1", cta::objectstore::QueueType::LiveJobs, lc);
ASSERT_FALSE(be.exists(tpAddr1));
re.removeIfEmpty(lc);
ASSERT_FALSE(re.exists());
......
......@@ -58,14 +58,14 @@ int main(int argc, char ** argv) {
cta::objectstore::ScopedExclusiveLock rel(re);
re.fetch();
std::list<std::string> missingArchiveQueues, missingRetrieveQueues;
for (auto & aq: re.dumpArchiveQueues()) {
for (auto & aq: re.dumpArchiveQueues(cta::objectstore::QueueType::LiveJobs)) {
if (!be->exists(aq.address)) {
missingArchiveQueues.emplace_back(aq.tapePool);
std::cout << "The archive queue for tape pool " << aq.tapePool << " at address " << aq.address
<< " is missing and will be dereferenced." << std::endl;
}