From ae7175fdcc98698bc8ec8d8afc00634b92c2f183 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Mon, 23 Jul 2018 17:29:48 +0200 Subject: [PATCH] [os-generic-queues] Adds RetrieveQueueAlgorithms.cpp --- objectstore/RetrieveQueueAlgorithms.cpp | 87 +++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 objectstore/RetrieveQueueAlgorithms.cpp diff --git a/objectstore/RetrieveQueueAlgorithms.cpp b/objectstore/RetrieveQueueAlgorithms.cpp new file mode 100644 index 0000000000..d7b16b8d12 --- /dev/null +++ b/objectstore/RetrieveQueueAlgorithms.cpp @@ -0,0 +1,87 @@ +/** + * The CERN Tape Archive (CTA) project + * Copyright © 2018 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "RetrieveQueueAlgorithms.hpp" + +namespace cta { namespace objectstore { + +template<> +void ContainerTraits<RetrieveQueue>:: +getLockedAndFetched(Container &cont, ScopedExclusiveLock &aqL, AgentReference &agRef, + const ContainerIdentifier &contId, log::LogContext &lc) +{ + Helpers::getLockedAndFetchedQueue<Container>(cont, aqL, agRef, contId, QueueType::LiveJobs, lc); +} + +template<> +void ContainerTraits<RetrieveQueue>:: +addReferencesAndCommit(Container &cont, InsertedElement::list &elemMemCont, AgentReference &agentRef, + log::LogContext &lc) +{ + std::list<RetrieveQueue::JobToAdd> jobsToAdd; + for (auto &e : elemMemCont) { + RetrieveRequest &rr = *e.retrieveRequest; + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)}); + } + cont.addJobsAndCommit(jobsToAdd, agentRef, lc); +} + +template<> +void ContainerTraits<RetrieveQueue>:: +removeReferencesAndCommit(Container &cont, OpFailure<InsertedElement>::list &elementsOpFailures) +{ + std::list<std::string> elementsToRemove; + for (auto &eof : elementsOpFailures) { + elementsToRemove.emplace_back(eof.element->retrieveRequest->getAddressIfSet()); + } + cont.removeJobsAndCommit(elementsToRemove); +} + +template<> +auto ContainerTraits<RetrieveQueue>:: +switchElementsOwnership(InsertedElement::list &elemMemCont, const ContainerAddress &contAddress, + const ContainerAddress &previousOwnerAddress, log::TimingList &timingList, utils::Timer &t, + log::LogContext &lc) -> OpFailure<InsertedElement>::list +{ + std::list<std::unique_ptr<RetrieveRequest::AsyncOwnerUpdater>> updaters; + for (auto &e : elemMemCont) { + RetrieveRequest &rr = *e.retrieveRequest; + auto copyNb = e.copyNb; + updaters.emplace_back(rr.asyncUpdateOwner(copyNb, contAddress, previousOwnerAddress)); + } + timingList.insertAndReset("asyncUpdateLaunchTime", t); + auto u = updaters.begin(); + auto e = elemMemCont.begin(); + OpFailure<InsertedElement>::list ret; + while (e != elemMemCont.end()) { + try { + u->get()->wait(); + } catch (...) { + ret.push_back(OpFailure<InsertedElement>()); + ret.back().element = &(*e); + ret.back().failure = std::current_exception(); + } + u++; + e++; + } + timingList.insertAndReset("asyncUpdateCompletionTime", t); + return ret; +} + + +}} // namespace cta::objectstore -- GitLab