Skip to content
Snippets Groups Projects
Commit 22810941 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Puts back queue specializations

parent 4fe78b26
No related branches found
No related tags found
No related merge requests found
......@@ -135,8 +135,8 @@ struct ContainerTraits
const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, log::LogContext &lc);
static PoppedElementsSummary getElementSummary(const PoppedElement &);
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
static PoppedElementsBatch getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
ElementsToSkipSet &elemtsToSkip, log::LogContext &lc);
static const std::string c_containerTypeName; // = "genericContainer"
static const std::string c_identifyerType; // = "genericId"
......
......@@ -88,55 +88,4 @@ getElementAddress(const Element &e) {
}
#if 0
template <>
class ContainerTraits<ArchiveQueue> {
public:
typedef ArchiveQueue Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifier;
static const std::string c_containerTypeName; //= "ArchiveQueue";
static const std::string c_identifyerType; // = "tapepool";
template <class Element>
struct OpFailure {
Element * element;
std::exception_ptr failure;
typedef std::list<OpFailure> list;
};
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifier & contId,
log::LogContext & lc);
static void getLockedAndFetchedNoCreate(Container & cont, ScopedExclusiveLock & contLock, const ContainerIdentifier & cId,
log::LogContext & lc);
static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
static void addReferencesIfNecessaryAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc);
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
OpFailure<InsertedElement>::list failedElements;
};
typedef std::set<ElementAddress> ElementsToSkipSet;
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
};
#endif
}} // namespace cta::objectstore
/*
/**
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
......@@ -17,6 +17,7 @@
*/
#pragma once
#include "Algorithms.hpp"
#include "RetrieveQueue.hpp"
......@@ -39,7 +40,7 @@ struct ContainerTraitsTypes<RetrieveQueue>
typedef std::list<InsertedElement> list;
};
struct ElementDescriptor {};
typedef RetrieveRequest::JobDump ElementDescriptor;
struct PoppedElement {};
struct PoppedElementsSummary;
......@@ -48,16 +49,18 @@ struct ContainerTraitsTypes<RetrieveQueue>
PopCriteria& operator-= (const PoppedElementsSummary &);
};
struct PoppedElementsSummary {
//PoppedElementsSummary();
bool operator<(const PopCriteria&);
PoppedElementsSummary& operator+=(const PoppedElementsSummary&);
PoppedElementsSummary(const PoppedElementsSummary&);
void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
//PoppedElementsSummary(const PoppedElementsSummary&);
//void addDeltaToLog(const PoppedElementsSummary&, log::ScopedParamContainer&);
};
struct PoppedElementsList {
PoppedElementsList();
void insertBack(PoppedElementsList&&);
};
struct PoppedElementsBatch {
//PoppedElementsBatch();
PoppedElementsList elements;
PoppedElementsSummary summary;
void addToLog(log::ScopedParamContainer&);
......@@ -73,107 +76,4 @@ getElementAddress(const Element &e) {
}
#if 0
template <>
class ContainerTraits<RetrieveQueue> {
public:
typedef RetrieveQueue Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer;
template <class Element>
struct OpFailure {
Element * element;
std::exception_ptr failure;
typedef std::list<OpFailure> list;
};
typedef RetrieveRequest::JobDump ElementDescriptor;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & contId,
log::LogContext & lc) {
Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc);
}
static void addReferencesAndCommit(Container & cont, InsertedElement::list & elemMemCont,
AgentReference & agentRef, log::LogContext & lc) {
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
RetrieveRequest & rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
static void removeReferencesAndCommit(Container & cont, OpFailure<InsertedElement>::list & elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
elementsToRemove.emplace_back(eof.element->retrieveRequest->getAddressIfSet());
}
cont.removeJobsAndCommit(elementsToRemove);
}
static OpFailure<InsertedElement>::list switchElementsOwnership(InsertedElement::list & elemMemCont, const ContainerAddress & contAddress,
const ContainerAddress & previousOwnerAddress, log::TimingList& timingList, utils::Timer & t, log::LogContext & lc) {
std::list<std::unique_ptr<RetrieveRequest::AsyncOwnerUpdater>> updaters;
for (auto & e: elemMemCont) {
RetrieveRequest & rr = *e.retrieveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(rr.asyncUpdateOwner(copyNb, contAddress, previousOwnerAddress));
}
timingList.insertAndReset("asyncUpdateLaunchTime", t);
auto u = updaters.begin();
auto e = elemMemCont.begin();
OpFailure<InsertedElement>::list ret;
while (e != elemMemCont.end()) {
try {
u->get()->wait();
} catch (...) {
ret.push_back(OpFailure<InsertedElement>());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
timingList.insertAndReset("asyncUpdateCompletionTime", t);
return ret;
}
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
OpFailure<InsertedElement>::list failedElements;
};
class PoppedElementsSummary;
class PopCriteria {
public:
PopCriteria();
PopCriteria& operator-= (const PoppedElementsSummary &);
};
class PoppedElementsList {
public:
PoppedElementsList();
void insertBack(PoppedElementsList &&);
};
class PoppedElementsSummary {
public:
PoppedElementsSummary();
bool operator< (const PopCriteria &);
PoppedElementsSummary& operator+= (const PoppedElementsSummary &);
};
class PoppedElementsBatch {
public:
PoppedElementsBatch();
PoppedElementsList elements;
PoppedElementsSummary summary;
};
CTA_GENERATE_EXCEPTION_CLASS(NoSuchContainer);
};
#endif
}} // namespace cta::objectstore
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment