Commit bda5f4b7 authored by Eric Cano's avatar Eric Cano
Browse files

#471 Added support for activities in RetrieveQueue.

parent c73c52a1
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ActivitiesFairShareWeights.hpp"
#include "common/exception/Exception.hpp"
namespace cta { namespace common { namespace dataStructures {
void ActivitiesFairShareWeights::setWeightFromDouble(const std::string & activity, double weight) {
if (weight < 0 || weight > 1)
throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromDouble(): weight out of range.");
activitiesWeights[activity] = weight;
}
void ActivitiesFairShareWeights::setWeightFromString(const std::string& activity, const std::string& sweight) {
if (sweight.empty())
throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromString() empty string.");
size_t pos;
double weight = std::stod(sweight, &pos);
if (pos != sweight.size())
throw cta::exception::Exception("In ActivitiesFairShareWeights::setWeightFromString(): bad format: garbage at the end of string.");
setWeightFromDouble(activity, weight);
}
}}} // namespace cta::common::dataStructures.
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
#include <map>
namespace cta { namespace common { namespace dataStructures {
struct ActivitiesFairShareWeights {
std::string diskInstance;
std::map<std::string, double> activitiesWeights;
/** set the weight for the activity, checking the value is in ]0, 1] */
void setWeightFromDouble(const std::string & activity, double weight);
/** set the weight for an activity, first checking the string can be fully converted to a double, and then */
void setWeightFromString(const std::string & activity, const std::string &sweight);
};
}}}
\ No newline at end of file
......@@ -78,7 +78,7 @@ void fillRetrieveRequests(
rqc.mountPolicy.retrievePriority = 1;
requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be));
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt
});
auto &rr = *requests.back().retrieveRequest;
rr.initialize();
......
......@@ -141,28 +141,11 @@ public:
static const uint64_t c_maxShardSize = 25000;
};
class ArchiveQueueToTransferForUser: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReportForUser: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueFailed: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToTransferForRepack: public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReportToRepackForFailure: public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToTransferForUser: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; };
class ArchiveQueueToReportForUser: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; };
class ArchiveQueueFailed: public ArchiveQueue { using ArchiveQueue::ArchiveQueue; };
class ArchiveQueueToTransferForRepack: public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; };
class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; };
class ArchiveQueueToReportToRepackForFailure: public ArchiveQueue{ using ArchiveQueue::ArchiveQueue; };
}}
......@@ -29,7 +29,8 @@ set (CTAProtoFiles
PROTOBUF3_GENERATE_CPP(CTAProtoSources CTAProtoHeaders ${CTAProtoFiles})
set (CTAProtoDependants objectstore/Agent.hpp
set (CTAProtoDependants
objectstore/Agent.hpp
objectstore/ArchiveRequest.hpp
objectstore/CreationLog.hpp
objectstore/DriveRegister.hpp
......@@ -39,6 +40,7 @@ set (CTAProtoDependants objectstore/Agent.hpp
objectstore/RepackIndex.hpp
objectstore/RepackRequest.hpp
objectstore/RepackQueue.hpp
objectstore/RetrieveActivityCountMap.hpp
objectstore/RetrieveRequest.hpp
objectstore/RootEntry.hpp
objectstore/SchedulerGlobalLock.hpp
......@@ -100,7 +102,8 @@ add_library (ctaobjectstore SHARED
GarbageCollector.cpp
SchedulerGlobalLock.cpp
ValueCountMap.cpp
Helpers.cpp)
Helpers.cpp
RetrieveActivityCountMap.cpp)
set_property(TARGET ctaobjectstore PROPERTY SOVERSION "${CTA_SOVERSION}")
set_property(TARGET ctaobjectstore PROPERTY VERSION "${CTA_LIBVERSION}")
......
......@@ -559,7 +559,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
for (auto &tf: rr->getArchiveFile().tapeFiles) {
if (tf.vid == vid) {
jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize,
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time});
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
}
}
}
......
......@@ -608,7 +608,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta;
jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time});
jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time, cta::nullopt});
rq.addJobsAndCommit(jta, agentRef, lc);
}
if (pass < 5) { pass++; continue; }
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveActivityCountMap.hpp"
#include "common/exception/Exception.hpp"
#include <algorithm>
#include <sstream>
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
RetrieveActivityCountMap::RetrieveActivityCountMap(
google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>* retrieveActivityCountMap):
m_activityCountMap(*retrieveActivityCountMap) { }
//------------------------------------------------------------------------------
// RetrieveActivityCountMap::incCount()
//------------------------------------------------------------------------------
void RetrieveActivityCountMap::incCount(const RetrieveActivityDescription& activityDescription) {
// Find the entry for this value (might fail)
auto counter = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription);
if (counter != m_activityCountMap.end()) {
if (counter->count() < 1) {
std::stringstream err;
err << "In ValueCountMap::incCount: unexpected count value=" << toString(counter->retrieve_activity_weight())
<< " count=" << counter->count();
throw cta::exception::Exception(err.str());
} else {
counter->set_count(counter->count()+1);
// Update the weight to the latest version (in case weights got updated since last time).
if (counter->retrieve_activity_weight().creation_time() < activityDescription.creationTime) {
counter->mutable_retrieve_activity_weight()->set_weight(activityDescription.weight);
counter->mutable_retrieve_activity_weight()->set_creation_time(activityDescription.creationTime);
}
}
} else {
// Create the new entry if necessary.
auto newCounter = m_activityCountMap.Add();
newCounter->mutable_retrieve_activity_weight()->set_priority(activityDescription.priority);
newCounter->mutable_retrieve_activity_weight()->set_disk_instance_name(activityDescription.diskInstanceName);
newCounter->mutable_retrieve_activity_weight()->set_activity(activityDescription.activity);
newCounter->mutable_retrieve_activity_weight()->set_weight(activityDescription.weight);
newCounter->mutable_retrieve_activity_weight()->set_creation_time(activityDescription.creationTime);
newCounter->set_count(1);
}
}
//------------------------------------------------------------------------------
// RetrieveActivityCountMap::decCount()
//------------------------------------------------------------------------------
void RetrieveActivityCountMap::decCount(const RetrieveActivityDescription& activityDescription) {
// Find the entry for this value. Failing is an error.
auto counter = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription);
if (counter == m_activityCountMap.end()) {
std::stringstream err;
err << "In RetrieveActivityCountMap::decCount: no entry found for value=" << toString(activityDescription);
throw cta::exception::Exception(err.str());
}
// Decrement the value and remove the entry if needed.
if (counter->count() < 1) {
std::stringstream err;
err << "In ValueCountMap::decCount: entry with wrong count value=" << toString(activityDescription) << " count=" << counter->count();
throw cta::exception::Exception(err.str());
}
counter->set_count(counter->count()-1);
if (!counter->count()) {
auto size=m_activityCountMap.size();
counter->Swap(&(*(m_activityCountMap.end()-1)));
m_activityCountMap.RemoveLast();
// Cross check that the size has decreased.
if (size -1 != m_activityCountMap.size()) {
std::stringstream err;
err << "In ValueCountMap::decCount: unexpected size after trimming empty entry. expectedSize=" << size -1 << " newSize=" << m_activityCountMap.size();
throw cta::exception::Exception(err.str());
}
// Cross check we cannot find the value.
auto counter2 = std::find(m_activityCountMap.begin(), m_activityCountMap.end(), activityDescription);
if (m_activityCountMap.end() != counter2) {
std::stringstream err;
err << "In ValueCountMap::decCount: still found the value after trimming empty entry. value=" << toString(counter2->retrieve_activity_weight()) << " count=" << counter2->count();
throw cta::exception::Exception(err.str());
}
}
}
//------------------------------------------------------------------------------
// RetrieveActivityCountMap::getActivities()
//------------------------------------------------------------------------------
std::list<RetrieveActivityDescription> RetrieveActivityCountMap::getActivities(uint64_t priority) {
std::list<RetrieveActivityDescription> ret;
for (auto & ad: m_activityCountMap) {
if (ad.retrieve_activity_weight().priority() == priority)
ret.push_back({ad.retrieve_activity_weight().priority(), ad.retrieve_activity_weight().disk_instance_name(),
ad.retrieve_activity_weight().activity(), ad.retrieve_activity_weight().creation_time(),
ad.retrieve_activity_weight().weight(), ad.count()});
}
return ret;
}
//------------------------------------------------------------------------------
// RetrieveActivityCountMap::clear()
//------------------------------------------------------------------------------
void RetrieveActivityCountMap::clear() {
m_activityCountMap.Clear();
}
//------------------------------------------------------------------------------
// operator==()
//------------------------------------------------------------------------------
bool operator==(const serializers::RetrieveActivityCountPair & serialized, const RetrieveActivityDescription & memory) {
return (serialized.retrieve_activity_weight().priority() == memory.priority)
&& (serialized.retrieve_activity_weight().disk_instance_name() == memory.diskInstanceName)
&& (serialized.retrieve_activity_weight().activity() == memory.activity);
}
//------------------------------------------------------------------------------
// toString()
//------------------------------------------------------------------------------
std::string toString(const RetrieveActivityDescription & ad) {
serializers::RetrieveActivityWeight raw;
raw.set_priority(ad.priority);
raw.set_disk_instance_name(ad.diskInstanceName);
raw.set_activity(ad.activity);
raw.set_creation_time(ad.creationTime);
raw.set_weight(ad.weight);
return toString(raw);
}
//------------------------------------------------------------------------------
// toString()
//------------------------------------------------------------------------------
std::string toString(const serializers::RetrieveActivityWeight & raw){
using namespace google::protobuf::util;
std::string json;
JsonPrintOptions options;
options.always_print_primitive_fields = true;
MessageToJsonString(raw, &json, options);
return json;
}
}} // namespace cta::objectstore.
\ No newline at end of file
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "objectstore/cta.pb.h"
#include <list>
namespace cta { namespace objectstore {
struct RetrieveActivityDescription {
uint64_t priority;
std::string diskInstanceName;
std::string activity;
time_t creationTime;
double weight;
uint64_t count;
};
/** A helper class allowing manipulation of arrays of ValueCountPairs, used as containers for running
* counters for properties with multiple possible values. When considering the retrieve mounts, all activities
* will be considered for the same mount (and highest priority one will be accounted). So this class does not
* select any and gives the full list in getActivities(). Having multiple activities sharing the drive is not
* expected to be a frequent occurrence. */
class RetrieveActivityCountMap {
public:
RetrieveActivityCountMap (google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>* retrieveActivityCountMap);
void incCount(const RetrieveActivityDescription & activityDescription);
void decCount(const RetrieveActivityDescription & activityDescription);
void clear();
std::list<RetrieveActivityDescription> getActivities(uint64_t priority);
private:
google::protobuf::RepeatedPtrField<serializers::RetrieveActivityCountPair>& m_activityCountMap;
};
std::string toString(const RetrieveActivityDescription &);
std::string toString(const serializers::RetrieveActivityWeight &);
bool operator==(const serializers::RetrieveActivityCountPair &, const RetrieveActivityDescription &);
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -22,6 +22,7 @@
#include "EntryLogSerDeser.hpp"
#include "ValueCountMap.hpp"
#include "AgentReference.hpp"
#include "RetrieveActivityCountMap.hpp"
#include <google/protobuf/util/json_util.h>
namespace cta { namespace objectstore {
......@@ -287,6 +288,7 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap());
RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map());
// We need to figure out which job will be added to which shard.
// We might have to split shards if they would become too big.
// For a given jobs, there a 4 possible cases:
......@@ -462,6 +464,9 @@ void RetrieveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefer
maxDriveAllowedMap.incCount(j.policy.maxDrivesAllowed);
priorityMap.incCount(j.policy.retrievePriority);
minRetrieveRequestAgeMap.incCount(j.policy.retrieveMinRequestAge);
if (j.activityDescription) {
retrieveActivityCountMap.incCount(j.activityDescription.value());
}
// oldestjobcreationtime is initialized to 0 when
if (m_payload.oldestjobcreationtime()) {
if ((uint64_t)j.startTime < m_payload.oldestjobcreationtime())
......@@ -568,6 +573,10 @@ RetrieveQueue::JobsSummary RetrieveQueue::getJobsSummary() {
ret.priority = priorityMap.maxValue();
ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap());
ret.minRetrieveRequestAge = minRetrieveRequestAgeMap.minValue();
RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map());
for (auto ra: retrieveActivityCountMap.getActivities(ret.priority)) {
ret.activityCounts.push_back({ra.diskInstanceName, ra.activity, ra.weight, ra.count});
}
} else {
ret.maxDrivesAllowed = 0;
ret.priority = 0;
......@@ -646,6 +655,7 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minRetrieveRequestAgeMap(m_payload.mutable_minretrieverequestagemap());
RetrieveActivityCountMap retrieveActivityCountMap(m_payload.mutable_activity_map());
// Make a working copy of the jobs to remove. We will progressively trim this local list.
auto localJobsToRemove = jobsToRemove;
// The jobs are expected to be removed from the front shards first (poped in order)
......@@ -672,6 +682,14 @@ void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemo
maxDriveAllowedMap.decCount(j.maxDrivesAllowed);
priorityMap.decCount(j.priority);
minRetrieveRequestAgeMap.decCount(j.minRetrieveRequestAge);
if (j.activityDescription) {
// We have up a partial activity description, but this is enough to decCount.
RetrieveActivityDescription activityDescription;
activityDescription.priority = j.priority;
activityDescription.diskInstanceName = j.activityDescription.value().diskInstanceName;
activityDescription.activity = j.activityDescription.value().activity;
retrieveActivityCountMap.decCount(activityDescription);
}
}
// In all cases, we should update the global statistics.
m_payload.set_retrievejobscount(m_payload.retrievejobscount() - removalResult.jobsRemoved);
......
......@@ -22,6 +22,7 @@
#include "objectstore/cta.pb.h"
#include "RetrieveRequest.hpp"
#include "scheduler/RetrieveRequestDump.hpp"
#include "RetrieveActivityCountMap.hpp"
namespace cta { namespace objectstore {
......@@ -65,6 +66,7 @@ public:
uint64_t fileSize;
cta::common::dataStructures::MountPolicy policy;
time_t startTime;
optional<RetrieveActivityDescription> activityDescription;
};
void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc);
// This version will check for existence of the job in the queue before
......@@ -82,6 +84,13 @@ public:
uint64_t priority;
uint64_t minRetrieveRequestAge;
uint64_t maxDrivesAllowed;
struct ActivityCount {
std::string diskInstanceName;
std::string activity;
double weight;
uint64_t count;
};
std::list<ActivityCount> activityCounts;
};
JobsSummary getJobsSummary();
struct JobDump {
......@@ -148,36 +157,12 @@ private:
uint64_t m_maxShardSize = c_defaultMaxShardSize;
};
class RetrieveQueueToTransferForUser : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToTransferForUser(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToReportForUser : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToReportForUser(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueFailed : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueFailed(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToReportToRepackForSuccess : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToReportToRepackForSuccess(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToReportToRepackForFailure: public RetrieveQueue{
public:
template<typename...Ts> RetrieveQueueToReportToRepackForFailure(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToTransferForRepack : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToTransferForRepack(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToTransferForUser : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
class RetrieveQueueToReportForUser : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
class RetrieveQueueFailed : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
class RetrieveQueueToReportToRepackForSuccess : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
class RetrieveQueueToReportToRepackForFailure: public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
class RetrieveQueueToTransferForRepack : public RetrieveQueue { using RetrieveQueue::RetrieveQueue; };
}}
......@@ -29,7 +29,9 @@ struct ContainerTraits<RetrieveQueue,C>
{
struct ContainerSummary : public RetrieveQueue::JobsSummary {
ContainerSummary() : RetrieveQueue::JobsSummary() {}
ContainerSummary(const RetrieveQueue::JobsSummary &c) : RetrieveQueue::JobsSummary({c.jobs,c.bytes,c.oldestJobStartTime,c.priority,c.minRetrieveRequestAge,c.maxDrivesAllowed}) {}
ContainerSummary(const RetrieveQueue::JobsSummary &c) :
RetrieveQueue::JobsSummary({c.jobs,c.bytes,c.oldestJobStartTime,c.priority,
c.minRetrieveRequestAge,c.maxDrivesAllowed,c.activityCounts}) {}
void addDeltaToLog(const ContainerSummary&, log::ScopedParamContainer&) const;
};
......@@ -42,6 +44,7 @@ struct ContainerTraits<RetrieveQueue,C>
uint64_t filesize;
cta::common::dataStructures::MountPolicy policy;
serializers::RetrieveJobStatus status;
optional<RetrieveActivityDescription> activityDescription;
typedef std::list<InsertedElement> list;
};
......@@ -276,7 +279,7 @@ addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemC
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)});
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
......@@ -289,7 +292,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr)});
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
......
......@@ -96,12 +96,15 @@ auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove)
const auto & j = jl->Get(i);
ret.removedJobs.emplace_back(JobInfo());
ret.removedJobs.back().address = j.address();
ret.removedJobs.back().fSeq = j.fseq();
ret.removedJobs.back().copyNb = j.copynb();
ret.removedJobs.back().maxDrivesAllowed = j.maxdrivesallowed();
ret.removedJobs.back().minRetrieveRequestAge = j.minretrieverequestage();
ret.removedJobs.back().priority = j.priority();
ret.removedJobs.back().size = j.size();
ret.removedJobs.back().startTime = j.starttime();
if (j.has_activity())
ret.removedJobs.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() };
ret.bytesRemoved += j.size();
totalSize -= j.size();
ret.jobsRemoved++;
......@@ -136,7 +139,10 @@ auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> {
std::list<JobInfo> ret;
for (auto &j: m_payload.retrievejobs()) {
ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(),
j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq()});
j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt});
if (j.has_activity()) {
ret.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() };
}
}
return ret;
}
......@@ -154,6 +160,12 @@ std::list<RetrieveQueue::JobToAdd> RetrieveQueueShard::dumpJobsToAdd() {
ret.back().policy.retrievePriority = j.priority();
ret.back().startTime = j.starttime();
ret.back().retrieveRequestAddress = j.