Commit 1c4b90dc authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Created a Sorter to queue Archive or Retrieve Jobs

Queueing of Archive Jobs is done and unit tested
Queueing of Retrieve Requests is not completely done yet
parent 402e9008
......@@ -19,6 +19,7 @@
#include "common/dataStructures/MountPolicy.hpp"
#include "common/dataStructures/utils.hpp"
#include "common/exception/Exception.hpp"
#include "MountPolicy.hpp"
namespace cta {
namespace common {
......@@ -60,6 +61,20 @@ bool MountPolicy::operator!=(const MountPolicy &rhs) const {
return !operator==(rhs);
}
MountPolicy MountPolicy::operator=(const MountPolicy& other){
if(this != &other){
this->archiveMinRequestAge = other.archiveMinRequestAge;
this->archivePriority = other.archivePriority;
this->comment = other.comment;
this->creationLog = other.creationLog;
this->lastModificationLog = other.lastModificationLog;
this->maxDrivesAllowed = other.maxDrivesAllowed;
this->name = other.name;
this->retrieveMinRequestAge = other.retrieveMinRequestAge;
this->retrievePriority = other.retrievePriority;
}
return *this;
}
//------------------------------------------------------------------------------
// operator<<
//------------------------------------------------------------------------------
......
......@@ -39,6 +39,8 @@ struct MountPolicy {
bool operator==(const MountPolicy &rhs) const;
bool operator!=(const MountPolicy &rhs) const;
MountPolicy operator=(const MountPolicy& other);
std::string name;
uint64_t archivePriority;
......
......@@ -29,6 +29,7 @@ namespace cta { namespace objectstore {
class GenericObject;
class AgentReference;
class GarbageCollector;
class Sorter;
/**
* Class containing agent information and managing the update of the
......@@ -42,6 +43,7 @@ class GarbageCollector;
class Agent: public ObjectOps<serializers::Agent, serializers::Agent_t> {
friend class AgentReference;
friend class GarbageCollector;
friend class Sorter;
public:
CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects);
Agent(GenericObject & go);
......
......@@ -116,7 +116,7 @@ public:
log::TimingList timingList;
utils::Timer t;
ContainerTraits<Q,C>::getLockedAndFetched(cont, contLock, m_agentReference, contId, lc);
contAddress = cont.getAddressIfSet();
contAddress = cont.getAddressIfSet();//TODO : It would be better to return this value
auto contSummaryBefore = ContainerTraits<Q,C>::getContainerSummary(cont);
timingList.insertAndReset("queueLockFetchTime", t);
ContainerTraits<Q,C>::addReferencesIfNecessaryAndCommit(cont, elements, m_agentReference, lc);
......
......@@ -495,7 +495,7 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType {
template<>
struct ContainerTraits<ArchiveQueue, ArchiveQueueToTransferForRepack>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer;
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransferForRepack;
};
template<>
......
......@@ -96,6 +96,8 @@ JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
switch (j.status()) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
return JobQueueType::JobsToTransfer;
case serializers::ArchiveJobStatus::AJS_ToTransferForRepack:
return JobQueueType::JobsToTransferForRepack;
case serializers::ArchiveJobStatus::AJS_Complete:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion.");
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
......@@ -534,8 +536,8 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16
// If a status change was requested, do it.
if (newStatus) j->set_status(*newStatus);
// We also need to gather all the job content for the user to get in-memory
// representation.
// TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
// representation.getLockedAndFetchedJobQueue
// TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequesgetLockedAndFetchedJobQueuet.
// We could try and refactor this.
retRef.m_archiveFile.archiveFileID = payload.archivefileid();
retRef.m_archiveFile.checksumType = payload.checksumtype();
......
......@@ -41,6 +41,7 @@
#include <iostream>
#endif
#include <valgrind/helgrind.h>
#include <iostream>
namespace cta { namespace objectstore {
......
......@@ -75,6 +75,7 @@ add_library (ctaobjectstore SHARED
RetrieveQueueFailedAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
JobQueueType.cpp
Sorter.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
DriveRegister.cpp
......@@ -108,6 +109,7 @@ set(ObjectStoreUnitTests
RetrieveQueueTest.cpp
GarbageCollectorTest.cpp
AlgorithmsTest.cpp
SorterTest.cpp
)
add_library(ctaobjectstoreunittests SHARED ${ObjectStoreUnitTests})
......
......@@ -385,6 +385,8 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
fetchedObjects.clear();
}
//TODO : We should record the VID in the ArchiveRequest object to allow the requeueing in the proper report queue (currently, the report queue is selected
//by tapepool, which works but is not the most efficient way to report the request (contention problem)
void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& agent, AgentReference& agentReference, Backend & objectStore,
log::LogContext & lc) {
// We can now start updating the objects efficiently. We still need to re-fetch them locked
......
......@@ -24,6 +24,7 @@
#include "AgentRegister.hpp"
#include "JobQueueType.hpp"
#include "common/log/LogContext.hpp"
#include "Sorter.hpp"
/**
* Plan => Garbage collector keeps track of the agents.
......@@ -58,6 +59,7 @@ public:
std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <ArchiveRequest>>> archiveQueuesAndRequests;
std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr <RetrieveRequest>>> retrieveQueuesAndRequests;
std::list<std::shared_ptr<GenericObject>> otherObjects;
//Sorter m_sorter;
/// Fill up the fetchedObjects with objects of interest.
void fetchOwnedObjects(Agent & agent, std::list<std::shared_ptr<GenericObject>> & fetchedObjects, Backend & objectStore,
log::LogContext & lc);
......@@ -71,13 +73,16 @@ public:
// Lock, fetch and update other objects
void lockFetchAndUpdateOtherObjects(Agent & agent, AgentReference & agentReference, Backend & objectStore,
cta::catalogue::Catalogue & catalogue, log::LogContext & lc);
//Sorter& getSorter();
};
private:
Backend & m_objectStore;
catalogue::Catalogue & m_catalogue;
AgentReference & m_ourAgentReference;
AgentRegister m_agentRegister;
std::map<std::string, AgentWatchdog * > m_watchedAgents;
//void garbageCollectArchiveRequests(Agent& agent, OwnedObjectSorter &ownedObjectSorter,log::LogContext & lc);
};
}}
......@@ -21,6 +21,6 @@
#include <string>
namespace cta { namespace objectstore {
enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReportToUser, JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure };
enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReportToUser, JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure, JobsToTransferForRepack };
std::string toString(JobQueueType queueType);
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -548,7 +548,7 @@ JobQueueType RetrieveRequest::getQueueType() {
case serializers::RetrieveJobStatus::RJS_ToTransfer:
return JobQueueType::JobsToTransfer;
break;
case serializers::RetrieveJobStatus::RJS_Succeeded:
case serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess:
return JobQueueType::JobsToReportToRepackForSuccess;
break;
case serializers::RetrieveJobStatus::RJS_ToReportForFailure:
......@@ -860,7 +860,7 @@ RetrieveRequest::AsyncJobSucceedForRepackReporter * RetrieveRequest::asyncReport
if(job.copynb() == copyNb)
{
//Change the status to RJS_Succeed
job.set_status(serializers::RetrieveJobStatus::RJS_Succeeded);
job.set_status(serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess);
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
......
......@@ -183,6 +183,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool
case JobQueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break;
case JobQueueType::JobsToReportToUser: archiveQueueNameHeader+="ToReport"; break;
case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break;
case JobQueueType::JobsToTransferForRepack: archiveQueueNameHeader+="ToTransferForRepack"; break;
case JobQueueType::JobsToReportToRepackForSuccess: archiveQueueNameHeader+="ToReportToRepackForSuccess"; break;
default: break;
}
......
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Sorter.hpp"
#include "Helpers.hpp"
#include "common/threading/MutexLocker.hpp"
#include <iostream>
namespace cta { namespace objectstore {
Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue::Catalogue& catalogue):m_agentReference(agentReference),m_objectstore(objectstore),m_catalogue(catalogue){}
Sorter::~Sorter() {
}
void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){
std::shared_ptr<ArchiveJobQueueInfo> ajqi = std::make_shared<ArchiveJobQueueInfo>(ArchiveJobQueueInfo());
ajqi->archiveRequest = archiveRequest;
Sorter::ArchiveJob jobToAdd;
jobToAdd.archiveRequest = archiveRequest;
jobToAdd.archiveFileId = archiveRequest->getArchiveFile().archiveFileID;
jobToAdd.jobDump.copyNb = jobToInsert.copyNb;
jobToAdd.fileSize = archiveRequest->getArchiveFile().fileSize;
jobToAdd.mountPolicy = archiveRequest->getMountPolicy();
jobToAdd.jobDump.owner = jobToInsert.owner;//TODO : Maybe should be passed in parameter of this method
jobToAdd.startTime = archiveRequest->getEntryLog().time;
jobToAdd.jobDump.tapePool = jobToInsert.tapePool;
ajqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
threading::MutexLocker mapLocker(m_mutex);
m_archiveQueuesAndRequests[std::make_tuple(jobToInsert.tapePool, archiveRequest->getJobQueueType(jobToInsert.copyNb))].emplace_back(ajqi);
}
void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc){
std::set<std::string> candidateVids = getCandidateVids(*retrieveRequest);
//We need to select the best VID to queue the RetrieveJob in the best queue
if(candidateVids.empty()){
std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo());
rjqi->retrieveRequest = retrieveRequest;
//The first copy of the ArchiveFile will be queued
cta::common::dataStructures::TapeFile jobTapeFile = retrieveRequest->getArchiveFile().tapeFiles.begin()->second;
Sorter::RetrieveJob jobToAdd;
jobToAdd.jobDump.copyNb = jobTapeFile.copyNb;
jobToAdd.fSeq = jobTapeFile.fSeq;
jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize;
jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy;
jobToAdd.retrieveRequest = retrieveRequest;
jobToAdd.startTime = retrieveRequest->getEntryLog().time;
rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
try{
threading::MutexLocker mapLocker(m_mutex);
m_retrieveQueuesAndRequests[std::make_tuple(retrieveRequest->getArchiveFile().tapeFiles.begin()->second.vid, retrieveRequest->getQueueType())].emplace_back(rjqi);
} catch (cta::exception::Exception &ex){
log::ScopedParamContainer params(lc);
params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID)
.add("exceptionMessage", ex.getMessageValue());
lc.log(log::ERR, "In Sorter::insertRetrieveRequest() Failed to determine destination queue for retrieve request.");
throw ex;
}
}
std::string bestVid = getBestVidForQueueingRetrieveRequest(*retrieveRequest, candidateVids ,lc);
for (auto & tf: retrieveRequest->getArchiveFile().tapeFiles) {
if (tf.second.vid == bestVid) {
goto vidFound;
}
}
{
std::stringstream err;
err << "In Sorter::insertRetrieveRequest(): no tape file for requested vid. archiveId=" << retrieveRequest->getArchiveFile().archiveFileID
<< " vid=" << bestVid;
throw RetrieveRequestHasNoCopies(err.str());
}
vidFound:
std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo());
rjqi->retrieveRequest = retrieveRequest;
log::ScopedParamContainer params(lc);
size_t copyNb = std::numeric_limits<size_t>::max();
uint64_t fSeq = std::numeric_limits<uint64_t>::max();
for (auto & tc: retrieveRequest->getArchiveFile().tapeFiles) { if (tc.second.vid==bestVid) { copyNb=tc.first; fSeq=tc.second.fSeq; } }
Sorter::RetrieveJob jobToAdd;
jobToAdd.jobDump.copyNb = copyNb;
jobToAdd.fSeq = fSeq;
jobToAdd.fileSize = retrieveRequest->getArchiveFile().fileSize;
jobToAdd.mountPolicy = retrieveRequest->getRetrieveFileQueueCriteria().mountPolicy;
jobToAdd.retrieveRequest = retrieveRequest;
jobToAdd.startTime = retrieveRequest->getEntryLog().time;
threading::MutexLocker mapLocker(m_mutex);
//We are sure that we want to transfer jobs
rjqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
m_retrieveQueuesAndRequests[std::make_tuple(bestVid,JobQueueType::JobsToTransfer)].emplace_back(rjqi);
params.add("fileId", retrieveRequest->getArchiveFile().archiveFileID)
.add("copyNb", copyNb)
.add("tapeVid", bestVid)
.add("fSeq", fSeq);
lc.log(log::INFO, "Selected vid to be queued for retrieve request.");
}
std::set<std::string> Sorter::getCandidateVids(RetrieveRequest &request){
using serializers::RetrieveJobStatus;
std::set<std::string> candidateVids;
for (auto & j: request.dumpJobs()) {
if(j.status == RetrieveJobStatus::RJS_ToTransfer) {
candidateVids.insert(request.getArchiveFile().tapeFiles.at(j.copyNb).vid);
}
}
return candidateVids;
}
std::string Sorter::getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc){
std::string vid;
try{
vid = Helpers::selectBestRetrieveQueue(candidateVids,m_catalogue,m_objectstore);
} catch (Helpers::NoTapeAvailableForRetrieve & ex) {
log::ScopedParamContainer params(lc);
params.add("fileId", retrieveRequest.getArchiveFile().archiveFileID);
lc.log(log::INFO, "In Sorter::getVidForQueueingRetrieveRequest(): No available tape found.");
throw ex;
}
return vid;
}
bool Sorter::flushOneArchive(log::LogContext &lc) {
threading::MutexLocker locker(m_mutex);
for(auto & kv: m_archiveQueuesAndRequests){
if(!kv.second.empty()){
queueArchiveRequests(std::get<0>(kv.first),std::get<1>(kv.first),kv.second,lc);
return true;
}
}
return false;
}
bool Sorter::flushOneRetrieve(log::LogContext &lc){
return true;
}
Sorter::MapArchive Sorter::getAllArchive(){
return m_archiveQueuesAndRequests;
}
Sorter::MapRetrieve Sorter::getAllRetrieve(){
return m_retrieveQueuesAndRequests;
}
void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc){
for(auto& archiveJobInfo: archiveJobInfos){
double queueLockFetchTime=0;
double queueProcessAndCommitTime=0;
double requestsUpdatePreparationTime=0;
double requestsUpdatingTime = 0;
utils::Timer t;
uint64_t filesBefore=0;
uint64_t bytesBefore=0;
ArchiveQueue aq(m_objectstore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq,rql, m_agentReference, tapePool, jobQueueType, lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary=aq.getJobsSummary();
filesBefore=jobsSummary.jobs;
bytesBefore=jobsSummary.bytes;
// Prepare the list of requests to add to the queue (if needed).
std::list<ArchiveQueue::JobToAdd> jta;
// We have the queue. We will loop on the requests, add them to the list. We will launch their updates
// after committing the queue.
Sorter::ArchiveJob jobToQueue = std::get<0>(archiveJobInfo->jobToQueue);
std::promise<void>& jobPromise = std::get<1>(archiveJobInfo->jobToQueue);
jta.push_back({jobToQueue.jobDump,jobToQueue.archiveRequest->getAddressIfSet(),jobToQueue.archiveFileId,jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime});
auto addedJobs = aq.addJobsIfNecessaryAndCommit(jta,m_agentReference,lc);
queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter);
if(!addedJobs.files){
try{
throw cta::exception::Exception("In Sorter::queueArchiveRequests, no job have been added with addJobsIfNecessaryAndCommit().");
} catch (cta::exception::Exception &e){
jobPromise.set_exception(std::current_exception());
continue;
}
}
// We will keep individual references for the job update we launch so that we make
// our life easier downstream.
struct ARUpdatedParams {
std::unique_ptr<ArchiveRequest::AsyncJobOwnerUpdater> updater;
std::shared_ptr<ArchiveRequest> archiveRequest;
uint16_t copyNb;
};
ARUpdatedParams arUpdaterParams;
arUpdaterParams.archiveRequest = archiveJobInfo->archiveRequest;
arUpdaterParams.copyNb = jobToQueue.jobDump.copyNb;
//Here the new owner is the agentReference of the process that runs the sorter, ArchiveRequest has no owner, the jobs have
arUpdaterParams.updater.reset(archiveJobInfo->archiveRequest->asyncUpdateJobOwner(jobToQueue.jobDump.copyNb,m_agentReference.getAgentAddress(),jobToQueue.jobDump.owner,cta::nullopt));
requestsUpdatePreparationTime = t.secs(utils::Timer::resetCounter);
try{
arUpdaterParams.updater->wait();
//No problem, the job has been inserted into the queue, log it.
jobPromise.set_value();
log::ScopedParamContainer params(lc);
params.add("archiveRequestObject", archiveJobInfo->archiveRequest->getAddressIfSet())
.add("copyNb", arUpdaterParams.copyNb)
.add("fileId",arUpdaterParams.updater->getArchiveFile().archiveFileID)
.add("tapePool",tapePool)
.add("archiveQueueObject",aq.getAddressIfSet())
.add("previousOwner",jobToQueue.jobDump.owner);
lc.log(log::INFO, "In Sorter::queueArchiveRequests(): queued archive job.");
} catch (cta::exception::Exception &e){
jobPromise.set_exception(std::current_exception());
continue;
}
requestsUpdatingTime = t.secs(utils::Timer::resetCounter);
{
log::ScopedParamContainer params(lc);
auto jobsSummary = aq.getJobsSummary();
params.add("tapePool", tapePool)
.add("archiveQueueObject", aq.getAddressIfSet())
/*.add("filesAdded", filesQueued)
.add("bytesAdded", bytesQueued)
.add("filesAddedInitially", filesQueued)
.add("bytesAddedInitially", bytesQueued)*/
/*.add("filesDequeuedAfterErrors", filesDequeued)
.add("bytesDequeuedAfterErrors", bytesDequeued)*/
.add("filesBefore", filesBefore)
.add("bytesBefore", bytesBefore)
.add("filesAfter", jobsSummary.jobs)
.add("bytesAfter", jobsSummary.bytes)
.add("queueLockFetchTime", queueLockFetchTime)
.add("queuePreparationTime", queueProcessAndCommitTime)
.add("requestsUpdatePreparationTime", requestsUpdatePreparationTime)
.add("requestsUpdatingTime", requestsUpdatingTime);
//.add("queueRecommitTime", queueRecommitTime);
lc.log(log::INFO, "In Sorter::queueArchiveRequests(): "
"Queued an archiveRequest");
}
}
archiveJobInfos.clear();
}
void Sorter::queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfo, log::LogContext &lc){
/*for(auto& retrieveJobInfo: retrieveJobsInfo){
double queueLockFetchTime=0;
double queueProcessAndCommitTime=0;
//double requestsUpdatePreparationTime=0;
//double requestsUpdatingTime = 0;
utils::Timer t;
uint64_t filesBefore=0;
uint64_t bytesBefore=0;
RetrieveQueue rq(m_objectstore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq,rql,m_agentReference,vid,jobQueueType,lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
auto jobsSummary=rq.getJobsSummary();
filesBefore=jobsSummary.jobs;
bytesBefore=jobsSummary.bytes;
Sorter::RetrieveJob jobToQueue = std::get<0>(retrieveJobInfo->jobToQueue);
std::promise<void>& jobPromise = std::get<1>(retrieveJobInfo->jobToQueue);
std::list<RetrieveQueue::JobToAdd> jta;
jta.push_back({jobToQueue.jobDump.copyNb,jobToQueue.fSeq,jobToQueue.retrieveRequest->getAddressIfSet(),jobToQueue.fileSize,jobToQueue.mountPolicy,jobToQueue.startTime});
auto addedJobs = rq.addJobsIfNecessaryAndCommit(jta, m_agentReference, lc);
queueProcessAndCommitTime = t.secs(utils::Timer::resetCounter);
if(!addedJobs.files){
throw cta::exception::Exception("In Sorter::queueRetrieveRequests(), failed of adding a job to the retrieve queue through addJobsIfNecessaryAndCommit()");
}
// We will keep individual references for each job update we launch so that we make
// our life easier downstream.
struct RRUpdatedParams {
std::unique_ptr<RetrieveRequest::AsyncJobOwnerUpdater> updater;
std::shared_ptr<RetrieveRequest> retrieveRequest;
uint64_t copyNb;
};
{
std::list<RRUpdatedParams> rrUpdatersParams;
}
if(queueLockFetchTime && filesBefore && bytesBefore &&queueProcessAndCommitTime){}
}*/
}
}}
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef SORTER_HPP
#define SORTER_HPP
#include <map>
#include <tuple>
#include "JobQueueType.hpp"
#include <memory>
#include "ArchiveRequest.hpp"
#include "RetrieveRequest.hpp"
#include "common/log/LogContext.hpp"
#include "Agent.hpp"
#include <future>
#include "common/threading/Mutex.hpp"
#include "GenericObject.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/dataStructures/ArchiveJob.hpp"
#include "RetrieveQueue.hpp"
#include "ArchiveQueue.hpp"
namespace cta { namespace objectstore {
struct ArchiveJobQueueInfo;
struct RetrieveJobQueueInfo;
class Sorter {
public:
CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue);
virtual ~Sorter();
void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc);
/**
*
* @param retrieveRequest
* @param lc
* @throws TODO : explain what is the exception thrown by this method
*/
void insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequest, log::LogContext & lc);
typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<ArchiveJobQueueInfo>>> MapArchive;
typedef std::map<std::tuple<std::string, JobQueueType>, std::list<std::shared_ptr<RetrieveJobQueueInfo>>> MapRetrieve;
bool flushOneRetrieve(log::LogContext &lc);
bool flushOneArchive(log::LogContext &lc);
MapArchive getAllArchive();
MapRetrieve getAllRetrieve();
struct ArchiveJob{
std::shared_ptr<ArchiveRequest> archiveRequest;
ArchiveRequest::JobDump jobDump;
uint64_t archiveFileId;
time_t startTime;
uint64_t fileSize;
common::dataStructures::MountPolicy mountPolicy;
};
struct RetrieveJob{
std::shared_ptr<RetrieveRequest> retrieveRequest;
RetrieveRequest::JobDump jobDump;
uint64_t archiveFileId;
time_t startTime;
uint64_t fileSize;
uint64_t fSeq;
common::dataStructures::MountPolicy mountPolicy;
};
private:
AgentReference &m_agentReference;
Backend &m_objectstore;
catalogue::Catalogue &m_catalogue;
MapArchive m_archiveQueuesAndRequests;
MapRetrieve m_retrieveQueuesAndRequests;
threading::Mutex m_mutex;
const unsigned int c_maxBatchSize = 500;
std::set<std::string> getCandidateVids(RetrieveRequest &request);
std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc);
void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc);
void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc);
};
struct ArchiveJobQueueInfo{