Commit 4350d29e authored by Eric Cano's avatar Eric Cano
Browse files

#533: Added support for disk system in queues and object store.

parent cbe37199
......@@ -78,7 +78,7 @@ void fillRetrieveRequests(
rqc.mountPolicy.retrievePriority = 1;
requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be));
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser, cta::nullopt, cta::nullopt
});
auto &rr = *requests.back().retrieveRequest;
rr.initialize();
......
......@@ -75,12 +75,12 @@ add_library (ctaobjectstore SHARED
ArchiveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
RetrieveQueueToTransferAlgorithms.cpp
RetrieveQueueToTransferForUserAlgorithms.cpp
RetrieveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
RetrieveQueueToReportToRepackForFailureAlgorithms.cpp
RetrieveQueueToTransferForRepackAlgorithms.cpp
JobQueueType.cpp
Sorter.cpp
ArchiveRequest.cpp
......
......@@ -559,7 +559,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateRetrieveJobs(Agent&
for (auto &tf: rr->getArchiveFile().tapeFiles) {
if (tf.vid == vid) {
jta.push_back({tf.copyNb, tf.fSeq, rr->getAddressIfSet(), rr->getArchiveFile().fileSize,
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity()});
rr->getRetrieveFileQueueCriteria().mountPolicy, rr->getEntryLog().time, rr->getActivity(), rr->getDiskSystemName()});
}
}
}
......
......@@ -608,7 +608,8 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
cta::objectstore::ScopedExclusiveLock rql(rq);
rq.fetch();
std::list <cta::objectstore::RetrieveQueue::JobToAdd> jta;
jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy, sReq.creationLog.time, cta::nullopt});
jta.push_back({1,rqc.archiveFile.tapeFiles.front().fSeq, rr.getAddressIfSet(), rqc.archiveFile.fileSize, rqc.mountPolicy,
sReq.creationLog.time, cta::nullopt, cta::nullopt});
rq.addJobsAndCommit(jta, agentRef, lc);
}
if (pass < 5) { pass++; continue; }
......
......@@ -531,7 +531,7 @@ auto RetrieveQueue::addJobsIfNecessaryAndCommit(std::list<JobToAdd> & jobsToAdd,
}
shardsDumps.emplace_back(std::list<JobDump>());
for (auto & j: s->dumpJobs()) {
shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size}));
shardsDumps.back().emplace_back(JobDump({j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName}));
}
nextShard:
s++;
......@@ -606,7 +606,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> {
goto nextShard;
}
for (auto & j: s->dumpJobs()) {
ret.emplace_back(JobDump{j.address, j.copyNb, j.size});
ret.emplace_back(JobDump{j.address, j.copyNb, j.size, j.activityDescription, j.diskSystemName});
}
nextShard:
s++; sf++;
......
......@@ -66,7 +66,8 @@ public:
uint64_t fileSize;
cta::common::dataStructures::MountPolicy policy;
time_t startTime;
optional<RetrieveActivityDescription> activityDescription;
optional<RetrieveActivityDescription> activityDescription;
optional<std::string> diskSystemName;
};
void addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentReference & agentReference, log::LogContext & lc);
// This version will check for existence of the job in the queue before
......@@ -97,6 +98,12 @@ public:
std::string address;
uint32_t copyNb;
uint64_t size;
struct ActivityDescription {
std::string diskInstanceName;
std::string activity;
};
optional<ActivityDescription> activity;
optional<std::string> diskSystemName;
};
std::list<JobDump> dumpJobs();
struct CandidateJobList {
......
......@@ -45,6 +45,7 @@ struct ContainerTraits<RetrieveQueue,C>
cta::common::dataStructures::MountPolicy policy;
serializers::RetrieveJobStatus status;
optional<RetrieveActivityDescription> activityDescription;
optional<std::string> diskSystemName;
typedef std::list<InsertedElement> list;
};
......@@ -59,6 +60,8 @@ struct ContainerTraits<RetrieveQueue,C>
std::string errorReportURL;
SchedulerDatabase::RetrieveJob::ReportType reportType;
RetrieveRequest::RepackInfo repackInfo;
optional<RetrieveQueue::JobDump::ActivityDescription> activity;
optional<std::string> diskSystemName;
};
struct PoppedElementsSummary;
struct PopCriteria {
......@@ -279,7 +282,7 @@ addReferencesAndCommit(Container &cont, typename InsertedElement::list &elemMemC
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription});
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName});
}
cont.addJobsAndCommit(jobsToAdd, agentRef, lc);
}
......@@ -292,7 +295,7 @@ addReferencesIfNecessaryAndCommit(Container& cont, typename InsertedElement::lis
std::list<RetrieveQueue::JobToAdd> jobsToAdd;
for (auto &e : elemMemCont) {
RetrieveRequest &rr = *e.retrieveRequest;
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription});
jobsToAdd.push_back({e.copyNb, e.fSeq, rr.getAddressIfSet(), e.filesize, e.policy, ::time(nullptr), e.activityDescription, e.diskSystemName});
}
cont.addJobsIfNecessaryAndCommit(jobsToAdd, agentRef, lc);
}
......@@ -387,6 +390,11 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
e.archiveFile = u.get()->getArchiveFile();
e.rr = u.get()->getRetrieveRequest();
e.repackInfo = u.get()->getRepackInfo();
auto & rad = u.get()->getRetrieveActivityDescription();
if (rad) {
e.activity = RetrieveQueue::JobDump::ActivityDescription{ rad.value().diskInstanceName, rad.value().activity };
}
e.diskSystemName = u.get()->getDiskSystemName();
switch(u.get()->getJobStatus()) {
case serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure:
e.reportType = SchedulerDatabase::RetrieveJob::ReportType::FailureReport;
......@@ -492,7 +500,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
common::dataStructures::ArchiveFile(),
common::dataStructures::RetrieveRequest(),
"", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired,
RetrieveRequest::RepackInfo()
RetrieveRequest::RepackInfo(), cjfq.activity, cjfq.diskSystemName
});
ret.summary.files++;
}
......
......@@ -69,7 +69,16 @@ RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t
ret.remainingFilesAfterCandidates = m_payload.retrievejobs_size();
for (auto & j: m_payload.retrievejobs()) {
if (!retrieveRequestsToSkip.count(j.address())) {
ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size()});
ret.candidates.push_back({j.address(), (uint16_t)j.copynb(), j.size(), nullopt, nullopt});
if (j.has_activity()) {
RetrieveQueue::JobDump::ActivityDescription ad;
ad.activity = j.activity();
ad.diskInstanceName = j.disk_instance_name();
ret.candidates.back().activity = ad;
}
if (j.has_destination_disk_system_name()) {
ret.candidates.back().diskSystemName = j.destination_disk_system_name();
}
ret.candidateBytes += j.size();
ret.candidateFiles ++;
}
......@@ -104,7 +113,9 @@ auto RetrieveQueueShard::removeJobs(const std::list<std::string>& jobsToRemove)
ret.removedJobs.back().size = j.size();
ret.removedJobs.back().startTime = j.starttime();
if (j.has_activity())
ret.removedJobs.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() };
ret.removedJobs.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() };
if (j.has_destination_disk_system_name())
ret.removedJobs.back().diskSystemName = j.destination_disk_system_name();
ret.bytesRemoved += j.size();
totalSize -= j.size();
ret.jobsRemoved++;
......@@ -139,9 +150,12 @@ auto RetrieveQueueShard::dumpJobs() -> std::list<JobInfo> {
std::list<JobInfo> ret;
for (auto &j: m_payload.retrievejobs()) {
ret.emplace_back(JobInfo{j.size(), j.address(), (uint16_t)j.copynb(), j.priority(),
j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt});
j.minretrieverequestage(), j.maxdrivesallowed(), (time_t)j.starttime(), j.fseq(), nullopt, nullopt});
if (j.has_activity()) {
ret.back().activityDescription = JobInfo::ActivityDescription{ j.disk_instance_name(), j.activity() };
ret.back().activityDescription = RetrieveQueue::JobDump::ActivityDescription{ j.disk_instance_name(), j.activity() };
}
if (j.has_destination_disk_system_name()) {
ret.back().diskSystemName = j.destination_disk_system_name();
}
}
return ret;
......
......@@ -53,11 +53,8 @@ public:
uint64_t maxDrivesAllowed;
time_t startTime;
uint64_t fSeq;
struct ActivityDescription {
std::string diskInstanceName;
std::string activity;
};
optional<ActivityDescription> activityDescription;
optional<RetrieveQueue::JobDump::ActivityDescription> activityDescription;
optional<std::string> diskSystemName;
};
std::list<JobInfo> dumpJobs();
......
......@@ -56,7 +56,9 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
common::dataStructures::RetrieveRequest(),
"",
SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired,
RetrieveRequest::RepackInfo()
RetrieveRequest::RepackInfo(),
cjfq.activity,
cjfq.diskSystemName
});
ret.summary.bytes += cjfq.size;
ret.summary.files++;
......
......@@ -158,7 +158,7 @@ queueForFailure:;
objectstore::MountPolicySerDeser mp;
std::list<RetrieveQueue::JobToAdd> jta;
jta.push_back({activeCopyNb, activeFseq, getAddressIfSet(), m_payload.archivefile().filesize(),
mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt});
mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt, nullopt});
if (m_payload.has_activity_weight()) {
RetrieveActivityDescription activityDescription;
activityDescription.priority = m_payload.activity_weight().priority();
......@@ -226,7 +226,7 @@ queueForTransfer:;
mp.deserialize(m_payload.mountpolicy());
std::list<RetrieveQueue::JobToAdd> jta;
jta.push_back({bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(),
mp, (signed)m_payload.schedulerrequest().entrylog().time(), nullopt});
mp, (signed)m_payload.schedulerrequest().entrylog().time(), getActivity(), getDiskSystemName()});
if (m_payload.has_activity_weight()) {
RetrieveActivityDescription activityDescription;
activityDescription.priority = m_payload.activity_weight().priority();
......@@ -484,6 +484,25 @@ optional<RetrieveActivityDescription> RetrieveRequest::getActivity() {
return ret;
}
//------------------------------------------------------------------------------
// RetrieveRequest::setDiskSystemName()
//------------------------------------------------------------------------------
void RetrieveRequest::setDiskSystemName(const std::string& diskSystemName) {
checkPayloadWritable();
m_payload.set_disk_system_name(diskSystemName);
}
//------------------------------------------------------------------------------
// RetrieveRequest::getDiskSystemName()
//------------------------------------------------------------------------------
optional<std::string> RetrieveRequest::getDiskSystemName() {
checkPayloadReadable();
optional<std::string> ret;
if (m_payload.has_disk_system_name())
ret = m_payload.disk_system_name();
return ret;
}
//------------------------------------------------------------------------------
// RetrieveRequest::dumpJobs()
//------------------------------------------------------------------------------
......@@ -852,6 +871,15 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
af.deserialize(payload.archivefile());
retRef.m_archiveFile = af;
retRef.m_jobStatus = j.status();
if (payload.has_activity_weight()) {
retRef.m_retrieveActivityDescription = RetrieveActivityDescription{
payload.activity_weight().priority(), payload.activity_weight().disk_instance_name(),
payload.activity_weight().activity(), payload.activity_weight().creation_time(),
payload.activity_weight().weight(), 0
};
}
if (payload.has_disk_system_name())
retRef.m_diskSystemName = payload.disk_system_name();
RetrieveRequest::updateLifecycleTiming(payload,j);
LifecycleTimingsSerDeser lifeCycleSerDeser;
lifeCycleSerDeser.deserialize(payload.lifecycle_timings());
......@@ -902,6 +930,20 @@ const RetrieveRequest::RepackInfo& RetrieveRequest::AsyncJobOwnerUpdater::getRep
return m_repackInfo;
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription()
//------------------------------------------------------------------------------
const optional<RetrieveActivityDescription>& RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveActivityDescription() {
return m_retrieveActivityDescription;
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName()
//------------------------------------------------------------------------------
const optional<std::string>& RetrieveRequest::AsyncJobOwnerUpdater::getDiskSystemName() {
return m_diskSystemName;
}
//------------------------------------------------------------------------------
// RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveRequest()
//------------------------------------------------------------------------------
......
......@@ -220,6 +220,8 @@ public:
const common::dataStructures::RetrieveRequest &getRetrieveRequest();
const common::dataStructures::ArchiveFile &getArchiveFile();
const RepackInfo &getRepackInfo();
const optional<RetrieveActivityDescription> &getRetrieveActivityDescription();
const optional<std::string> &getDiskSystemName();
private:
std::function<std::string(const std::string &)> m_updaterCallback;
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
......@@ -227,6 +229,8 @@ public:
common::dataStructures::ArchiveFile m_archiveFile;
RepackInfo m_repackInfo;
serializers::RetrieveJobStatus m_jobStatus;
optional<RetrieveActivityDescription> m_retrieveActivityDescription;
optional<std::string> m_diskSystemName;
};
// An owner updater factory. The owner MUST be previousOwner for the update to be executed.
AsyncJobOwnerUpdater *asyncUpdateJobOwner(uint32_t copyNumber, const std::string &owner, const std::string &previousOwner);
......@@ -238,6 +242,8 @@ public:
void setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest & retrieveRequest,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria);
optional<RetrieveActivityDescription> getActivity();
void setDiskSystemName(const std::string & diskSystemName);
optional<std::string> getDiskSystemName();
cta::common::dataStructures::RetrieveFileQueueCriteria getRetrieveFileQueueCriteria();
cta::common::dataStructures::ArchiveFile getArchiveFile();
cta::common::dataStructures::EntryLog getEntryLog();
......
......@@ -159,7 +159,7 @@ void Sorter::executeRetrieveAlgorithm(const std::string vid, std::string& queueA
Sorter::RetrieveJob job = std::get<0>(jobToAdd->jobToQueue);
succeededJobs[job.jobDump.copyNb] = jobToAdd;
previousOwner = job.previousOwner->getAgentAddress();
jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription});
jobsToAdd.push_back({job.retrieveRequest.get(),job.jobDump.copyNb,job.fSeq,job.fileSize,job.mountPolicy,job.jobDump.status,job.activityDescription,job.diskSystemName});
}
try{
algo.referenceAndSwitchOwnershipIfNecessary(vid,previousOwner,queueAddress,jobsToAdd,lc);
......
......@@ -126,6 +126,7 @@ public:
common::dataStructures::MountPolicy mountPolicy;
cta::objectstore::JobQueueType jobQueueType;
optional<RetrieveActivityDescription> activityDescription;
optional<std::string> diskSystemName;
};
/**
......
......@@ -422,6 +422,7 @@ message RetrieveRequest {
required bool isrepack = 9157;
optional RetrieveRequestRepackInfo repack_info = 9158;
optional LifecycleTimings lifecycle_timings = 9159;
optional string disk_system_name = 9161;
}
message ValueCountPair {
......@@ -476,6 +477,7 @@ message RetrieveJobPointer {
// For activity (if present), we need disk instance and activity name (priority is always provided)
optional string disk_instance_name = 3109;
optional string activity = 3110;
optional string destination_disk_system_name = 3111;
}
message RetrieveQueueShardPointer {
......
......@@ -55,7 +55,7 @@ void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::special
if (j.copyNb == job.copyNb) {
auto criteria = request.getRetrieveFileQueueCriteria();
jtal.push_back({j.copyNb, j.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize,
criteria.mountPolicy, request.getEntryLog().time, request.getActivity()});
criteria.mountPolicy, request.getEntryLog().time, request.getActivity(), request.getDiskSystemName()});
request.setActiveCopyNumber(j.copyNb);
request.setOwner(queueAddress);
goto jobAdded;
......
......@@ -1103,7 +1103,7 @@ void OStoreDB::setRetrieveJobBatchReportedToUser(std::list<cta::SchedulerDatabas
insertedElements.emplace_back(CaRQF::InsertedElement{
&j.job->m_retrieveRequest, tf_it->copyNb, tf_it->fSeq, tf_it->compressedSize,
common::dataStructures::MountPolicy(), serializers::RetrieveJobStatus::RJS_Failed,
j.job->m_activityDescription
j.job->m_activityDescription, j.job->m_diskSystemName
});
}
try {
......@@ -1129,8 +1129,10 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue
//------------------------------------------------------------------------------
// OStoreDB::queueRetrieve()
//------------------------------------------------------------------------------
std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, log::LogContext &logContext) {
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
const optional<std::string> diskSystemName, log::LogContext& logContext) {
assertAgentAddressSet();
auto mutexForHelgrind = cta::make_unique<cta::threading::Mutex>();
cta::threading::MutexLocker mlForHelgrind(*mutexForHelgrind);
......@@ -1182,6 +1184,7 @@ std::string OStoreDB::queueRetrieve(cta::common::dataStructures::RetrieveRequest
rReq->setRetrieveFileQueueCriteria(criteria);
rReq->setActivityIfNeeded(rqst, criteria);
rReq->setCreationTime(rqst.creationLog.time);
if (diskSystemName) rReq->setDiskSystemName(diskSystemName.value());
// Find the job corresponding to the vid (and check we indeed have one).
auto jobs = rReq->getJobs();
objectstore::RetrieveRequest::JobDump job;
......@@ -3439,9 +3442,8 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getNextJobBatch()
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::
getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext)
{
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, disk::DiskSystemList& diskSystemList, log::LogContext& logContext) {
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos;
RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
......@@ -3607,7 +3609,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb,
req->archiveFile.tapeFiles.at(req->selectedCopyNb).fSeq, req->archiveFile.fileSize,
cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,
serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription});
serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess, req->m_activityDescription, req->m_diskSystemName});
requestToJobMap[&req->m_retrieveRequest] = req;
}
RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
......@@ -4487,7 +4489,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
CaRqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaRqtr::InsertedElement{
&m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy,
serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription
serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName
});
m_retrieveRequest.commit();
rel.release();
......@@ -4554,7 +4556,7 @@ void OStoreDB::RetrieveJob::failTransfer(const std::string &failureReason, log::
CaRqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaRqtr::InsertedElement{
&m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy, serializers::RetrieveJobStatus::RJS_ToTransferForUser,
m_activityDescription
m_activityDescription, m_diskSystemName
});
CaRqtr caRqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
......@@ -4618,7 +4620,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo
CaRqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaRqtr::InsertedElement{
&m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy,
serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription
serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure, m_activityDescription, m_diskSystemName
});
caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc);
log::ScopedParamContainer params(lc);
......@@ -4637,7 +4639,7 @@ void OStoreDB::RetrieveJob::failReport(const std::string &failureReason, log::Lo
CaRqtr::InsertedElement::list insertedElements;
insertedElements.push_back(CaRqtr::InsertedElement{
&m_retrieveRequest, tf.copyNb, tf.fSeq, af.fileSize, rfqc.mountPolicy,
serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription
serializers::RetrieveJobStatus::RJS_Failed, m_activityDescription, m_diskSystemName
});
caRqtr.referenceAndSwitchOwnership(tf.vid, insertedElements, lc);
log::ScopedParamContainer params(lc);
......
......@@ -217,7 +217,8 @@ public:
OStoreDB & m_oStoreDB;
public:
const MountInfo & getMountInfo() override;
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override;
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
disk::DiskSystemList& diskSystemList, log::LogContext& logContext) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
......@@ -258,6 +259,7 @@ public:
std::unique_ptr<objectstore::RetrieveRequest::AsyncJobSucceedForRepackReporter> m_jobSucceedForRepackReporter;
objectstore::RetrieveRequest::RepackInfo m_repackInfo;
optional<objectstore::RetrieveActivityDescription> m_activityDescription;
optional<std::string> m_diskSystemName;
};
static RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job);
......@@ -296,8 +298,9 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
CTA_GENERATE_EXCEPTION_CLASS(TapeCopyNumberOutOfRange);
std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override;
std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
const optional<std::string> diskSystemName, log::LogContext& logContext) override;
std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override;
......
......@@ -202,9 +202,11 @@ public:
return m_OStoreDB.getRetrieveQueueStatistics(criteria, vidsToConsider);
}
std::string queueRetrieve(common::dataStructures::RetrieveRequest& rqst,
const common::dataStructures::RetrieveFileQueueCriteria &criteria, log::LogContext &logContext) override {
return m_OStoreDB.queueRetrieve(rqst, criteria, logContext);
std::string queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst,
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
const optional<std::string> diskSystemName, log::LogContext& logContext) override {
return m_OStoreDB.queueRetrieve(rqst, criteria, diskSystemName, logContext);
}
......
......@@ -110,7 +110,7 @@ std::string cta::RetrieveMount::getMediaType() const
}
//------------------------------------------------------------------------------
// getVo()
// getVendor()
//------------------------------------------------------------------------------
std::string cta::RetrieveMount::getVendor() const
{
......@@ -121,6 +121,9 @@ std::string cta::RetrieveMount::getVendor() const
return sVendor.str();
}
//------------------------------------------------------------------------------
// getCapacityInBytes()
//------------------------------------------------------------------------------
uint64_t cta::RetrieveMount::getCapacityInBytes() const {
if(!m_dbMount.get())
throw exception::Exception("In cta::RetrieveMount::getVendor(): got NULL dbMount");
......@@ -134,9 +137,12 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
log::LogContext& logContext) {
if (!m_sessionRunning)
throw SessionNotRunning("In RetrieveMount::getNextJobBatch(): trying to get job from complete/not started session");
// Get the current file systems list from the catalogue
disk::DiskSystemList diskSystemList;
if (m_catalogue) diskSystemList = m_catalogue->getDiskSystems();
// Try and get a new job from the DB
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested,
bytesRequested, logContext));
bytesRequested, diskSystemList, logContext));
std::list<std::unique_ptr<RetrieveJob>> ret;
// We prepare the response
for (auto & sdrj: dbJobBatch) {
......@@ -227,6 +233,17 @@ cta::disk::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL
return m_reporterFactory.createDiskReporter(URL);
}
//------------------------------------------------------------------------------
// setCatalogue()
//------------------------------------------------------------------------------
void cta::RetrieveMount::setCatalogue(catalogue::Catalogue* catalogue) {
if (m_catalogue)
throw exception::Exception("In RetrieveMount::setCatalogue(): catalogue already set.");
if (!catalogue)
throw exception::Exception("In RetrieveMount::setCatalogue(): trying to set a null catalogue.");
m_catalogue = catalogue;
}
//------------------------------------------------------------------------------
// tapeComplete()
//------------------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment