Skip to content
Snippets Groups Projects
Commit 37f2c781 authored by Eric Cano's avatar Eric Cano
Browse files

Sepatated algoritm definitions from traits definition.

parent 627a37ba
No related branches found
No related tags found
No related merge requests found
...@@ -29,117 +29,30 @@ ...@@ -29,117 +29,30 @@
namespace cta { namespace objectstore { namespace cta { namespace objectstore {
class EmptyClass {};
class ArchiveQueue;
class ArchiveJob;
/** /**
* Container traits definition. To be specialized class by class. * Container traits definition. To be specialized class by class.
* This is mostly a model. * This is mostly a model.
*/ */
template <class Container> template <class C>
class ContainerTraits{ class ContainerTraits{
public: public:
typedef C Container;
typedef std::string ContainerAddress; typedef std::string ContainerAddress;
typedef std::string ElementAddress; typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer; typedef std::string ContainerIdentifyer;
typedef cta::objectstore::EmptyClass Element; class Element {};
typedef std::list<std::unique_ptr<Element>> ElementMemoryContainer; typedef std::list<std::unique_ptr<Element>> ElementMemoryContainer;
typedef std::list <Element *> ElementPointerContainer; typedef std::list <Element *> ElementPointerContainer;
typedef cta::objectstore::EmptyClass ElementDescriptor; class ElementDescriptor {};
typedef std::list<ElementDescriptor> ElementDescriptorContainer; typedef std::list<ElementDescriptor> ElementDescriptorContainer;
static ElementAddress getElementAddress(const Element & e);
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId, static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & contLock, AgentReference & agRef, const ContainerIdentifyer & cId,
log::LogContext & lc); log::LogContext & lc);
static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont); static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont);
static ElementPointerContainer switchElementsOwnership(Container & cont, ElementMemoryContainer & elements, log::LogContext & lc); static ElementPointerContainer switchElementsOwnership(Container & cont, ElementMemoryContainer & elements, log::LogContext & lc);
}; };
template <>
class ContainerTraits<ArchiveQueue> {
public:
typedef ArchiveQueue Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer;
struct Element {
std::unique_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
};
typedef std::list<Element> ElementMemoryContainer;
struct ElementOpFailure {
Element * element;
std::exception_ptr failure;
};
typedef std::list<ElementOpFailure> ElementOpFailureContainer;
typedef ArchiveRequest::JobDump ElementDescriptor;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
static ElementAddress getElementAddress(const Element & e) { return e.archiveRequest->getAddressIfSet(); }
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & tapePool,
log::LogContext & lc) {
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(cont, aqL, agRef, tapePool, QueueType::LiveJobs, lc);
}
static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont,
AgentReference & agentRef, log::LogContext & lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ArchiveRequest::JobDump jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(NULL)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
static void removeReferencesAndCommit(Container & cont, ElementOpFailureContainer & elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
elementsToRemove.emplace_back(eof.element->archiveRequest->getAddressIfSet());
}
cont.removeJobsAndCommit(elementsToRemove);
}
static ElementOpFailureContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, Container & cont,
const ContainerAddress & previousOwnerAddress, log::LogContext & lc) {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: elemMemCont) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, cont.getAddressIfSet(), previousOwnerAddress));
}
auto u = updaters.begin();
auto e = elemMemCont.begin();
ElementOpFailureContainer ret;
while (e != elemMemCont.end()) {
try {
u->get()->wait();
} catch (...) {
ret.push_back(ElementOpFailure());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
return ret;
}
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
ElementOpFailureContainer failedElements;
};
};
template <class C> template <class C>
class ContainerAlgorithms { class ContainerAlgorithms {
public: public:
......
...@@ -20,8 +20,7 @@ ...@@ -20,8 +20,7 @@
#include "RootEntry.hpp" #include "RootEntry.hpp"
#include "AgentReference.hpp" #include "AgentReference.hpp"
#include "Agent.hpp" #include "Agent.hpp"
#include "Algorithms.hpp" #include "ArchiveQueueAlgorithms.hpp"
#include "ArchiveRequest.hpp"
#include "common/log/DummyLogger.hpp" #include "common/log/DummyLogger.hpp"
#include "tests/TestsCompileTimeSwitches.hpp" #include "tests/TestsCompileTimeSwitches.hpp"
#include "catalogue/DummyCatalogue.hpp" #include "catalogue/DummyCatalogue.hpp"
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Algorithms.hpp"
#include "ArchiveQueue.hpp"
namespace cta { namespace objectstore {
template <>
class ContainerTraits<ArchiveQueue> {
public:
typedef ArchiveQueue Container;
typedef std::string ContainerAddress;
typedef std::string ElementAddress;
typedef std::string ContainerIdentifyer;
struct Element {
std::unique_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::MountPolicy mountPolicy;
};
typedef std::list<Element> ElementMemoryContainer;
struct ElementOpFailure {
Element * element;
std::exception_ptr failure;
};
typedef std::list<ElementOpFailure> ElementOpFailureContainer;
typedef ArchiveRequest::JobDump ElementDescriptor;
typedef std::list<ElementDescriptor> ElementDescriptorContainer;
static ElementAddress getElementAddress(const Element & e) { return e.archiveRequest->getAddressIfSet(); }
static void getLockedAndFetched(Container & cont, ScopedExclusiveLock & aqL, AgentReference & agRef, const ContainerIdentifyer & tapePool,
log::LogContext & lc) {
Helpers::getLockedAndFetchedQueue<ArchiveQueue>(cont, aqL, agRef, tapePool, QueueType::LiveJobs, lc);
}
static void addReferencesAndCommit(Container & cont, ElementMemoryContainer & elemMemCont,
AgentReference & agentRef, log::LogContext & lc) {
std::list<ArchiveQueue::JobToAdd> jobsToAdd;
for (auto & e: elemMemCont) {
ArchiveRequest::JobDump jd;
jd.copyNb = e.copyNb;
jd.tapePool = cont.getTapePool();
jd.owner = cont.getAddressIfSet();
ArchiveRequest & ar = *e.archiveRequest;
jobsToAdd.push_back({jd, ar.getAddressIfSet(), e.archiveFile.archiveFileID, e.archiveFile.fileSize,
e.mountPolicy, time(NULL)});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
static void removeReferencesAndCommit(Container & cont, ElementOpFailureContainer & elementsOpFailures) {
std::list<std::string> elementsToRemove;
for (auto & eof: elementsOpFailures) {
elementsToRemove.emplace_back(eof.element->archiveRequest->getAddressIfSet());
}
cont.removeJobsAndCommit(elementsToRemove);
}
static ElementOpFailureContainer switchElementsOwnership(ElementMemoryContainer & elemMemCont, Container & cont,
const ContainerAddress & previousOwnerAddress, log::LogContext & lc) {
std::list<std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater>> updaters;
for (auto & e: elemMemCont) {
ArchiveRequest & ar = *e.archiveRequest;
auto copyNb = e.copyNb;
updaters.emplace_back(ar.asyncUpdateJobOwner(copyNb, cont.getAddressIfSet(), previousOwnerAddress));
}
auto u = updaters.begin();
auto e = elemMemCont.begin();
ElementOpFailureContainer ret;
while (e != elemMemCont.end()) {
try {
u->get()->wait();
} catch (...) {
ret.push_back(ElementOpFailure());
ret.back().element = &(*e);
ret.back().failure = std::current_exception();
}
u++;
e++;
}
return ret;
}
class OwnershipSwitchFailure: public cta::exception::Exception {
public:
OwnershipSwitchFailure(const std::string & message): cta::exception::Exception(message) {};
ElementOpFailureContainer failedElements;
};
};
}} // namespace cta::objectstore
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment