Commit d7e53c4e authored by Eric Cano's avatar Eric Cano
Browse files

Added RepackQueue.

parent ee03509a
......@@ -38,6 +38,7 @@ set (CTAProtoDependants objectstore/Agent.hpp
objectstore/ObjectOps.hpp
objectstore/RepackIndex.hpp
objectstore/RepackRequest.hpp
objectstore/RepackQueue.hpp
objectstore/RetrieveRequest.hpp
objectstore/RootEntry.hpp
objectstore/SchedulerGlobalLock.hpp
......@@ -77,6 +78,7 @@ add_library (ctaobjectstore SHARED
DriveState.cpp
RepackIndex.cpp
RepackRequest.cpp
RepackQueue.cpp
RepackQueueType.cpp
BackendVFS.cpp
BackendRados.cpp
......
......@@ -21,12 +21,15 @@
#include "ArchiveQueue.hpp"
#include "AgentReference.hpp"
#include "RetrieveQueue.hpp"
#include "RepackQueue.hpp"
#include "RootEntry.hpp"
#include "DriveRegister.hpp"
#include "DriveState.hpp"
#include "RepackIndex.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/exception/NonRetryableError.hpp"
#include "common/range.hpp"
#include "common/log/TimingList.hpp"
#include <random>
namespace cta { namespace objectstore {
......@@ -264,6 +267,97 @@ void Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(RetrieveQueue& retrieve
+ vid.value());
}
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedRepackQueue()
//------------------------------------------------------------------------------
void Helpers::getLockedAndFetchedRepackQueue(RepackQueue& queue, ScopedExclusiveLock& queueLock, AgentReference& agentReference,
RepackQueueType queueType, log::LogContext& lc) {
// Try and find the repack queue.
Backend & be = queue.m_objectStore;
for (auto i: cta::range<size_t>(5)) {
utils::Timer t;
log::TimingList timings;
{
RootEntry re(be);
re.fetchNoLock();
timings.insertAndReset("rootFetchNoLockTime", t);
try {
queue.setAddress(re.getRepackQueueAddress(queueType));
} catch (cta::exception::Exception & ex) {
ScopedExclusiveLock rexl(re);
timings.insertAndReset("rootRelockExclusiveTime", t);
re.fetch();
timings.insertAndReset("rootRelockExclusiveTime", t);
queue.setAddress(re.addOrGetRepackQueueAndCommit(agentReference, queueType, lc));
timings.insertAndReset("addOrGetQueueandCommitTime", t);
rexl.release();
timings.insertAndReset("rootUnlockExclusiveTime", t);
}
}
try {
queueLock.lock(queue);
timings.insertAndReset("queueLockTime", t);
queue.fetch();
timings.insertAndReset("queueFetchTime", t);
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("queueObject", queue.getAddressIfSet());
timings.addToLog(params);
lc.log(log::INFO, "In Helpers::getLockedAndFetchedRepackQueue(): Successfully found and locked a repack queue.");
return;
} catch (cta::exception::Exception & ex) {
// We have a (rare) opportunity for a race condition, where we identify the
// queue and it gets deleted before we manage to lock it.
// The locking or fetching will fail in this case.
// We hence allow ourselves to retry a couple times.
// We also need to make sure the lock on the queue is released (it is
// an object and hence not scoped).
// We should also deal with the case where a queue was deleted but left
// referenced in the root entry. We will try to clean up if necessary.
// Failing to do this, we will spin and exhaust all of our retries.
if (i && typeid(ex) == typeid(cta::objectstore::Backend::NoSuchObject)) {
// The queue has been proven to not exist. Let's make sure we de-reference
// it form the root entry.
RootEntry re(be);
ScopedExclusiveLock rexl(re);
timings.insOrIncAndReset("rootRelockExclusiveTime", t);
re.fetch();
timings.insOrIncAndReset("rootRefetchTime", t);
try {
re.removeRepackQueueAndCommit(queueType, lc);
timings.insOrIncAndReset("rootQueueDereferenceTime", t);
log::ScopedParamContainer params(lc);
params.add("queueObject", queue.getAddressIfSet())
.add("exceptionMsg", ex.getMessageValue());
lc.log(log::INFO, "In Helpers::getLockedAndFetchedRepackQueue(): removed reference to gone repack queue from root entry.");
} catch (...) { /* Failing here is not fatal. We can get an exception if the queue was deleted in the meantime */ }
}
if (queueLock.isLocked()) {
queueLock.release();
timings.insOrIncAndReset("queueLockReleaseTime", t);
}
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("exceptionMessage", ex.getMessageValue())
.add("queueObject", queue.getAddressIfSet());
timings.addToLog(params);
lc.log(log::INFO, "In Helpers::getLockedAndFetchedRepackQueue(): failed to fetch an existing queue. Retrying.");
queue.resetAddress();
continue;
} catch (...) {
// Also release the lock if needed here.
if (queueLock.isLocked()) queueLock.release();
queue.resetAddress();
throw;
}
} // end of retry loop.
// Also release the lock if needed here.
if (queueLock.isLocked()) queueLock.release();
queue.resetAddress();
throw cta::exception::Exception(
"In OStoreDB::getLockedAndFetchedRepackQueue(): failed to find or create and lock repack queue after 5 retries");
}
//------------------------------------------------------------------------------
// Helpers::selectBestRetrieveQueue()
//------------------------------------------------------------------------------
......
......@@ -24,6 +24,7 @@
#include "catalogue/Catalogue.hpp"
#include "scheduler/OStoreDB/OStoreDB.hpp"
#include "JobQueueType.hpp"
#include "RepackQueueType.hpp"
#include <string>
#include <set>
#include <future>
......@@ -37,6 +38,7 @@ namespace cta { namespace objectstore {
class ScopedExclusiveLock;
class AgentReference;
class DriveState;
class RepackQueue;
/**
* A class with static functions allowing multi-object operations
......@@ -56,6 +58,16 @@ public:
ScopedExclusiveLock & queueLock, AgentReference & agentReference,
const cta::optional<std::string> & tapePoolOrVid, JobQueueType queueType, log::LogContext & lc);
/**
* Find or create a repack queue, and return it locked and fetched to the caller
* (Queue and ScopedExclusiveLock objects are provided empty)
* @param queue the queue object, empty
* @param queueLock the lock, not initialized
* @param agentReference the agent reference that will be needed in case of object creation
*/
static void getLockedAndFetchedRepackQueue(RepackQueue & queue,
ScopedExclusiveLock & queueLock, AgentReference & agentReference,
RepackQueueType queueType, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoTapeAvailableForRetrieve);
/**
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RepackQueue.hpp"
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
RepackQueue::RepackQueue(const std::string& address, Backend& os):
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>(os, address) {}
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
RepackQueue::RepackQueue(Backend& os):
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>(os) { }
//------------------------------------------------------------------------------
// RepackQueue::initialize()
//------------------------------------------------------------------------------
void RepackQueue::initialize() {
// Setup underlying object
ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t>::initialize();
// Nothing more to do.
m_payloadInterpreted = true;
}
//------------------------------------------------------------------------------
// RepackQueue::addRequestsAndCommit()
//------------------------------------------------------------------------------
void RepackQueue::addRequestsAndCommit(const std::list<std::string>& requestAddresses, log::LogContext& lc) {
checkPayloadWritable();
// This queue does not need to be sharded
for (auto &address: requestAddresses) m_payload.add_repackrequestpointers()->set_address(address);
commit();
}
//------------------------------------------------------------------------------
// RepackQueue::addRequestsIfNecessaryAndCommit()
//------------------------------------------------------------------------------
void RepackQueue::addRequestsIfNecessaryAndCommit(const std::list<std::string>& requestAddresses, log::LogContext& lc) {
checkPayloadWritable();
std::set<std::string> existingAddresses;
for (auto &a: m_payload.repackrequestpointers()) existingAddresses.insert(a.address());
bool didAdd = false;
for (auto &a: requestAddresses)
if (!existingAddresses.count(a)) {
m_payload.add_repackrequestpointers()->set_address(a);
didAdd = true;
}
if (didAdd) commit();
}
//------------------------------------------------------------------------------
// RepackQueue::garbageCollect()
//------------------------------------------------------------------------------
void RepackQueue::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc,
cta::catalogue::Catalogue& catalogue) {
throw exception::Exception("In RepackQueue::garbageCollect(): not implemented.");
}
//------------------------------------------------------------------------------
// RepackQueue::removeRequestsAndCommit()
//------------------------------------------------------------------------------
void RepackQueue::removeRequestsAndCommit(const std::list<std::string>& requestsAddresses) {
checkPayloadWritable();
std::set<std::string> requestsToRemove (requestsAddresses.begin(), requestsAddresses.end());
bool didRemove=false;
std::list<std::string> newQueue;
for (auto &a: m_payload.repackrequestpointers()) {
if (requestsToRemove.count(a.address())) {
didRemove=true;
} else {
newQueue.emplace_back(a.address());
}
}
if (didRemove) {
m_payload.mutable_repackrequestpointers()->Clear();
for (auto &a: newQueue) m_payload.add_repackrequestpointers()->set_address(a);
commit();
}
}
//------------------------------------------------------------------------------
// RepackQueue::getRequestsSummary()
//------------------------------------------------------------------------------
auto RepackQueue::getRequestsSummary() -> RequestsSummary {
checkPayloadReadable();
RequestsSummary ret;
ret.requests = m_payload.repackrequestpointers().size();
return ret;
}
//------------------------------------------------------------------------------
// RepackQueue::isEmpty()
//------------------------------------------------------------------------------
bool RepackQueue::isEmpty() {
checkPayloadReadable();
return m_payload.repackrequestpointers().size();
}
}} // namespace cta::objectstore.
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
namespace cta { namespace objectstore {
class GenericObject;
class RepackQueue: public ObjectOps<serializers::RepackQueue, serializers::RepackQueue_t> {
public:
// Constructor
RepackQueue(const std::string & address, Backend & os);
// Undefined object constructor
RepackQueue(Backend & os);
// Upgrader form generic object
RepackQueue(GenericObject & go);
// In memory initialiser
void initialize();
struct RequestsSummary {
uint64_t requests;
};
RequestsSummary getRequestsSummary();
void addRequestsAndCommit(const std::list<std::string> & requestAddresses, log::LogContext & lc);
void addRequestsIfNecessaryAndCommit(const std::list<std::string> & requestAddresses, log::LogContext & lc);
void removeRequestsAndCommit(const std::list<std::string> & requestsAddresses);
bool isEmpty();
void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) override;
};
class RepackQueuePending: public RepackQueue {
public:
template<typename...Ts> RepackQueuePending(Ts&...args): RepackQueue(args...) {}
};
class RepackQueueToExpand: public RepackQueue {
public:
template<typename...Ts> RepackQueueToExpand(Ts&...args): RepackQueue(args...) {}
};
}} // namespace cta::objectstore
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "Algorithms.hpp"
#include "RepackQueue.hpp"
#include "common/make_unique.hpp"
#include "common/optional.hpp"
namespace cta { namespace objectstore {
// Partial specialisation of RepackQueue traits
template<typename C>
struct ContainerTraits<RepackQueue,C>
{
struct ContainerSummary : public RepackQueue::RequestsSummary {
std::string repackRequestAddress;
void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&);
};
typedef cta::objectstore::RepackQueueType QueueType;
struct InsertedElement {
std::unique_ptr<RepackRequest> repackRequest;
typedef std::list<InsertedElement> list;
};
typedef std::string ElementDescriptor;
struct PoppedElement {
std::unique_ptr<RepackRequest> repackRequest;
std::string vid;
serializers::RepackRequestStatus status;
};
struct PoppedElementsSummary;
struct PopCriteria {
uint64_t requests;
PopCriteria() : requests(1) {}
PopCriteria& operator-=(const PoppedElementsSummary&);
};
struct PoppedElementsSummary {
uint64_t requests;
PoppedElementsSummary(uint64_t r = 0) : requests(r) {}
bool operator< (const PopCriteria & pc) {
return requests < pc.requests;
}
PoppedElementsSummary& operator+=(const PoppedElementsSummary &other) {
requests += other.requests;
return *this;
}
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
};
struct PoppedElementsList : public std::list<PoppedElement> {
void insertBack(PoppedElementsList&&);
void insertBack(PoppedElement&&);
};
struct PoppedElementsBatch {
PoppedElementsList elements;
PoppedElementsSummary summary;
void addToLog(log::ScopedParamContainer&);
};
typedef RepackQueue Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef cta::nullopt_t ContainerIdentifier;
typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
typedef std::set<ElementAddress> ElementsToSkipSet;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
template<typename Element>
struct OpFailure {
Element *element;
std::exception_ptr failure;
typedef std::list<OpFailure> list;
OpFailure() {}
OpFailure(Element *e, const std::exception_ptr &f) : element(e), failure(f) {}
};
struct OwnershipSwitchFailure: public cta::exception::Exception {
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
typename OpFailure<InsertedElement>::list failedElements;
};
template<typename Element>
static ElementAddress getElementAddress(const Element &e) {
return e.repackRequest->getAddressIfSet();
}
static ContainerSummary getContainerSummary(Container &cont);
static void trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, log::LogContext &lc);
static void getLockedAndFetched(Container &cont, ScopedExclusiveLock &contLock, AgentReference &agRef,
const ContainerIdentifier &cId, RepackQueueType queueType, log::LogContext &lc);
static void getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, RepackQueueType queueType, log::LogContext &lc);
static void addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemCont,
AgentReference &agentRef, log::LogContext &lc);
static void addReferencesIfNecessaryAndCommit(Container &cont, typename InsertedElement::list &elemMemCont,
AgentReference &agentRef, log::LogContext &lc);
static void removeReferencesAndCommit(Container &cont, typename OpFailure<InsertedElement>::list &elementsOpFailures);
static void removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAddressList);
static typename OpFailure<InsertedElement>::list
switchElementsOwnership(typename InsertedElement::list &elemMemCont, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static typename OpFailure<PoppedElement>::list
switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static PoppedElementsSummary getElementSummary(const PoppedElement &);
static PoppedElementsBatch getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
ElementsToSkipSet &elemtsToSkip, log::LogContext &lc);
static const std::string c_containerTypeName;
static const std::string c_identifierType;
private:
static void trimContainerIfNeeded(Container &cont, JobQueueType queueType, ScopedExclusiveLock &contLock,
const ContainerIdentifier &cId, log::LogContext &lc);
};
// RepackQueue partial specialisations for ContainerTraits.
//
// Add a full specialisation to override for a specific RepackQueue type.
template<typename C>
void ContainerTraits<RepackQueue,C>::ContainerSummary::
addDeltaToLog(ContainerSummary &previous, log::ScopedParamContainer &params) {
params.add("queueRequestsBefore", previous.requests)
.add("queueRequestsAfter", requests);
}
template<typename C>
auto ContainerTraits<RepackQueue,C>::PopCriteria::
operator-=(const PoppedElementsSummary &pes) -> PopCriteria& {
requests -= pes.requests;
return *this;
}
template<typename C>
void ContainerTraits<RepackQueue,C>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
params.add("requestsAdded", requests - previous.requests)
.add("requestsBefore", previous.requests)
.add("requestsAfter", requests);
}
template<typename C>
void ContainerTraits<RepackQueue,C>::PoppedElementsList::
insertBack(PoppedElementsList &&insertedList) {
for (auto &e: insertedList) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
}
template<typename C>
void ContainerTraits<RepackQueue,C>::PoppedElementsList::
insertBack(PoppedElement &&e) {
std::list<PoppedElement>::emplace_back(std::move(e));
}
template<typename C>
void ContainerTraits<RepackQueue,C>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("requests", summary.requests);
}
template<typename C>
void ContainerTraits<RepackQueue,C>::
trimContainerIfNeeded(Container& cont, JobQueueType queueType, ScopedExclusiveLock & contLock,
const ContainerIdentifier & cId, log::LogContext& lc)
{
// Repack queues are one per status, so we do not need to trim them.
}
template<typename C>
void ContainerTraits<RepackQueue,C>::
getLockedAndFetched(Container& cont, ScopedExclusiveLock& aqL, AgentReference& agRef,
const ContainerIdentifier& contId, RepackQueueType queueType, log::LogContext& lc)
{
Helpers::getLockedAndFetchedRepackQueue(cont, aqL, agRef, queueType, lc);
}
template<typename C>
void ContainerTraits<RepackQueue,C>::
getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, const ContainerIdentifier& cId,
RepackQueueType queueType, log::LogContext& lc)
{
// Try and get access to a queue.
size_t attemptCount = 0;
retry:
objectstore::RootEntry re(cont.m_objectStore);
re.fetchNoLock();
std::string rpkQAddress;
try {
rpkQAddress = re.getRepackQueueAddress(queueType);
} catch (RootEntry::NotAllocated &) {
throw NoSuchContainer("In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): no such repack queue");
}
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(rpkQAddress);
//findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
try {
if (contLock.isLocked()) contLock.release();
contLock.lock(cont);
cont.fetch();
//lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
} catch (Backend::NoSuchObject & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the mean time, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeRepackQueueAndCommit(queueType, lc);
log::ScopedParamContainer params(lc);
params.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::RepackQueueNotEmpty & ex) {
log::ScopedParamContainer params(lc);
params.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchRepackQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
log::ScopedParamContainer params(lc);