Commit bc52524e authored by Eric Cano's avatar Eric Cano
Browse files

Implemented promotion of repack requests from Pending to ToExpand

This promotion is controlled so that only a limited number of requests
are in the
state ToExpand or Starting at any point in time. This ensures the
availability
of repack file requests to the system while preventing an explosion of
file-level requests.

Created a one-round popping from the container (algorithms) with status
switching.
  - Used for repack requests switching from Pending to ToExpand

Added ElementStatus to algorithms.

Implemented promotion interface in Scheduler and OstoreDb. The actual
decision is taken at
the Scheduler level. The function itself is called by the
RepackRequestManager.

Promotion is tested in a unit test.

Various code maintenance:
Switched to "using"-based constructor inheritance.
Fixed privacy of function in cta::range.
parent b6ae9b50
......@@ -33,7 +33,8 @@ struct RepackInfo {
enum class Type {
ExpandAndRepack,
ExpandOnly,
RepackOnly
RepackOnly,
Undefined
} type;
enum class Status {
// Those values are matching the cta.proto values
......@@ -44,7 +45,8 @@ struct RepackInfo {
Aborting = 5,
Aborted = 6,
Complete = 7,
Failed = 8
Failed = 8,
Undefined = 999
} status;
// std::string tag;
// uint64_t totalFiles;
......
......@@ -38,6 +38,7 @@ public:
T operator*() { if (m_val == m_limit) throw DereferencingPastEnd("In range::operator*(): dereferencing out of bounds"); return m_val; }
iterator & operator++() { doInc(); return *this; }
iterator operator++(int) { iterator ret(*this); doInc(); return ret; }
private:
void doInc() {
switch (m_dir) {
// Increment/decrement variable, preventing over/underflow, and overshooting the limit.
......@@ -64,6 +65,7 @@ public:
break;
}
}
public:
bool operator==(const iterator& other) { checkIteratorsComparable(other); return m_val == other.m_val; }
bool operator!=(const iterator& other) { checkIteratorsComparable(other); return m_val != other.m_val; }
CTA_GENERATE_EXCEPTION_CLASS(NonComparableIterators);
......
......@@ -40,8 +40,10 @@ public:
typedef typename ContainerTraits<Q,C>::InsertedElement InsertedElement;
typedef typename ContainerTraits<Q,C>::PopCriteria PopCriteria;
typedef typename ContainerTraits<Q,C>::OwnershipSwitchFailure OwnershipSwitchFailure;
typedef typename ContainerTraits<Q,C>::PoppedElementsBatch PoppedElementsBatch;
/** Reference objects in the container and then switch their ownership. Objects
/**
* Reference objects in the container and then switch their ownership. Objects
* are provided existing and owned by algorithm's agent.
*/
void referenceAndSwitchOwnership(const typename ContainerTraits<Q,C>::ContainerIdentifier & contId,
......@@ -98,7 +100,7 @@ public:
}
/**
* Reference objects in the container if needed and then switch their ownership (if needed).
* Reference objects in the container and then switch their ownership (if needed).
*
* Objects are expected to be owned by an agent and not listed in the container, but situations
* might vary. This function is typically used by the garbage collector. We do not take care of
......@@ -152,14 +154,154 @@ public:
}
}
typename ContainerTraits<Q,C>::PoppedElementsBatch popNextBatch(
/**
 * Do a single round of popping from the pre-locked container, optionally
 * switching the status of the popped elements in the same operation.
 *
 * The container is fetched, candidate elements are selected, referenced in the
 * agent's ownership and then switched (ownership and, if requested, status).
 * Elements are then de-referenced from the container, which is committed.
 *
 * @param cont the container to pop from. It is fetched by this function.
 * @param contLock the lock held on the container. It is released by this
 * function (if still locked) once the container has been updated. It is left
 * untouched when fetching the container fails.
 * @param popCriteria criteria handed to
 * ContainerTraits<Q,C>::getPoppingElementsCandidates() to select the candidates.
 * @param newStatus optional new status, for pop-and-switch-status combined
 * operation. Forwarded as-is to ContainerTraits<Q,C>::switchElementsOwnershipAndStatus().
 * @param lc the log context.
 * @return the batch of elements successfully popped. The batch is empty when
 * the container could not be fetched (exception::Exception is caught and the
 * failure path only logs).
 */
PoppedElementsBatch popNextBatchFromContainerAndSwitchStatus(
  C &cont,
  ScopedExclusiveLock & contLock,
  PopCriteria &popCriteria,
  cta::optional<typename ContainerTraits<Q,C>::ElementStatus> &newStatus,
  log::LogContext &lc) {
  PoppedElementsBatch ret;
  // Snapshot of the (still empty) summary, used to log the delta after the round.
  auto previousSummary = ret.summary;
  log::TimingList timingList;
  utils::Timer t, totalTime;
  typename ContainerTraits<Q,C>::ContainerSummary contSummaryBefore, contSummaryAfter;
  try {
    cont.fetch();
  } catch (exception::Exception &) {
    timingList.insertAndReset("fetchQueueTime", t);
    // Failing to access the container will be logged
    goto logAndReturn;
  }
  timingList.insertAndReset("fetchQueueTime", t);
  contSummaryBefore = ContainerTraits<Q,C>::getContainerSummary(cont);
  {
    // We have a container. Get candidate element list from it.
    // This is a one-round pop: there is no previous round, hence nothing to skip.
    typename ContainerTraits<Q,C>::ElementsToSkipSet emptyElementsToSkip;
    PoppedElementsBatch candidateElements =
        ContainerTraits<Q,C>::getPoppingElementsCandidates(cont, popCriteria,
        emptyElementsToSkip, lc);
    timingList.insertAndReset("jobSelectionTime", t);
    // Reference the candidates to our agent
    std::list<typename ContainerTraits<Q,C>::ElementAddress> candidateElementsAddresses;
    for (auto & e: candidateElements.elements) {
      candidateElementsAddresses.emplace_back(ContainerTraits<Q,C>::getElementAddress(e));
    }
    // NOTE(review): the timer is reset before addBatchToOwnership() is called, so the
    // time spent in that call is accounted to the next timing entry. This mirrors
    // popNextBatch(); confirm whether this ordering is intended.
    timingList.insertAndReset("ownershipAdditionTime", t);
    m_agentReference.addBatchToOwnership(candidateElementsAddresses, m_backend);
    // We can now attempt to switch ownership of elements
    auto failedOwnershipSwitchElements = ContainerTraits<Q,C>::switchElementsOwnershipAndStatus(candidateElements,
        m_agentReference.getAgentAddress(),
        cont.getAddressIfSet(), timingList, t, lc, newStatus);
    if (failedOwnershipSwitchElements.empty()) {
      timingList.insertAndReset("updateResultProcessingTime", t);
      // This is the easy case (and most common case). Everything went through fine.
      ContainerTraits<Q,C>::removeReferencesAndCommit(cont, candidateElementsAddresses);
      timingList.insertAndReset("containerUpdateTime", t);
      contSummaryAfter = ContainerTraits<Q,C>::getContainerSummary(cont);
      // We skip the container trimming as we do not have the contId.
      // trimming might release the lock
      if (contLock.isLocked()) contLock.release();
      timingList.insertAndReset("containerUnlockTime", t);
      // All jobs are validated
      ret.summary += candidateElements.summary;
      ret.elements.insertBack(std::move(candidateElements.elements));
      timingList.insertAndReset("structureProcessingTime", t);
    } else {
      // For the failed files, we have to differentiate the not owned or not existing ones from other error cases.
      // For the not owned, not existing and those successfully switched, we have to dereference them from the container.
      // For other cases, we will leave the elements referenced in the container, as we cannot ensure de-referencing is safe.
      std::set<typename ContainerTraits<Q,C>::ElementAddress> elementsNotToDereferenceFromContainer;
      std::set<typename ContainerTraits<Q,C>::ElementAddress> elementsNotToReport;
      std::list<typename ContainerTraits<Q,C>::ElementAddress> elementsToDereferenceFromAgent;
      for (auto &e: failedOwnershipSwitchElements) {
        // Re-throw the stored failure to sort it by exception type.
        try {
          std::rethrow_exception(e.failure);
        } catch (Backend::NoSuchObject &) {
          elementsToDereferenceFromAgent.push_back(ContainerTraits<Q,C>::getElementAddress(*e.element));
          elementsNotToReport.insert(ContainerTraits<Q,C>::getElementAddress(*e.element));
        } catch (Backend::WrongPreviousOwner &) {
          elementsToDereferenceFromAgent.push_back(ContainerTraits<Q,C>::getElementAddress(*e.element));
          elementsNotToReport.insert(ContainerTraits<Q,C>::getElementAddress(*e.element));
        } catch (Backend::CouldNotUnlock&) {
          // Do nothing, this element was indeed OK.
        }
        catch (...) {
          // This is a different error, so we will leave the reference to the element in the container
          elementsNotToDereferenceFromContainer.insert(ContainerTraits<Q,C>::getElementAddress(*e.element));
          elementsToDereferenceFromAgent.push_back(ContainerTraits<Q,C>::getElementAddress(*e.element));
          elementsNotToReport.insert(ContainerTraits<Q,C>::getElementAddress(*e.element));
        }
      }
      // We are done with the sorting. Apply the decisions...
      std::list<typename ContainerTraits<Q,C>::ElementAddress> elementsToDereferenceFromContainer;
      for (auto & e: candidateElements.elements) {
        if (!elementsNotToDereferenceFromContainer.count(ContainerTraits<Q,C>::getElementAddress(e))) {
          elementsToDereferenceFromContainer.push_back(ContainerTraits<Q,C>::getElementAddress(e));
        }
      }
      timingList.insertAndReset("updateResultProcessingTime", t);
      ContainerTraits<Q,C>::removeReferencesAndCommit(cont, elementsToDereferenceFromContainer);
      timingList.insertAndReset("containerUpdateTime", t);
      contSummaryAfter = ContainerTraits<Q,C>::getContainerSummary(cont);
      if (contLock.isLocked()) contLock.release();
      timingList.insertAndReset("containerUnlockTime", t);
      m_agentReference.removeBatchFromOwnership(elementsToDereferenceFromAgent, m_backend);
      // Only report the elements whose switch did not fail (or failed harmlessly).
      for (auto & e: candidateElements.elements) {
        if (!elementsNotToReport.count(ContainerTraits<Q,C>::getElementAddress(e))) {
          ret.summary += ContainerTraits<Q,C>::getElementSummary(e);
          ret.elements.insertBack(std::move(e));
        }
      }
      timingList.insertAndReset("structureProcessingTime", t);
    }
  }
  {
    log::ScopedParamContainer params(lc);
    params.add("C", ContainerTraits<Q,C>::c_containerTypeName)
          .add("containerAddress", cont.getAddressIfSet());
    ret.summary.addDeltaToLog(previousSummary, params);
    contSummaryAfter.addDeltaToLog(contSummaryBefore, params);
    timingList.addToLog(params);
    lc.log(log::INFO, "In Algorithms::popNextBatchFromContainerAndSwitchStatus(): did one round of elements retrieval.");
  }
logAndReturn:
  {
    log::ScopedParamContainer params(lc);
    params.add("C", ContainerTraits<Q,C>::c_containerTypeName);
    ret.addToLog(params);
    timingList.addToLog(params);
    params.add("schedulerDbTime", totalTime.secs());
    lc.log(log::INFO, "In Algorithms::popNextBatchFromContainerAndSwitchStatus(): elements retrieval complete.");
  }
  return ret;
}
/**
* Loop popping from a container until coming empty handed or fulfilling the criteria.
* The popping can take several iterations. The container is re-found on each round.
* @param contId container identifier (typically a string like vid, tape pool...)
* @param queueType container type (usually represents the steps in the requests lifecycle (ToTranfer, FailedToReport, Failed...)
* @param popCriteria
* @param lc
* @return
*/
PoppedElementsBatch popNextBatch(
const typename ContainerTraits<Q,C>::ContainerIdentifier &contId,
JobQueueType queueType,
typename ContainerTraits<Q,C>::PopCriteria &popCriteria,
log::LogContext &lc)
{
// Prepare the return value
typename ContainerTraits<Q,C>::PoppedElementsBatch ret;
PoppedElementsBatch ret;
typename ContainerTraits<Q,C>::PopCriteria unfulfilledCriteria = popCriteria;
size_t iterationCount=0;
typename ContainerTraits<Q,C>::ElementsToSkipSet elementsToSkip;
......@@ -185,7 +327,7 @@ public:
typename ContainerTraits<Q,C>::ContainerSummary contSummaryBefore, contSummaryAfter;
contSummaryBefore = ContainerTraits<Q,C>::getContainerSummary(cont);
// We have a container. Get candidate element list from it.
typename ContainerTraits<Q,C>::PoppedElementsBatch candidateElements =
PoppedElementsBatch candidateElements =
ContainerTraits<Q,C>::getPoppingElementsCandidates(cont, unfulfilledCriteria, elementsToSkip, lc);
localTimingList.insertAndReset("jobSelectionTime", t);
// Reference the candidates to our agent
......@@ -196,7 +338,8 @@ public:
localTimingList.insertAndReset("ownershipAdditionTime", t);
m_agentReference.addBatchToOwnership(candidateElementsAddresses, m_backend);
// We can now attempt to switch ownership of elements
auto failedOwnershipSwitchElements = ContainerTraits<Q,C>::switchElementsOwnership(candidateElements, m_agentReference.getAgentAddress(), cont.getAddressIfSet(), localTimingList, t, lc);
auto failedOwnershipSwitchElements = ContainerTraits<Q,C>::switchElementsOwnership(candidateElements, m_agentReference.getAgentAddress(),
cont.getAddressIfSet(), localTimingList, t, lc);
if (failedOwnershipSwitchElements.empty()) {
localTimingList.insertAndReset("updateResultProcessingTime", t);
// This is the easy case (and most common case). Everything went through fine.
......@@ -277,7 +420,7 @@ public:
ret.summary.addDeltaToLog(previousSummary, params);
contSummaryAfter.addDeltaToLog(contSummaryBefore, params);
localTimingList.addToLog(params);
lc.log(log::INFO, "In ContainerTraits<Q,C>::PoppedElementsBatch(): did one round of elements retrieval.");
lc.log(log::INFO, "In Algorithms::popNextBatch(): did one round of elements retrieval.");
timingList+=localTimingList;
}
logAndReturn:
......@@ -289,7 +432,7 @@ public:
timingList.addToLog(params);
params.add("schedulerDbTime", totalTime.secs());
params.add("iterationCount", iterationCount);
lc.log(log::INFO, "In ContainerTraits<Q,C>::PoppedElementsBatch(): elements retrieval complete.");
lc.log(log::INFO, "In Algorithms::popNextBatch(): elements retrieval complete.");
}
return ret;
}
......
......@@ -139,18 +139,15 @@ public:
};
class ArchiveQueueToTransfer: public ArchiveQueue {
public:
template<typename...Ts> ArchiveQueueToTransfer(Ts&...args): ArchiveQueue(args...) {}
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReport: public ArchiveQueue {
public:
template<typename...Ts> ArchiveQueueToReport(Ts&...args): ArchiveQueue(args...) {}
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueFailed: public ArchiveQueue {
public:
template<typename...Ts> ArchiveQueueFailed(Ts&...args): ArchiveQueue(args...) {}
using ArchiveQueue::ArchiveQueue;
};
}}
......@@ -92,6 +92,7 @@ struct ContainerTraits<ArchiveQueue,C>
typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
typedef std::set<ElementAddress> ElementsToSkipSet;
typedef serializers::ArchiveJobStatus ElementStatus;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
......@@ -272,19 +273,19 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue,C>::getLockedAndFetchedNoCreate(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
lc.log(log::INFO, "In ContainerTraits<ArchiveQueue,C>::getLockedAndFetchedNoCreate(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchArchiveQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
lc.log(log::DEBUG, "In ContainerTraits<ArchiveQueue,C>::getLockedAndFetchedNoCreate(): could not de-referenced missing queue from root entry: already done.");
}
//emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
attemptCount++;
......
......@@ -79,6 +79,8 @@ add_library (ctaobjectstore SHARED
RepackIndex.cpp
RepackRequest.cpp
RepackQueue.cpp
RepackQueuePendingAlgorithms.cpp
RepackQueueToExpandAlgorithms.cpp
RepackQueueType.cpp
BackendVFS.cpp
BackendRados.cpp
......
......@@ -43,7 +43,9 @@ struct RetrieveQueue;
struct RetrieveQueueToTransfer;
struct RetrieveQueueToReport;
struct RetrieveQueueFailed;
struct RepackQueue;
struct RepackQueuePending;
struct RepackQueueToExpand;
class ObjectOpsBase {
friend class ScopedLock;
friend class ScopedSharedLock;
......@@ -56,6 +58,8 @@ class ObjectOpsBase {
friend ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>;
friend ContainerTraits<RetrieveQueue,RetrieveQueueToReport>;
friend ContainerTraits<RetrieveQueue,RetrieveQueueFailed>;
friend ContainerTraits<RepackQueue,RepackQueuePending>;
friend ContainerTraits<RepackQueue,RepackQueueToExpand>;
protected:
ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os),
m_headerInterpreted(false), m_payloadInterpreted(false),
......
......@@ -18,6 +18,7 @@
#include "RepackQueue.hpp"
#include "GenericObject.hpp"
#include "common/range.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
......@@ -121,6 +122,25 @@ auto RepackQueue::getRequestsSummary() -> RequestsSummary {
return ret;
}
//------------------------------------------------------------------------------
// RepackQueue::getCandidateList()
//------------------------------------------------------------------------------
/**
 * List up to maxRequests repack requests from the head of the queue.
 *
 * Requests whose address is in repackRequestsToSkip are not listed. They are
 * still walked past, so they are neither counted as candidates nor as remaining.
 * With an empty skip set the result is the first min(maxRequests, queue size)
 * requests and remainingRequestsAfterCandidates is the queue size minus the
 * number of candidates.
 *
 * @param maxRequests maximum number of candidates to return.
 * @param repackRequestsToSkip addresses of requests to leave out of the listing.
 * @return the candidates, their count, and the number of queued requests that
 * were not walked through.
 */
auto RepackQueue::getCandidateList(uint64_t maxRequests, std::set<std::string> repackRequestsToSkip)
  -> CandidateJobList {
  checkPayloadReadable();
  CandidateJobList ret;
  ret.remainingRequestsAfterCandidates = m_payload.repackrequestpointers_size();
  for (const auto & requestPointer: m_payload.repackrequestpointers()) {
    // Stop before consuming any further entry once the quota is reached.
    if (ret.candidateRequests >= maxRequests) break;
    if (!repackRequestsToSkip.count(requestPointer.address())) {
      ret.candidates.push_back(RequestDump());
      ret.candidates.back().address = requestPointer.address();
      ret.candidateRequests++;
    }
    // Listed or skipped, this entry is no longer "remaining".
    ret.remainingRequestsAfterCandidates--;
  }
  return ret;
}
//------------------------------------------------------------------------------
// RepackQueue::isEmpty()
//------------------------------------------------------------------------------
......
......@@ -49,6 +49,20 @@ public:
void addRequestsIfNecessaryAndCommit(const std::list<std::string> & requestAddresses, log::LogContext & lc);
// Description of one repack request listed from the queue by getCandidateList().
struct RequestDump {
// Address of the repack request, as recorded in the queue's request pointer.
std::string address;
};
// Result of getCandidateList(): the listed requests plus bookkeeping counters.
struct CandidateJobList {
// Number of requests in the queue beyond those covered by this listing,
// as computed by getCandidateList().
uint64_t remainingRequestsAfterCandidates = 0;
// Number of entries in candidates.
uint64_t candidateRequests = 0;
// The listed requests, in queue order.
std::list<RequestDump> candidates;
};
// The set of repack requests to skip are requests previously identified by the caller as bad,
// which still should be removed from the queue. They will be disregarded from listing.
CandidateJobList getCandidateList(uint64_t maxRequests, std::set<std::string> repackRequestsToSkip);
void removeRequestsAndCommit(const std::list<std::string> & requestsAddresses);
bool isEmpty();
......@@ -59,13 +73,11 @@ public:
};
class RepackQueuePending: public RepackQueue {
public:
template<typename...Ts> RepackQueuePending(Ts&...args): RepackQueue(args...) {}
using RepackQueue::RepackQueue;
};
class RepackQueueToExpand: public RepackQueue {
public:
template<typename...Ts> RepackQueueToExpand(Ts&...args): RepackQueue(args...) {}
using RepackQueue::RepackQueue;
};
}} // namespace cta::objectstore
......@@ -21,6 +21,7 @@
#include "RepackQueue.hpp"
#include "common/make_unique.hpp"
#include "common/optional.hpp"
#include "RepackRequest.hpp"
namespace cta { namespace objectstore {
......@@ -30,7 +31,7 @@ template<typename C>
struct ContainerTraits<RepackQueue,C>
{
struct ContainerSummary : public RepackQueue::RequestsSummary {
std::string repackRequestAddress;
using RepackQueue::RequestsSummary::RequestsSummary;
void addDeltaToLog(ContainerSummary&, log::ScopedParamContainer&);
};
......@@ -38,6 +39,7 @@ struct ContainerTraits<RepackQueue,C>
struct InsertedElement {
std::unique_ptr<RepackRequest> repackRequest;
cta::optional<serializers::RepackRequestStatus> newStatus;
typedef std::list<InsertedElement> list;
};
......@@ -45,8 +47,7 @@ struct ContainerTraits<RepackQueue,C>
struct PoppedElement {
std::unique_ptr<RepackRequest> repackRequest;
std::string vid;
serializers::RepackRequestStatus status;
common::dataStructures::RepackInfo repackInfo;
};
struct PoppedElementsSummary;
struct PopCriteria {
......@@ -83,6 +84,7 @@ struct ContainerTraits<RepackQueue,C>
typedef std::list<std::unique_ptr<InsertedElement>> ElementMemoryContainer;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
typedef std::set<ElementAddress> ElementsToSkipSet;
typedef serializers::RepackRequestStatus ElementStatus;
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
......@@ -127,6 +129,11 @@ struct ContainerTraits<RepackQueue,C>
static typename OpFailure<PoppedElement>::list
switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static typename OpFailure<PoppedElement>::list
switchElementsOwnershipAndStatus(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc,
cta::optional<ElementStatus> &newStatus = cta::nullopt);
static PoppedElementsSummary getElementSummary(const PoppedElement &);
static PoppedElementsBatch getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
......@@ -153,6 +160,25 @@ addDeltaToLog(ContainerSummary &previous, log::ScopedParamContainer &params) {
.add("queueRequestsAfter", requests);
}
/**
 * Build the batch of candidate elements to pop from a repack queue.
 *
 * The queue is asked for at most unfulfilledCriteria.requests candidates. Each
 * one becomes a PoppedElement holding a RepackRequest object created on the
 * candidate's address, with placeholder repack info (Undefined status and type,
 * empty vid).
 *
 * @param cont the repack queue to list candidates from.
 * @param unfulfilledCriteria criteria giving the maximum number of requests.
 * @param elemtsToSkip addresses handed to the queue's getCandidateList().
 * @param lc the log context (not used here).
 * @return the candidate batch, with summary.requests counting the elements.
 */
template<typename C>
auto ContainerTraits<RepackQueue,C>::
getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria, ElementsToSkipSet& elemtsToSkip,
    log::LogContext& lc) -> PoppedElementsBatch
{
  using RepackInfoT = common::dataStructures::RepackInfo;
  PoppedElementsBatch batch;
  auto queueCandidates = cont.getCandidateList(unfulfilledCriteria.requests, elemtsToSkip);
  for (auto &candidate: queueCandidates.candidates) {
    batch.elements.emplace_back(PoppedElement());
    PoppedElement &popped = batch.elements.back();
    popped.repackRequest = cta::make_unique<RepackRequest>(candidate.address, cont.m_objectStore);
    // The real repack info is not known yet: fill in placeholders.
    popped.repackInfo.vid = "";
    popped.repackInfo.type = RepackInfoT::Type::Undefined;
    popped.repackInfo.status = RepackInfoT::Status::Undefined;
    batch.summary.requests++;
  }
  return batch;
}
template<typename C>
auto ContainerTraits<RepackQueue,C>::PopCriteria::
operator-=(const PoppedElementsSummary &pes) -> PopCriteria& {
......@@ -220,7 +246,7 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons
} catch (RootEntry::NotAllocated &) {
throw NoSuchContainer("In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): no such repack queue");
}
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
// try and lock the repack queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(rpkQAddress);
//findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
try {
......@@ -238,17 +264,20 @@ getLockedAndFetchedNoCreate(Container& cont, ScopedExclusiveLock& contLock, cons
re.removeRepackQueueAndCommit(queueType, lc);
log::ScopedParamContainer params(lc);
params.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
lc.log(log::INFO,
"In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): de-referenced missing queue from root entry");
} catch (RootEntry::RepackQueueNotEmpty & ex) {
log::ScopedParamContainer params(lc);
params.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
lc.log(log::INFO,
"In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchRepackQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
log::ScopedParamContainer params(lc);
params.add("queueObject", cont.getAddressIfSet());
lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
lc.log(log::DEBUG,
"In ContainerTraits<RepackQueue,C>::getLockedAndFetchedNoCreate(): could not de-referenced missing queue from root entry: already done.");
}
//emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
attemptCount++;
......@@ -298,10 +327,10 @@ switchElementsOwnership(typename InsertedElement::list& elemMemCont, const Conta
const ContainerAddress& previousOwnerAddress, log::TimingList& timingList, utils::Timer &t, log::LogContext& lc)
-> typename OpFailure<InsertedElement>::list
{
std::list<std::unique_ptr<RepackRequest::AsyncOwnerUpdater>> updaters;
std::list<std::unique_ptr<RepackRequest::AsyncOwnerAndStatusUpdater>> updaters;
for (auto & e: elemMemCont) {
RepackRequest & repr = *e.repackRequest;
updaters.emplace_back(repr.asyncUpdateOwner(contAddress, previousOwnerAddress, cta::nullopt));
updaters.emplace_back(repr.asyncUpdateOwnerAndStatus(contAddress, previousOwnerAddress, cta::nullopt));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
......@@ -324,15 +353,15 @@ switchElementsOwnership(typename InsertedElement::list& elemMemCont, const Conta
template<typename C>
auto ContainerTraits<RepackQueue,C>::
switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc)
switchElementsOwnershipAndStatus(PoppedElementsBatch &poppedElementBatch, const ContainerAddress &contAddress,
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc,
cta::optional<ElementStatus> &newStatus)
-> typename OpFailure<PoppedElement>::list
{
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
std::list<std::unique_ptr<RepackRequest::AsyncOwnerAndStatusUpdater>> updaters;
for (auto & e: poppedElementBatch.elements) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, contAddress, previousOwnerAddress, cta::nullopt));
RepackRequest & rr = *e.repackRequest;
updaters.emplace_back(rr.asyncUpdateOwnerAndStatus(contAddress, previousOwnerAddress, newStatus));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
......@@ -341,21 +370,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
while (e != poppedElementBatch.elements.end()) {
try {
u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
switch(u->get()->getJobStatus()) {
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
break;
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::FailureReport;
break;
default:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
break;
}
e->repackInfo = u->get()->getInfo();
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
......@@ -378,43 +393,4 @@ getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
// RepackQueue full specialisations for ContainerTraits.
template<>
struct ContainerTraits<RepackQueue,RepackQueuePending>::PopCriteria {
uint64_t files;
uint64_t bytes;
PopCriteria(uint64_t f = 0, uint64_t b = 0) : files(f), bytes(b) {}
template<typename PoppedElementsSummary_t>
PopCriteria& operator-=(const PoppedElementsSummary_t &pes) {
bytes -= pes.bytes;
files -= pes.files;
return *this;
}
};
template<>
struct ContainerTraits<RepackQueue,RepackQueuePending>::PoppedElementsSummary {
uint64_t files;
uint64_t bytes;