Skip to content
Snippets Groups Projects
Commit 5c928007 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Implements ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate()

parent dc4d38c8
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@ const std::string ContainerTraits<RetrieveQueue>::c_identifierType = "vid";
void ContainerTraitsTypes<RetrieveQueue>::PoppedElementsSummary::
addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &params) {
throw std::runtime_error("1 Not implemented.");
#if 0
params.add("filesAdded", files - previous.files)
.add("bytesAdded", bytes - previous.bytes)
......@@ -43,6 +44,7 @@ addDeltaToLog(const PoppedElementsSummary &previous, log::ScopedParamContainer &
void ContainerTraitsTypes<RetrieveQueue>::ContainerSummary::
addDeltaToLog(const ContainerSummary &previous, log::ScopedParamContainer &params) {
throw std::runtime_error("2 Not implemented.");
#if 0
params.add("queueJobsBefore", previous.jobs)
.add("queueBytesBefore", previous.bytes)
......@@ -89,57 +91,54 @@ template<>
void ContainerTraits<RetrieveQueue>::
getLockedAndFetchedNoCreate(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId, log::LogContext &lc)
{
#if 0
// Try and get access to a queue.
size_t attemptCount = 0;
retry:
retry:
objectstore::RootEntry re(cont.m_objectStore);
re.fetchNoLock();
std::string aqAddress;
auto aql = re.dumpArchiveQueues(QueueType::LiveJobs);
for (auto & aqp : aql) {
if (aqp.tapePool == cId)
aqAddress = aqp.address;
std::string rqAddress;
auto rql = re.dumpRetrieveQueues(QueueType::LiveJobs);
for (auto &rqp : rql) {
if (rqp.vid == cId)
rqAddress = rqp.address;
}
if (!aqAddress.size()) throw NoSuchContainer("In ContainerTraits<ArchiveQueue>::getLockedAndFetchedNoCreate(): no such archive queue");
// try and lock the archive queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(aqAddress);
//findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
if(rqAddress.empty()) throw NoSuchContainer("In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): no such retrieve queue");
// try and lock the retrieve queue. Any failure from here on means the end of the getting jobs.
cont.setAddress(rqAddress);
try {
if (contLock.isLocked()) contLock.release();
if(contLock.isLocked()) contLock.release();
contLock.lock(cont);
cont.fetch();
//lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
} catch (cta::exception::Exception & ex) {
} catch(cta::exception::Exception & ex) {
// The queue is now absent. We can remove its reference in the root entry.
// A new queue could have been added in the mean time, and be non-empty.
// A new queue could have been added in the meantime, and be non-empty.
// We will then fail to remove from the RootEntry (non-fatal).
ScopedExclusiveLock rexl(re);
re.fetch();
try {
re.removeArchiveQueueAndCommit(cId, QueueType::LiveJobs, lc);
re.removeRetrieveQueueAndCommit(cId, QueueType::LiveJobs, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
params.add("vid", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): de-referenced missing queue from root entry");
} catch (RootEntry::ArchiveQueueNotEmpty & ex) {
lc.log(log::INFO, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): dereferenced missing queue from root entry");
} catch (RootEntry::RetrieveQueueNotEmpty &ex) {
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
params.add("vid", cId)
.add("queueObject", cont.getAddressIfSet())
.add("Message", ex.getMessageValue());
lc.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
} catch (RootEntry::NoSuchArchiveQueue & ex) {
// Somebody removed the queue in the mean time. Barely worth mentioning.
lc.log(log::INFO, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): could not dereference missing queue from root entry");
} catch (RootEntry::NoSuchRetrieveQueue &ex) {
// Somebody removed the queue in the meantime. Barely worth mentioning.
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
params.add("vid", cId)
.add("queueObject", cont.getAddressIfSet());
lc.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done.");
lc.log(log::DEBUG, "In ContainerTraits<RetrieveQueue>::getLockedAndFetchedNoCreate(): could not dereference missing queue from root entry: already done.");
}
//emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
attemptCount++;
goto retry;
}
#endif
}
template<>
......@@ -147,6 +146,7 @@ void ContainerTraits<RetrieveQueue>::
addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, AgentReference &agentRef,
log::LogContext &lc)
{
throw std::runtime_error("4 Not implemented.");
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
......@@ -172,6 +172,7 @@ template<>
void ContainerTraits<RetrieveQueue>::addReferencesIfNecessaryAndCommit(Container& cont,
InsertedElement::list& elemMemCont, AgentReference& agentRef, log::LogContext& lc)
{
throw std::runtime_error("5 Not implemented.");
#if 0
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
......@@ -207,6 +208,7 @@ removeReferencesAndCommit(Container &cont, std::list<ElementAddress> &elementAdd
template<>
auto ContainerTraits<RetrieveQueue>::
getContainerSummary(Container &cont) -> ContainerSummary {
throw std::runtime_error("6 Not implemented.");
ContainerSummary ret;
#if 0
ret.JobsSummary::operator=(cont.getJobsSummary());
......@@ -250,6 +252,7 @@ auto ContainerTraits<RetrieveQueue>::
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
log::LogContext &lc) -> PoppedElementsBatch
{
throw std::runtime_error("7 Not implemented.");
PoppedElementsBatch ret;
#if 0
auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
......@@ -266,6 +269,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
template<>
auto ContainerTraits<RetrieveQueue>::
getElementSummary(const PoppedElement &poppedElement) -> PoppedElementsSummary {
throw std::runtime_error("8 Not implemented.");
PoppedElementsSummary ret;
#if 0
ret.bytes = poppedElement.bytes;
......@@ -280,6 +284,7 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t,
log::LogContext &lc) -> OpFailure<PoppedElement>::list
{
throw std::runtime_error("9 Not implemented.");
#if 0
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: popedElementBatch.elements) {
......@@ -318,6 +323,7 @@ void ContainerTraits<RetrieveQueue>::
trimContainerIfNeeded(Container &cont, ScopedExclusiveLock &contLock, const ContainerIdentifier &cId,
log::LogContext &lc)
{
throw std::runtime_error("10 Not implemented.");
#if 0
if(cont.isEmpty()) {
// The current implementation is done unlocked.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment