Skip to content
Snippets Groups Projects
Commit f6b0786b authored by Eric Cano's avatar Eric Cano
Browse files

Moved ArchiveMount::getNextJobsBatch to generic algoritms.

parent d78aebe0
No related branches found
No related tags found
No related merge requests found
...@@ -120,6 +120,7 @@ public: ...@@ -120,6 +120,7 @@ public:
m_backend(backend), m_agentReference(agentReference) {} m_backend(backend), m_agentReference(agentReference) {}
typedef typename ContainerTraits<C>::InsertedElement InsertedElement; typedef typename ContainerTraits<C>::InsertedElement InsertedElement;
typedef typename ContainerTraits<C>::PopCriteria PopCriteria;
/** Reference objects in the container and then switch their ownership them. Objects /** Reference objects in the container and then switch their ownership them. Objects
* are provided existing and owned by algorithm's agent. Returns a list of * are provided existing and owned by algorithm's agent. Returns a list of
...@@ -175,7 +176,8 @@ public: ...@@ -175,7 +176,8 @@ public:
typename ContainerTraits<C>::ElementsToSkipSet elementsToSkip; typename ContainerTraits<C>::ElementsToSkipSet elementsToSkip;
log::TimingList timingList; log::TimingList timingList;
utils::Timer t, totalTime; utils::Timer t, totalTime;
while (ret.summary < popCriteria) { bool unexpectedException = false;
while (!unexpectedException && ret.summary < popCriteria) {
typename ContainerTraits<C>::PoppedElementsSummary previousSummary = ret.summary; typename ContainerTraits<C>::PoppedElementsSummary previousSummary = ret.summary;
log::TimingList localTimingList; log::TimingList localTimingList;
// Get a container if it exists // Get a container if it exists
...@@ -233,7 +235,7 @@ public: ...@@ -233,7 +235,7 @@ public:
std::list<typename ContainerTraits<C>::ElementAddress> elementsToDereferenceFromAgent; std::list<typename ContainerTraits<C>::ElementAddress> elementsToDereferenceFromAgent;
for (auto &e: failedOwnershipSwitchElements) { for (auto &e: failedOwnershipSwitchElements) {
try { try {
throw e.failure; std::rethrow_exception(e.failure);
} catch (Backend::NoSuchObject &) { } catch (Backend::NoSuchObject &) {
elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element)); elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element)); elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
...@@ -249,6 +251,8 @@ public: ...@@ -249,6 +251,8 @@ public:
elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element)); elementsToDereferenceFromAgent.push_back(ContainerTraits<C>::getElementAddress(*e.element));
elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element)); elementsNotToReport.insert(ContainerTraits<C>::getElementAddress(*e.element));
elementsToSkip.insert(ContainerTraits<C>::getElementAddress(*e.element)); elementsToSkip.insert(ContainerTraits<C>::getElementAddress(*e.element));
// If we get this kind of situation, we do not try to carry on, as it becomes too complex.
unexpectedException = true;
} }
} }
// We are done with the sorting. Apply the decisions... // We are done with the sorting. Apply the decisions...
......
...@@ -172,7 +172,8 @@ auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont ...@@ -172,7 +172,8 @@ auto ContainerTraits<ArchiveQueue>::getPoppingElementsCandidates(Container& cont
PoppedElementsBatch ret; PoppedElementsBatch ret;
auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip); auto candidateJobsFromQueue=cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) { for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size}); ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
common::dataStructures::ArchiveFile(), "", "", "", });
ret.summary.bytes += cjfq.size; ret.summary.bytes += cjfq.size;
ret.summary.files++; ret.summary.files++;
} }
...@@ -219,6 +220,10 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch ...@@ -219,6 +220,10 @@ auto ContainerTraits<ArchiveQueue>::switchElementsOwnership(PoppedElementsBatch
while (e != popedElementBatch.elements.end()) { while (e != popedElementBatch.elements.end()) {
try { try {
u->get()->wait(); u->get()->wait();
e->archiveFile = u->get()->getArchiveFile();
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
} catch (...) { } catch (...) {
ret.push_back(OpFailure<PoppedElement>()); ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e); ret.back().element = &(*e);
......
...@@ -87,6 +87,10 @@ public: ...@@ -87,6 +87,10 @@ public:
std::unique_ptr<ArchiveRequest> archiveRequest; std::unique_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb; uint16_t copyNb;
uint64_t bytes; uint64_t bytes;
common::dataStructures::ArchiveFile archiveFile;
std::string archiveReportURL;
std::string errorReportURL;
std::string srcURL;
}; };
class PoppedElementsSummary; class PoppedElementsSummary;
class PopCriteria { class PopCriteria {
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment