From 3d7a407ec372584efdaaa83c0f3c9d0bf0a3d38f Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Tue, 4 Jun 2019 11:26:19 +0200 Subject: [PATCH] #471 Added support for activities in RetrieveQueue. --- .../ActivitiesFairShareWeights.cpp | 41 +++++ .../ActivitiesFairShareWeights.hpp | 35 ++++ objectstore/AlgorithmsTest.cpp | 2 +- objectstore/ArchiveQueue.hpp | 29 +-- objectstore/CMakeLists.txt | 7 +- objectstore/GarbageCollector.cpp | 2 +- objectstore/GarbageCollectorTest.cpp | 2 +- objectstore/RetrieveActivityCountMap.cpp | 165 ++++++++++++++++++ objectstore/RetrieveActivityCountMap.hpp | 56 ++++++ objectstore/RetrieveQueue.cpp | 18 ++ objectstore/RetrieveQueue.hpp | 45 ++--- objectstore/RetrieveQueueAlgorithms.hpp | 9 +- objectstore/RetrieveQueueShard.cpp | 26 ++- objectstore/RetrieveQueueShard.hpp | 5 + objectstore/RetrieveQueueTest.cpp | 140 ++++++++++++++- objectstore/RetrieveRequest.cpp | 56 +++++- objectstore/RetrieveRequest.hpp | 4 + objectstore/Sorter.cpp | 2 +- objectstore/Sorter.hpp | 1 + objectstore/cta.proto | 21 ++- scheduler/OStoreDB/MemQueues.cpp | 2 +- scheduler/OStoreDB/OStoreDB.cpp | 44 ++++- scheduler/OStoreDB/OStoreDB.hpp | 4 +- scheduler/OStoreDB/OStoreDBFactory.hpp | 2 +- scheduler/Scheduler.cpp | 5 +- scheduler/Scheduler.hpp | 2 +- scheduler/SchedulerDatabase.hpp | 2 +- 27 files changed, 640 insertions(+), 87 deletions(-) create mode 100644 common/dataStructures/ActivitiesFairShareWeights.cpp create mode 100644 common/dataStructures/ActivitiesFairShareWeights.hpp create mode 100644 objectstore/RetrieveActivityCountMap.cpp create mode 100644 objectstore/RetrieveActivityCountMap.hpp diff --git a/common/dataStructures/ActivitiesFairShareWeights.cpp b/common/dataStructures/ActivitiesFairShareWeights.cpp new file mode 100644 index 0000000000..765fceff15 --- /dev/null +++ b/common/dataStructures/ActivitiesFairShareWeights.cpp @@ -0,0 +1,41 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program 
is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ActivitiesFairShareWeights.hpp" +#include "common/exception/Exception.hpp" + +namespace cta { namespace common { namespace dataStructures { + +void ActivitiesFairShareWeights::setWeightFromDouble(const std::string & activity, double weight) { + if (weight < 0 || weight > 1) + throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromDouble(): weight out of range."); + activitiesWeights[activity] = weight; +} + +void ActivitiesFairShareWeights::setWeightFromString(const std::string& activity, const std::string& sweight) { + if (sweight.empty()) + throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromString() empty string."); + size_t pos; + double weight = std::stod(sweight, &pos); + if (pos != sweight.size()) + throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromString(): bad format: garbage at the end of string."); + setWeightFromDouble(activity, weight); +} + + +}}} // namespace cta::common::dataStructures. 
diff --git a/common/dataStructures/ActivitiesFairShareWeights.hpp b/common/dataStructures/ActivitiesFairShareWeights.hpp new file mode 100644 index 0000000000..737f534ed2 --- /dev/null +++ b/common/dataStructures/ActivitiesFairShareWeights.hpp @@ -0,0 +1,35 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <map> + +namespace cta { namespace common { namespace dataStructures { + +struct ActivitiesFairShareWeights { + std::string diskInstance; + std::map<std::string, double> activitiesWeights; + /** Set the weight for the activity, checking the value is in [0, 1] (throws cta::exception::Exception otherwise). */ + void setWeightFromDouble(const std::string & activity, double weight); + /** Set the weight for an activity, first checking the string can be fully converted to a double, and then delegating to setWeightFromDouble(). */ + void setWeightFromString(const std::string & activity, const std::string &sweight); +}; + +}}} \ No newline at end of file diff --git a/objectstore/AlgorithmsTest.cpp b/objectstore/AlgorithmsTest.cpp index 94999d4ed2..8eced3a140 100644 --- a/objectstore/AlgorithmsTest.cpp +++ b/objectstore/AlgorithmsTest.cpp @@ -78,7 +78,7 @@ void fillRetrieveRequests( rqc.mountPolicy.retrievePriority = 1; requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be)); 
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{ - requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser + requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt }); auto &rr = *requests.back().retrieveRequest; rr.initialize(); diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index f2b68d1a74..efbdc99d60 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -141,28 +141,11 @@ public: static const uint64_t c_maxShardSize = 25000; }; -class ArchiveQueueToTransferForUser: public ArchiveQueue { - using ArchiveQueue::ArchiveQueue; -}; - -class ArchiveQueueToReportForUser: public ArchiveQueue { - using ArchiveQueue::ArchiveQueue; -}; - -class ArchiveQueueFailed: public ArchiveQueue { - using ArchiveQueue::ArchiveQueue; -}; - -class ArchiveQueueToTransferForRepack: public ArchiveQueue{ - using ArchiveQueue::ArchiveQueue; -}; - -class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{ - using ArchiveQueue::ArchiveQueue; -}; - -class ArchiveQueueToReportToRepackForFailure: public ArchiveQueue{ - using ArchiveQueue::ArchiveQueue; -}; +class ArchiveQueueToTransferForUser: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; }; +class ArchiveQueueToReportForUser: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; }; +class ArchiveQueueFailed: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; }; +class ArchiveQueueToTransferForRepack: public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; }; +class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; }; +class ArchiveQueueToReportToRepackForFailure: public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; }; }} diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index 8e2dbcda78..d30eb6aa7f 100644 --- a/objectstore/CMakeLists.txt +++ 
b/objectstore/CMakeLists.txt @@ -29,7 +29,8 @@ set (CTAProtoFiles PROTOBUF3_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles}) -set (CTAProtoDependants objectstore/Agent.hpp +set (CTAProtoDependants + objectstore/Agent.hpp objectstore/ArchiveRequest.hpp objectstore/CreationLog.hpp objectstore/DriveRegister.hpp @@ -39,6 +40,7 @@ set (CTAProtoDependants objectstore/Agent.hpp objectstore/RepackIndex.hpp objectstore/RepackRequest.hpp objectstore/RepackQueue.hpp + objectstore/RetrieveActivityCountMap.hpp objectstore/RetrieveRequest.hpp objectstore/RootEntry.hpp objectstore/SchedulerGlobalLock.hpp @@ -100,7 +102,8 @@ add_library (ctaobjectstore SHARED GarbageCollector.cpp SchedulerGlobalLock.cpp ValueCountMap.cpp - Helpers.cpp) + Helpers.cpp + RetrieveActivityCountMap.cpp) set_property(TARGET ctaobjectstore PROPERTY SOVERSION "${CTA_SOVERSION}") set_property(TARGET ctaobjectstore PROPERTY VERSION "${CTA_LIBVERSION}") diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index 279a223450..6ff68c09a4 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -559,7 +559,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent& for (auto &tf: rr->getArchiveFile().tapeFiles) { if (tf.vid == vid) { jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize, - rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time}); + rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()}); } } } diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 0f786e4557..249b1f99ce 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -608,7 +608,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) { cta::objectstore::ScopedExclusiveLock rql(rq); rq.fetch(); std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta; - 
jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time}); + jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time, cta::nullopt}); rq.addJobsAndCommit(jta, agentRef, lc); } if (pass < 5) { pass++; continue; } diff --git a/objectstore/RetrieveActivityCountMap.cpp b/objectstore/RetrieveActivityCountMap.cpp new file mode 100644 index 0000000000..fc393567f6 --- /dev/null +++ b/objectstore/RetrieveActivityCountMap.cpp @@ -0,0 +1,165 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include "RetrieveActivityCountMap.hpp" +#include "common/exception/Exception.hpp" + +#include <algorithm> +#include <sstream> +#include <google/protobuf/util/json_util.h> + +namespace cta { namespace objectstore { + +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ +RetrieveActivityCountMap::RetrieveActivityCountMap( + google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>* retrieveActivityCountMap): + m_activityCountMap(*retrieveActivityCountMap) { } + +//------------------------------------------------------------------------------ +// RetrieveActivityCountMap::incCount() +//------------------------------------------------------------------------------ +void RetrieveActivityCountMap::incCount(const RetrieveActivityDescription& activityDescription) { + // Find the entry for this value (might fail) + auto counter = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription); + if (counter != m_activityCountMap.end()) { + if (counter->count() < 1) { + std::stringstream err; + err << "In ValueCountMap::incCount: unexpected count value=" << toString(counter->retrieve_activity_weight()) + << " count=" << counter->count(); + throw cta::exception::Exception(err.str()); + } else { + counter->set_count(counter->count()+1); + // Update the weight to the latest version (in case weights got updated since last time). + if (counter->retrieve_activity_weight().creation_time() < activityDescription.creationTime) { + counter->mutable_retrieve_activity_weight()->set_weight(activityDescription.weight); + counter->mutable_retrieve_activity_weight()->set_creation_time(activityDescription.creationTime); + } + } + } else { + // Create the new entry if necessary. 
+ auto newCounter = m_activityCountMap.Add(); + newCounter->mutable_retrieve_activity_weight()->set_priority(activityDescription.priority); + newCounter->mutable_retrieve_activity_weight()->set_disk_instance_name(activityDescription.diskInstanceName); + newCounter->mutable_retrieve_activity_weight()->set_activity(activityDescription.activity); + newCounter->mutable_retrieve_activity_weight()->set_weight(activityDescription.weight); + newCounter->mutable_retrieve_activity_weight()->set_creation_time(activityDescription.creationTime); + newCounter->set_count(1); + } +} + +//------------------------------------------------------------------------------ +// RetrieveActivityCountMap::decCount() +//------------------------------------------------------------------------------ +void RetrieveActivityCountMap::decCount(const RetrieveActivityDescription& activityDescription) { + // Find the entry for this value. Failing is an error. + auto counter = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription); + if (counter == m_activityCountMap.end()) { + std::stringstream err; + err << "In RetrieveActivityCountMap::decCount: no entry found for value=" << toString(activityDescription); + throw cta::exception::Exception(err.str()); + } + // Decrement the value and remove the entry if needed. + if (counter->count() < 1) { + std::stringstream err; + err << "In ValueCountMap::decCount: entry with wrong count value=" << toString(activityDescription) << " count=" << counter->count(); + throw cta::exception::Exception(err.str()); + } + counter->set_count(counter->count()-1); + if (!counter->count()) { + auto size=m_activityCountMap.size(); + counter->Swap(&(*(m_activityCountMap.end()-1))); + m_activityCountMap.RemoveLast(); + // Cross check that the size has decreased. + if (size -1 != m_activityCountMap.size()) { + std::stringstream err; + err << "In ValueCountMap::decCount: unexpected size after trimming empty entry. 
expectedSize=" << size -1 << " newSize=" << m_activityCountMap.size(); + throw cta::exception::Exception(err.str()); + } + // Cross check we cannot find the value. + auto counter2 = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription); + if (m_activityCountMap.end() != counter2) { + std::stringstream err; + err << "In ValueCountMap::decCount: still found the value after trimming empty entry. value=" << toString(counter2->retrieve_activity_weight()) << " count=" << counter2->count(); + throw cta::exception::Exception(err.str()); + } + } +} + +//------------------------------------------------------------------------------ +// RetrieveActivityCountMap::getActivities() +//------------------------------------------------------------------------------ +std::list<RetrieveActivityDescription> RetrieveActivityCountMap::getActivities(uint64_t priority) { + std::list<RetrieveActivityDescription> ret; + for (auto & ad: m_activityCountMap) { + if (ad.retrieve_activity_weight().priority() == priority) + ret.push_back({ad.retrieve_activity_weight().priority(), ad.retrieve_activity_weight().disk_instance_name(), + ad.retrieve_activity_weight().activity(), ad.retrieve_activity_weight().creation_time(), + ad.retrieve_activity_weight().weight(), ad.count()}); + } + return ret; +} + + +//------------------------------------------------------------------------------ +// RetrieveActivityCountMap::clear() +//------------------------------------------------------------------------------ +void RetrieveActivityCountMap::clear() { + m_activityCountMap.Clear(); +} + +//------------------------------------------------------------------------------ +// operator==() +//------------------------------------------------------------------------------ +bool operator==(const serializers::RetrieveActivityCountPair & serialized, const RetrieveActivityDescription & memory) { + return (serialized.retrieve_activity_weight().priority() == memory.priority) + && 
(serialized.retrieve_activity_weight().disk_instance_name() == memory.diskInstanceName) + && (serialized.retrieve_activity_weight().activity() == memory.activity); +} + +//------------------------------------------------------------------------------ +// toString() +//------------------------------------------------------------------------------ +std::string toString(const RetrieveActivityDescription & ad) { + serializers::RetrieveActivityWeight raw; + raw.set_priority(ad.priority); + raw.set_disk_instance_name(ad.diskInstanceName); + raw.set_activity(ad.activity); + raw.set_creation_time(ad.creationTime); + raw.set_weight(ad.weight); + return toString(raw); +} + +//------------------------------------------------------------------------------ +// toString() +//------------------------------------------------------------------------------ +std::string toString(const serializers::RetrieveActivityWeight & raw){ + using namespace google::protobuf::util; + + std::string json; + JsonPrintOptions options; + + options.always_print_primitive_fields = true; + MessageToJsonString(raw, &json, options); + + return json; +} + + +}} // namespace cta::objectstore. \ No newline at end of file diff --git a/objectstore/RetrieveActivityCountMap.hpp b/objectstore/RetrieveActivityCountMap.hpp new file mode 100644 index 0000000000..6391214336 --- /dev/null +++ b/objectstore/RetrieveActivityCountMap.hpp @@ -0,0 +1,56 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "objectstore/cta.pb.h" + +#include <list> + +namespace cta { namespace objectstore { + +struct RetrieveActivityDescription { + uint64_t priority; + std::string diskInstanceName; + std::string activity; + time_t creationTime; + double weight; + uint64_t count; +}; + +/** A helper class allowing manipulation of arrays of ValueCountPairs, used as containers for running + * counters for properties with multiple possible values. When considering the retrieve mounts, all activities + * will be considered for the same mount (and highest priority one will be accounted). So this class does not + * select any and gives the full list in getActivities(). Having multiple activities sharing the drive is not + * expected to be a frequent occurrence. */ +class RetrieveActivityCountMap { +public: + RetrieveActivityCountMap (google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>* retrieveActivityCountMap); + void incCount(const RetrieveActivityDescription & activityDescription); + void decCount(const RetrieveActivityDescription & activityDescription); + void clear(); + std::list<RetrieveActivityDescription> getActivities(uint64_t priority); +private: + google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>& m_activityCountMap; +}; + +std::string toString(const RetrieveActivityDescription &); +std::string toString(const serializers::RetrieveActivityWeight &); +bool operator==(const serializers::RetrieveActivityCountPair &, const RetrieveActivityDescription &); + +}} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 7eada734fc..755e485503 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -22,6 
+22,7 @@ #include "EntryLogSerDeser.hpp" #include "ValueCountMap.hpp" #include "AgentReference.hpp" +#include "RetrieveActivityCountMap.hpp" #include <google/protobuf/util/json_util.h> namespace cta { namespace objectstore { @@ -287,6 +288,7 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map()); // We need to figure out which job will be added to which shard. // We might have to split shards if they would become too big. // For a given jobs, there a 4 possible cases: @@ -462,6 +464,9 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer maxDriveAllowedMap.incCount(j.policy.maxDrivesAllowed); priorityMap.incCount(j.policy.retrievePriority); minRetrieveRequestAgeMap.incCount(j.policy.retrieveMinRequestAge); + if (j.activityDescription) { + retrieveActivityCountMap.incCount(j.activityDescription.value()); + } // oldestjobcreationtime is initialized to 0 when if (m_payload.oldestjobcreationtime()) { if ((uint64_t)j.startTime < m_payload.oldestjobcreationtime()) @@ -568,6 +573,10 @@ RetrieveQueue::JobsSummary RetrieveQueue::getJobsSummary() { ret.priority = priorityMap.maxValue(); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); ret.minRetrieveRequestAge = minRetrieveRequestAgeMap.minValue(); + RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map()); + for (auto ra: retrieveActivityCountMap.getActivities(ret.priority)) { + ret.activityCounts.push_back({ra.diskInstanceName, ra.activity, ra.weight, ra.count}); + } } else { ret.maxDrivesAllowed = 0; ret.priority = 0; @@ -646,6 +655,7 @@ void RetrieveQueue::removeJobsAndCommit(const 
std::list<std::string>& jobsToRemo ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap()); + RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map()); // Make a working copy of the jobs to remove. We will progressively trim this local list. auto localJobsToRemove = jobsToRemove; // The jobs are expected to be removed from the front shards first (poped in order) @@ -672,6 +682,14 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo maxDriveAllowedMap.decCount(j.maxDrivesAllowed); priorityMap.decCount(j.priority); minRetrieveRequestAgeMap.decCount(j.minRetrieveRequestAge); + if (j.activityDescription) { + // We only have a partial activity description, but this is enough to decCount. + RetrieveActivityDescription activityDescription; + activityDescription.priority = j.priority; + activityDescription.diskInstanceName = j.activityDescription.value().diskInstanceName; + activityDescription.activity = j.activityDescription.value().activity; + retrieveActivityCountMap.decCount(activityDescription); + } } // In all cases, we should update the global statistics. 
m_payload.set_retrievejobscount(m_payload.retrievejobscount() - removalResult.jobsRemoved); diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 5db6288534..d9566754e4 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -22,6 +22,7 @@ #include "objectstore/cta.pb.h" #include "RetrieveRequest.hpp" #include "scheduler/RetrieveRequestDump.hpp" +#include "RetrieveActivityCountMap.hpp" namespace cta { namespace objectstore { @@ -65,6 +66,7 @@ public: uint64_t fileSize; cta::common::dataStructures::MountPolicy policy; time_t startTime; + optional<RetrieveActivityDescription> activityDescription; }; void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc); // This version will check for existence of the job in the queue before @@ -82,6 +84,13 @@ public: uint64_t priority; uint64_t minRetrieveRequestAge; uint64_t maxDrivesAllowed; + struct ActivityCount { + std::string diskInstanceName; + std::string activity; + double weight; + uint64_t count; + }; + std::list<ActivityCount> activityCounts; }; JobsSummary getJobsSummary(); struct JobDump { @@ -148,36 +157,12 @@ private: uint64_t m_maxShardSize = c_defaultMaxShardSize; }; -class RetrieveQueueToTransferForUser : public RetrieveQueue { -public: - template<typename...Ts> RetrieveQueueToTransferForUser(Ts&...args): RetrieveQueue(args...) {} -}; - -class RetrieveQueueToReportForUser : public RetrieveQueue { -public: - template<typename...Ts> RetrieveQueueToReportForUser(Ts&...args): RetrieveQueue(args...) {} -}; - -class RetrieveQueueFailed : public RetrieveQueue { -public: - template<typename...Ts> RetrieveQueueFailed(Ts&...args): RetrieveQueue(args...) {} -}; - -class RetrieveQueueToReportToRepackForSuccess : public RetrieveQueue { -public: - template<typename...Ts> RetrieveQueueToReportToRepackForSuccess(Ts&...args): RetrieveQueue(args...) 
{} -}; - -class RetrieveQueueToReportToRepackForFailure: public RetrieveQueue{ -public: - template<typename...Ts> RetrieveQueueToReportToRepackForFailure(Ts&...args): RetrieveQueue(args...) {} -}; - -class RetrieveQueueToTransferForRepack : public RetrieveQueue { -public: - template<typename...Ts> RetrieveQueueToTransferForRepack(Ts&...args): RetrieveQueue(args...) {} -}; - +class RetrieveQueueToTransferForUser : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; +class RetrieveQueueToReportForUser : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; +class RetrieveQueueFailed : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; +class RetrieveQueueToReportToRepackForSuccess : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; +class RetrieveQueueToReportToRepackForFailure: public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; +class RetrieveQueueToTransferForRepack : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; }; }} diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 1cb0627f45..45dd12db5e 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -29,7 +29,9 @@ struct ContainerTraits<RetrieveQueue,C> { struct ContainerSummary : public RetrieveQueue::JobsSummary { ContainerSummary() : RetrieveQueue::JobsSummary() {} - ContainerSummary(const RetrieveQueue::JobsSummary &c) : RetrieveQueue::JobsSummary({c.jobs,c.bytes,c.oldestJobStartTime,c.priority,c.minRetrieveRequestAge,c.maxDrivesAllowed}) {} + ContainerSummary(const RetrieveQueue::JobsSummary &c) : + RetrieveQueue::JobsSummary({c.jobs,c.bytes,c.oldestJobStartTime,c.priority, + c.minRetrieveRequestAge,c.maxDrivesAllowed,c.activityCounts}) {} void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&) const; }; @@ -42,6 +44,7 @@ struct ContainerTraits<RetrieveQueue,C> uint64_t filesize; cta::common::dataStructures::MountPolicy policy; 
serializers::RetrieveJobStatus status; + optional<RetrieveActivityDescription> activityDescription; typedef std::list<InsertedElement> list; }; @@ -276,7 +279,7 @@ addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemC std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); } cont.addJobsAndCommit(jobsToAdd, agentRef, lc); } @@ -289,7 +292,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis std::list<RetrieveQueue::JobToAdd> jobsToAdd; for (auto &e : elemMemCont) { RetrieveRequest &rr = *e.retrieveRequest; - jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)}); + jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription}); } cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc); } diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp index 96d6aee890..76ebe1e5c3 100644 --- a/objectstore/RetrieveQueueShard.cpp +++ b/objectstore/RetrieveQueueShard.cpp @@ -96,12 +96,15 @@ auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove) const auto & j = jl->Get(i); ret.removedJobs.emplace_back(JobInfo()); ret.removedJobs.back().address = j.address(); + ret.removedJobs.back().fSeq = j.fseq(); ret.removedJobs.back().copyNb = j.copynb(); ret.removedJobs.back().maxDrivesAllowed = j.maxdrivesallowed(); ret.removedJobs.back().minRetrieveRequestAge = j.minretrieverequestage(); ret.removedJobs.back().priority = j.priority(); ret.removedJobs.back().size = j.size(); ret.removedJobs.back().startTime = j.starttime(); + if (j.has_activity()) + 
ret.removedJobs.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; ret.bytesRemoved += j.size(); totalSize -= j.size(); ret.jobsRemoved++; @@ -136,7 +139,10 @@ auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> { std::list<JobInfo> ret; for (auto &j: m_payload.retrievejobs()) { ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(), - j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq()}); + j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt}); + if (j.has_activity()) { + ret.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() }; + } } return ret; } @@ -154,6 +160,12 @@ std::list<RetrieveQueue::JobToAdd> RetrieveQueueShard::dumpJobsToAdd() { ret.back().policy.retrievePriority = j.priority(); ret.back().startTime = j.starttime(); ret.back().retrieveRequestAddress = j.address(); + if (j.has_activity()) { + RetrieveActivityDescription rad; + rad.diskInstanceName = j.disk_instance_name(); + rad.activity = j.activity(); + ret.back().activityDescription = rad; + } } return ret; } @@ -252,6 +264,10 @@ void RetrieveQueueShard::addJob(const RetrieveQueue::JobToAdd& jobToAdd) { j->set_maxdrivesallowed(jobToAdd.policy.maxDrivesAllowed); j->set_priority(jobToAdd.policy.retrievePriority); j->set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge); + if (jobToAdd.activityDescription) { + j->set_disk_instance_name(jobToAdd.activityDescription.value().diskInstanceName); + j->set_activity(jobToAdd.activityDescription.value().activity); + } m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize()+jobToAdd.fileSize); // Sort the shard size_t jobIndex = m_payload.retrievejobs_size() - 1; @@ -284,6 +300,10 @@ void RetrieveQueueShard::addJobsThroughCopy(JobsToAddSet& jobsToAdd) { rjp.set_maxdrivesallowed(jobToAdd.policy.maxDrivesAllowed); 
rjp.set_priority(jobToAdd.policy.retrievePriority); rjp.set_minretrieverequestage(jobToAdd.policy.retrieveMinRequestAge); + if (jobToAdd.activityDescription) { + rjp.set_disk_instance_name(jobToAdd.activityDescription.value().diskInstanceName); + rjp.set_activity(jobToAdd.activityDescription.value().activity); + } i = serializedJobsToAdd.insert(i, rjp); totalSize+=jobToAdd.fileSize; } @@ -296,8 +316,4 @@ void RetrieveQueueShard::addJobsThroughCopy(JobsToAddSet& jobsToAdd) { m_payload.set_retrievejobstotalsize(totalSize); } - - - - }} \ No newline at end of file diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp index 01c2317408..7da9d10d72 100644 --- a/objectstore/RetrieveQueueShard.hpp +++ b/objectstore/RetrieveQueueShard.hpp @@ -53,6 +53,11 @@ public: uint64_t maxDrivesAllowed; time_t startTime; uint64_t fSeq; + struct ActivityDescription { + std::string diskInstanceName; + std::string activity; + }; + optional<ActivityDescription> activityDescription; }; std::list<JobInfo> dumpJobs(); diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp index c9f114016e..098114e628 100644 --- a/objectstore/RetrieveQueueTest.cpp +++ b/objectstore/RetrieveQueueTest.cpp @@ -89,8 +89,8 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) { rq.insert(); } { - // Read the queue and insert jobs 3 by 3 (the insertion size is - // expected to be << shard size (5 here). + // Read the queue and insert jobs 10 by 10 (the insertion size is + // expected to be << shard size (25 here). 
auto jobsToAddNow = jobsToAdd; while (jobsToAddNow.size()) { std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsBatch; @@ -154,4 +154,140 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) { ASSERT_FALSE(rq.exists()); } +TEST(ObjectStore, RetrieveQueueActivityCounts) { + cta::objectstore::BackendVFS be; + cta::log::DummyLogger dl("dummy", "dummyLogger"); + cta::log::LogContext lc(dl); + cta::objectstore::AgentReference agentRef("unitTest", dl); + std::mt19937 gen((std::random_device())()); + // Create 100 job references. + std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsToAdd; + const size_t totalJobs = 100, shardSize=25, batchSize=10; + for (size_t i=0; i<totalJobs; i++) { + cta::objectstore::RetrieveQueue::JobToAdd jta; + jta.copyNb = 1; + jta.fSeq = i; + jta.fileSize = 1000; + jta.policy.maxDrivesAllowed = 10; + jta.policy.retrieveMinRequestAge = 10; + jta.policy.retrievePriority = 1; + jta.startTime = ::time(nullptr); + std::stringstream address; + address << "someRequest-" << i; + jta.retrieveRequestAddress = address.str(); + // Some (but not all) jobs will be assigned an activity (and weight). + if (!(i % 3)) { + cta::objectstore::RetrieveActivityDescription ad; + ad.diskInstanceName = "diskInstance"; + ad.creationTime = jta.startTime; + ad.priority = 1; + if (!(i % 2)) { + ad.activity = "A"; + ad.weight = 0.1; + } else { + ad.activity = "B"; + ad.weight = 0.2; + } + jta.activityDescription = ad; + } + jobsToAdd.push_back(jta); + } + // By construction, first job has lowest start time. 
+ auto minStartTime=jobsToAdd.front().startTime; + std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue"); + { + // Try to create the retrieve queue + cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be); + rq.initialize("V12345"); + // Set a small shard size to validate multi shard behaviors + rq.setShardSize(shardSize); + rq.insert(); + } + { + // Read the queue and insert jobs 10 by 10 (the insertion size is + // expected to be << shard size (25 here)). + auto jobsToAddNow = jobsToAdd; + while (jobsToAddNow.size()) { + std::list<cta::objectstore::RetrieveQueue::JobToAdd> jobsBatch; + for (size_t i=0; i<batchSize; i++) { + if (jobsToAddNow.size()) { + auto j=std::next(jobsToAddNow.begin(), (std::uniform_int_distribution<size_t>(0, jobsToAddNow.size() -1))(gen)); + jobsBatch.emplace_back(*j); + jobsToAddNow.erase(j); + } + } + cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be); + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + rq.addJobsAndCommit(jobsBatch, agentRef, lc); + } + } + { + // Try to read back + cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be); + ASSERT_THROW(rq.fetch(), cta::exception::Exception); + cta::objectstore::ScopedExclusiveLock lock(rq); + ASSERT_NO_THROW(rq.fetch()); + // Pop jobs while we can. They should come out in fseq order as there is + // no interleaved push and pop. + auto jobsSummary = rq.getJobsSummary(); + ASSERT_EQ(minStartTime, jobsSummary.oldestJobStartTime); + // File fSeqs are in [0, 99]: 34 multiples of 3 (0 included); evens are activity A, odds are B, 17 each. 
+ ASSERT_EQ(2, jobsSummary.activityCounts.size()); + typedef decltype(jobsSummary.activityCounts.front()) acCount; + auto jsA = std::find_if(jobsSummary.activityCounts.begin(), jobsSummary.activityCounts.end(), [](const acCount &ac){return ac.activity == "A"; }); + ASSERT_NE(jobsSummary.activityCounts.end(), jsA); + ASSERT_EQ(17, jsA->count); + ASSERT_EQ(0.1, jsA->weight); + auto jsB = std::find_if(jobsSummary.activityCounts.begin(), jobsSummary.activityCounts.end(), [](const acCount &ac){return ac.activity == "B"; }); + ASSERT_NE(jobsSummary.activityCounts.end(), jsB); + ASSERT_EQ(17, jsB->count); + ASSERT_EQ(0.2, jsB->weight); + uint64_t nextExpectedFseq=0; + while (rq.getJobsSummary().jobs) { + auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>()); + std::set<std::string> jobsToSkip; + std::list<std::string> jobsToDelete; + for (auto &j: candidateJobs.candidates) { + std::stringstream address; + address << "someRequest-" << nextExpectedFseq; + ASSERT_EQ(address.str(), j.address); + jobsToSkip.insert(j.address); + jobsToDelete.emplace_back(j.address); + nextExpectedFseq++; + } + auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip); + if (candidateJobs2.candidateFiles) { + std::stringstream address; + address << "someRequest-" << nextExpectedFseq; + ASSERT_EQ(address.str(), candidateJobs2.candidates.front().address); + } + rq.removeJobsAndCommit(jobsToDelete); + // We should empty the queue in 2 rounds. After the first one, we get the jobs 0-49 out. 
+ auto jobsSummary2 = rq.getJobsSummary(); + if (jobsSummary2.jobs) { + auto jsA2 = std::find_if(jobsSummary2.activityCounts.begin(), jobsSummary2.activityCounts.end(), [](const acCount &ac){return ac.activity == "A"; }); + ASSERT_NE(jobsSummary2.activityCounts.end(), jsA2); + ASSERT_EQ(8, jsA2->count); + ASSERT_EQ(0.1, jsA2->weight); + auto jsB2 = std::find_if(jobsSummary2.activityCounts.begin(), jobsSummary2.activityCounts.end(), [](const acCount &ac){return ac.activity == "B"; }); + ASSERT_NE(jobsSummary2.activityCounts.end(), jsB2); + ASSERT_EQ(9, jsB2->count); + ASSERT_EQ(0.2, jsB2->weight); + } else { + // Of course, we should have no activity. + ASSERT_EQ(0, jobsSummary2.activityCounts.size()); + } + } + ASSERT_EQ(nextExpectedFseq, totalJobs); + } + + // Delete the root entry + cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be); + cta::objectstore::ScopedExclusiveLock lock(rq); + rq.fetch(); + rq.removeIfEmpty(lc); + ASSERT_FALSE(rq.exists()); +} + } diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 7325e3fc71..14efe9a7cd 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -158,7 +158,16 @@ queueForFailure:; objectstore::MountPolicySerDeser mp; std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({activeCopyNb, activeFseq, getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time()}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + if (m_payload.has_activity_weight()) { + RetrieveActivityDescription activityDescription; + activityDescription.priority = m_payload.activity_weight().priority(); + activityDescription.diskInstanceName = m_payload.activity_weight().disk_instance_name(); + activityDescription.activity = m_payload.activity_weight().activity(); + activityDescription.weight = m_payload.activity_weight().weight(); + activityDescription.creationTime = 
m_payload.activity_weight().creation_time(); + jta.back().activityDescription = activityDescription; + } rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); auto queueUpdateTime = t.secs(utils::Timer::resetCounter); // We can now make the transition official. @@ -217,7 +226,16 @@ queueForTransfer:; mp.deserialize(m_payload.mountpolicy()); std::list<RetrieveQueue::JobToAdd> jta; jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(), - mp, (signed)m_payload.schedulerrequest().entrylog().time()}); + mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt}); + if (m_payload.has_activity_weight()) { + RetrieveActivityDescription activityDescription; + activityDescription.priority = m_payload.activity_weight().priority(); + activityDescription.diskInstanceName = m_payload.activity_weight().disk_instance_name(); + activityDescription.activity = m_payload.activity_weight().activity(); + activityDescription.weight = m_payload.activity_weight().weight(); + activityDescription.creationTime = m_payload.activity_weight().creation_time(); + jta.back().activityDescription = activityDescription; + } rq.addJobsIfNecessaryAndCommit(jta, agentReference, lc); auto jobsSummary=rq.getJobsSummary(); auto queueUpdateTime = t.secs(utils::Timer::resetCounter); @@ -432,6 +450,40 @@ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStruct } } +//------------------------------------------------------------------------------ +// RetrieveRequest::setActivityIfNeeded() +//------------------------------------------------------------------------------ +void RetrieveRequest::setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest& retrieveRequest, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) { + checkPayloadWritable(); + if (retrieveRequest.activity) { + auto * activity = m_payload.mutable_activity_weight(); + 
activity->set_priority(criteria.mountPolicy.retrievePriority); + activity->set_activity(retrieveRequest.activity.value()); + activity->set_disk_instance_name(criteria.activitiesFairShareWeight.diskInstance); + activity->set_weight(criteria.activitiesFairShareWeight.activitiesWeights.at(retrieveRequest.activity.value())); + activity->set_creation_time(retrieveRequest.creationLog.time); + } +} + +//------------------------------------------------------------------------------ +// RetrieveRequest::getActivity() +//------------------------------------------------------------------------------ +optional<RetrieveActivityDescription> RetrieveRequest::getActivity() { + checkPayloadReadable(); + optional<RetrieveActivityDescription> ret; + if (m_payload.has_activity_weight()) { + RetrieveActivityDescription activity; + activity.priority = m_payload.activity_weight().priority(); + activity.diskInstanceName = m_payload.activity_weight().disk_instance_name(); + activity.activity = m_payload.activity_weight().activity(); + activity.weight = m_payload.activity_weight().weight(); + activity.creationTime = m_payload.activity_weight().creation_time(); + ret = activity; + } + return ret; +} + //------------------------------------------------------------------------------ // RetrieveRequest::dumpJobs() //------------------------------------------------------------------------------ diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 67e287f7c9..ec82378e1b 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -22,6 +22,7 @@ #include "objectstore/cta.pb.h" #include "TapeFileSerDeser.hpp" #include "JobQueueType.hpp" +#include "RetrieveActivityCountMap.hpp" #include <list> #include "common/dataStructures/DiskFileInfo.hpp" #include "common/dataStructures/EntryLog.hpp" @@ -234,6 +235,9 @@ public: cta::common::dataStructures::RetrieveRequest getSchedulerRequest(); void setRetrieveFileQueueCriteria(const 
cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); + void setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest & retrieveRequest, + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria); + optional<RetrieveActivityDescription> getActivity(); cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria(); cta::common::dataStructures::ArchiveFile getArchiveFile(); cta::common::dataStructures::EntryLog getEntryLog(); diff --git a/objectstore/Sorter.cpp b/objectstore/Sorter.cpp index 606e3fa041..c1159267d2 100644 --- a/objectstore/Sorter.cpp +++ b/objectstore/Sorter.cpp @@ -159,7 +159,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA Sorter::RetrieveJob job = std::get<0>(jobToAdd->jobToQueue); succeededJobs[job.jobDump.copyNb] = jobToAdd; previousOwner = job.previousOwner->getAgentAddress(); - jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status}); + jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription}); } try{ algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc); diff --git a/objectstore/Sorter.hpp b/objectstore/Sorter.hpp index ddf8d57c58..b68b1a8e46 100644 --- a/objectstore/Sorter.hpp +++ b/objectstore/Sorter.hpp @@ -125,6 +125,7 @@ public: uint64_t fSeq; common::dataStructures::MountPolicy mountPolicy; cta::objectstore::JobQueueType jobQueueType; + optional<RetrieveActivityDescription> activityDescription; }; /** diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 9b8fefea13..a80c6d002a 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -390,15 +390,24 @@ message RetrieveRequestRepackInfo { } // The different timings of the lifecycle of a RetrieveRequest (creation time, first select time, request complete) -message 
LifecycleTimings{ +message LifecycleTimings { optional uint64 creation_time = 9160 [default = 0]; optional uint64 first_selected_time = 9161 [default = 0]; optional uint64 completed_time = 9162 [default = 0]; } +message RetrieveActivityWeight { + required uint64 priority = 9170; + required string disk_instance_name = 9171; + required string activity = 9172; + required double weight = 9173; + required int64 creation_time = 9174; +} + message RetrieveRequest { required SchedulerRetrieveRequest schedulerrequest = 9150; required MountPolicy mountpolicy = 9151; + optional RetrieveActivityWeight activity_weight = 9160; required ArchiveFile archivefile = 9152; required uint32 activecopynb = 9153; repeated RetrieveJob jobs = 9154; @@ -449,6 +458,7 @@ message ArchiveQueue { } message RetrieveJobPointer { + // The retrieve job pointer needs to hold the sufficient information for all the running counters of the queue (priority, activity...) required uint64 size = 3101; required string address = 3102; required uint32 copynb = 3103; @@ -457,6 +467,9 @@ message RetrieveJobPointer { required uint64 minretrieverequestage = 3105; required uint64 maxdrivesallowed = 3106; required uint64 starttime = 3108; + // For activity (if present), we need disk instance and activity name (priority is always provided) + optional string disk_instance_name = 3109; + optional string activity = 3110; } message RetrieveQueueShardPointer { @@ -472,12 +485,18 @@ message RetrieveQueueShard { required uint64 retrievejobstotalsize = 10501; } +message RetrieveActivityCountPair { + required RetrieveActivityWeight retrieve_activity_weight = 10600; + required uint64 count = 10601; +} + message RetrieveQueue { required string vid = 10100; repeated RetrieveQueueShardPointer retrievequeueshards = 10111; repeated ValueCountPair prioritymap = 10131; repeated ValueCountPair minretrieverequestagemap = 10132; repeated ValueCountPair maxdrivesallowedmap = 10133; + repeated RetrieveActivityCountPair activity_map = 
10136; required uint64 retrievejobstotalsize = 10140; required uint64 retrievejobscount = 10145; required uint64 oldestjobcreationtime = 10150; diff --git a/scheduler/OStoreDB/MemQueues.cpp b/scheduler/OStoreDB/MemQueues.cpp index 8bef86b8a4..7a1cafd3a8 100644 --- a/scheduler/OStoreDB/MemQueues.cpp +++ b/scheduler/OStoreDB/MemQueues.cpp @@ -55,7 +55,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special if (j.copyNb == job.copyNb) { auto criteria = request.getRetrieveFileQueueCriteria(); jtal.push_back({j.copyNb, j.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize, - criteria.mountPolicy, request.getEntryLog().time}); + criteria.mountPolicy, request.getEntryLog().time, request.getActivity()}); request.setActiveCopyNumber(j.copyNb); request.setOwner(queueAddress); goto jobAdded; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index b378b9e318..a5d1f27de1 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -31,6 +31,7 @@ #include "objectstore/Sorter.hpp" #include "objectstore/Helpers.hpp" #include "common/exception/Exception.hpp" +#include "common/exception/UserError.hpp" #include "common/utils/utils.hpp" #include "scheduler/LogicalLibrary.hpp" #include "common/dataStructures/MountPolicy.hpp" @@ -1058,7 +1059,8 @@ void OStoreDB::setRetrieveJobBatchReportedToUser(std::list<cta::SchedulerDatabas ); insertedElements.emplace_back(CaRQF::InsertedElement{ &j.job->m_retrieveRequest, tf_it->copyNb, tf_it->fSeq, tf_it->compressedSize, - common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed + common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed, + j.job->m_activityDescription }); } try { @@ -1084,17 +1086,37 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue //------------------------------------------------------------------------------ // OStoreDB::queueRetrieve() 
//------------------------------------------------------------------------------ -std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, +std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) { assertAgentAddressSet(); auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>(); cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind); - auto *mutexForHelgrindAddr = mutexForHelgrind.release(); cta::utils::Timer timer; // Get the best vid from the cache std::set<std::string> candidateVids; for (auto & tf:criteria.archiveFile.tapeFiles) candidateVids.insert(tf.vid); std::string bestVid=Helpers::selectBestRetrieveQueue(candidateVids, m_catalogue, m_objectStore); + // Check that the activity is fine (if applicable: disk instance uses them or it is sent). + if (rqst.activity || criteria.activitiesFairShareWeight.activitiesWeights.size()) { + // Activity is set. It should exist in the catalogue + if (rqst.activity) { + try { + criteria.activitiesFairShareWeight.activitiesWeights.at(rqst.activity.value()); + } catch (std::out_of_range &) { + throw cta::exception::UserError(std::string("Unknown fair share activity \"") + rqst.activity.value() + "\" for disk instance \"" + + criteria.activitiesFairShareWeight.diskInstance + "\""); + } + } else { + try { + criteria.activitiesFairShareWeight.activitiesWeights.at("default"); + rqst.activity = "default"; + } catch (std::out_of_range &) { + throw cta::exception::UserError( + std::string("Missing fair share activity \"default\" while queuing with undefined activity for disk instance \"") + + criteria.activitiesFairShareWeight.diskInstance + "\""); + } + } + } // Check that the requested retrieve job (for the provided vid) exists, and record the copynb. 
uint64_t bestCopyNb; for (auto & tf: criteria.archiveFile.tapeFiles) { @@ -1115,6 +1137,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR rReq->initialize(); rReq->setSchedulerRequest(rqst); rReq->setRetrieveFileQueueCriteria(criteria); + rReq->setActivityIfNeeded(rqst, criteria); rReq->setCreationTime(rqst.creationLog.time); // Find the job corresponding to the vid (and check we indeed have one). auto jobs = rReq->getJobs(); @@ -1158,6 +1181,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR .add("insertionTime", insertionTime); delayIfNecessary(logContext); auto rReqPtr = rReq.release(); + auto *mutexForHelgrindAddr = mutexForHelgrind.release(); auto * et = new EnqueueingTask([rReqPtr, job, bestVid, mutexForHelgrindAddr, this]{ std::unique_ptr<cta::threading::Mutex> mutexForHelgrind(mutexForHelgrindAddr); std::unique_ptr<objectstore::RetrieveRequest> rReq(rReqPtr); @@ -3527,7 +3551,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize, cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, - serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess}); + serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription}); requestToJobMap[&req->m_retrieveRequest] = req; } RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4406,7 +4430,8 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ - &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_Failed + &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, 
rfqc.mountPolicy, + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription }); m_retrieveRequest.commit(); rel.release(); @@ -4472,7 +4497,8 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log:: CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ - &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToTransferForUser + &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToTransferForUser, + m_activityDescription }); CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); @@ -4535,7 +4561,8 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ - &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure + &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, + serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer params(lc); @@ -4553,7 +4580,8 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); CaRqtr::InsertedElement::list insertedElements; insertedElements.push_back(CaRqtr::InsertedElement{ - &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_Failed + &m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, + serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription }); caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc); log::ScopedParamContainer 
params(lc); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 97c4ad316a..a91002d995 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -29,6 +29,7 @@ #include "objectstore/ArchiveRequest.hpp" #include "objectstore/DriveRegister.hpp" #include "objectstore/RetrieveRequest.hpp" +#include "objectstore/RetrieveActivityCountMap.hpp" #include "objectstore/RepackQueue.hpp" #include "objectstore/RepackRequest.hpp" #include "objectstore/SchedulerGlobalLock.hpp" @@ -256,6 +257,7 @@ public: std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete; std::unique_ptr<objectstore::RetrieveRequest::AsyncJobSucceedForRepackReporter> m_jobSucceedForRepackReporter; objectstore::RetrieveRequest::RepackInfo m_repackInfo; + optional<objectstore::RetrieveActivityDescription> m_activityDescription; }; static RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job); @@ -294,7 +296,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies); CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange); - std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, + std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override; std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index b578aedce0..bb9cb26357 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -202,7 +202,7 @@ public: return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider); } - std::string queueRetrieve(const common::dataStructures::RetrieveRequest& rqst, + std::string queueRetrieve(common::dataStructures::RetrieveRequest& rqst, const 
common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override { return m_OStoreDB.queueRetrieve(rqst, criteria, logContext); } diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 8c9ba58557..0fc7634220 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -203,7 +203,7 @@ void Scheduler::queueArchiveRequestForRepackBatch(std::list<cta::objectstore::Ar //------------------------------------------------------------------------------ void Scheduler::queueRetrieve( const std::string &instanceName, - const common::dataStructures::RetrieveRequest &request, + common::dataStructures::RetrieveRequest &request, log::LogContext & lc) { using utils::postEllipsis; using utils::midEllipsis; @@ -250,10 +250,11 @@ void Scheduler::queueRetrieve( .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed) .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge) .add("policyPriority", queueCriteria.mountPolicy.retrievePriority); + if (request.activity) + spc.add("activity", request.activity.value()); lc.log(log::INFO, "Queued retrieve request"); } - //------------------------------------------------------------------------------ // deleteArchive //------------------------------------------------------------------------------ diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index d19820ed86..375c1113ee 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -147,7 +147,7 @@ public: * Throws a UserError exception in case of wrong request parameters (ex. 
unknown file id) * Throws a (Non)RetryableError exception in case something else goes wrong with the request */ - void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request, + void queueRetrieve(const std::string &instanceName, cta::common::dataStructures::RetrieveRequest &request, log::LogContext &lc); /** diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index c58874553c..0122d89061 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -270,7 +270,7 @@ public: * @param logContext context allowing logging db operation * @return the selected vid (mostly for logging) */ - virtual std::string queueRetrieve(const cta::common::dataStructures::RetrieveRequest &rqst, + virtual std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest &rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) = 0; /** -- GitLab