Commit 45f542e9 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Created all the queues mentionned in #447 issue. The sorter can use them now.

Renamed some status (#447 issue)
Harmonized the name of the Archive and Retrieve queues in the cta.proto file and made the changes in the C++ code
parent d31f94f2
......@@ -35,7 +35,7 @@
namespace unitTests {
void fillRetrieveRequests(
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToTransfer>::InsertedElement::list &requests,
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToTransferForUser>::InsertedElement::list &requests,
std::list<std::unique_ptr<cta::objectstore::RetrieveRequest> >& requestPtrs, //List to avoid memory leak on ArchiveQueueAlgorithms test
cta::objectstore::BackendVFS &be,
cta::objectstore::AgentReference &agentRef)
......@@ -73,8 +73,8 @@ void fillRetrieveRequests(
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
requestPtrs.emplace_back(new cta::objectstore::RetrieveRequest(rrAddr, be));
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement{
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransfer
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement{
requestPtrs.back().get(), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransferForUser
});
auto &rr = *requests.back().retrieveRequest;
rr.initialize();
......@@ -117,7 +117,7 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer>::InsertedElement::list requests;
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser>::InsertedElement::list requests;
std::list<std::unique_ptr<cta::objectstore::ArchiveRequest>> archiveRequests;
for (size_t i=0; i<10; i++) {
std::string arAddr = agentRef.nextId("ArchiveRequest");
......@@ -136,7 +136,7 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
aFile.fileSize = 667;
aFile.storageClass = "sc";
archiveRequests.emplace_back(new cta::objectstore::ArchiveRequest(arAddr, be));
requests.emplace_back(ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer>::InsertedElement{archiveRequests.back().get(), 1, aFile, mp,
requests.emplace_back(ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser>::InsertedElement{archiveRequests.back().get(), 1, aFile, mp,
cta::nullopt});
auto & ar=*requests.back().archiveRequest;
auto copyNb = requests.back().copyNb;
......@@ -151,10 +151,10 @@ TEST(ObjectStore, ArchiveQueueAlgorithms) {
ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
ar.insert();
}
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> archiveAlgos(be, agentRef);
ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> archiveAlgos(be, agentRef);
archiveAlgos.referenceAndSwitchOwnership("Tapepool", requests, lc);
// Now get the requests back
ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PopCriteria popCriteria;
ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = archiveAlgos.popNextBatch("Tapepool", popCriteria, lc);
......@@ -188,7 +188,7 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agent.initialize();
agent.insertAndRegisterSelf(lc);
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement::list requests;
fillRetrieveRequests(requests, requestsPtrs, be, agentRef); //memory leak here
{
......@@ -207,18 +207,18 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel2.release();
agent2.initialize();
agent2.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests2;
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser>::InsertedElement::list requests2;
std::list<std::unique_ptr<RetrieveRequest> > requestsPtrs2;
fillRetrieveRequests(requests2, requestsPtrs2,be2, agentRef2);
auto a1 = agentRef2.getAgentAddress();
auto a2 = agentRef2.getAgentAddress();
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> retrieveAlgos2(be2, agentRef2);
retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID",
a2, a1, requests2, lc);
}
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos(be, agentRef);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> retrieveAlgos(be, agentRef);
try {
ASSERT_EQ(requests.size(), 10);
......@@ -226,19 +226,19 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agentRef.getAgentAddress(), requests, lc);
// Now get the requests back
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria;
ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = retrieveAlgos.popNextBatch("VID", popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Validate that the summary has the same information as the popped elements
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PoppedElementsSummary s;
ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PoppedElementsSummary s;
for(auto &e: poppedJobs.elements) {
s += ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::getElementSummary(e);
s += ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::getElementSummary(e);
}
ASSERT_EQ(s, poppedJobs.summary);
} catch (ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::OwnershipSwitchFailure & ex) {
} catch (ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
throw e.failure;
......
......@@ -221,7 +221,7 @@ void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReferen
RootEntry re(m_objectStore);
ScopedSharedLock rel (re);
re.fetch();
auto tpd=re.dumpArchiveQueues(JobQueueType::JobsToTransfer);
auto tpd=re.dumpArchiveQueues(JobQueueType::JobsToTransferForUser);
for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) {
if (tp->address == getAddressIfSet()) {
setOwner(re.getAddressIfSet());
......
......@@ -141,11 +141,11 @@ public:
static const uint64_t c_maxShardSize = 25000;
};
class ArchiveQueueToTransfer: public ArchiveQueue {
class ArchiveQueueToTransferForUser: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReport: public ArchiveQueue {
class ArchiveQueueToReportForUser: public ArchiveQueue {
using ArchiveQueue::ArchiveQueue;
};
......@@ -160,5 +160,9 @@ class ArchiveQueueToTransferForRepack: public ArchiveQueue{
class ArchiveQueueToReportToRepackForSuccess : public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
class ArchiveQueueToReportToRepackForFailure: public ArchiveQueue{
using ArchiveQueue::ArchiveQueue;
};
}}
......@@ -407,10 +407,10 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
switch(u->get()->getJobStatus()) {
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
break;
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::FailureReport;
break;
default:
......@@ -442,7 +442,7 @@ getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
// ArchiveQueue full specialisations for ContainerTraits.
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PopCriteria {
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PopCriteria {
uint64_t files;
uint64_t bytes;
PopCriteria(uint64_t f = 0, uint64_t b = 0) : files(f), bytes(b) {}
......@@ -455,7 +455,7 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PopCriteria {
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PoppedElementsSummary {
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PoppedElementsSummary {
uint64_t files;
uint64_t bytes;
PoppedElementsSummary(uint64_t f = 0, uint64_t b = 0) : files(f), bytes(b) {}
......@@ -479,8 +479,8 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PoppedElementsSumma
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransfer;
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransferForUser;
};
template<>
......@@ -489,7 +489,7 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::QueueType {
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType {
struct ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToUser;
};
......@@ -502,4 +502,10 @@ template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToRepackForSuccess;
};
template<>
struct ContainerTraits<ArchiveQueue, ArchiveQueueToReportToRepackForFailure>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToRepackForFailure;
};
}} // namespace cta::objectstore
......@@ -30,4 +30,12 @@ const std::string ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::c_containerT
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::c_identifierType = "tapepool";
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
}} // namespace cta::objectstore
......@@ -25,19 +25,19 @@ namespace cta { namespace objectstore {
// ArchiveQueueToReport full specialisations for ContainerTraits.
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::c_containerTypeName = "ArchiveQueueToReport";
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::c_containerTypeName = "ArchiveQueueToReportForUser";
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::c_identifierType = "tapepool";
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::c_identifierType = "tapepool";
template<>
void ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::PoppedElementsBatch::
void ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("files", summary.files);
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::
getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria, ElementsToSkipSet& elemtsToSkip,
log::LogContext& lc) -> PoppedElementsBatch
{
......@@ -61,14 +61,14 @@ getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria,
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::PopCriteria::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::PopCriteria::
operator-=(const PoppedElementsSummary& pes) -> PopCriteria & {
files -= pes.files;
return *this;
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportForUser>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
......
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ArchiveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForFailure>::c_containerTypeName = "ArchiveQueueToReportToRepackForFailure";
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForFailure>::c_identifierType = "tapepool";
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForFailure>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
}}
\ No newline at end of file
......@@ -20,5 +20,40 @@
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::c_containerTypeName = "ArchiveQueueToReportToRepackForSuccess";
}
}
\ No newline at end of file
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::c_identifierType = "tapepool";
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForSuccess>::
getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria, ElementsToSkipSet& elemtsToSkip,
log::LogContext& lc) -> PoppedElementsBatch
{
PoppedElementsBatch ret;
auto candidateJobsFromQueue=cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip);
for (auto &cjfq: candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement());
PoppedElement & elem = ret.elements.back();
elem.archiveRequest = cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore);
elem.copyNb = cjfq.copyNb;
elem.bytes = cjfq.size;
elem.archiveFile = common::dataStructures::ArchiveFile();
elem.srcURL = "";
elem.archiveReportURL = "";
elem.errorReportURL = "";
elem.latestError = "";
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report;
ret.summary.files++;
}
return ret;
}
}}
\ No newline at end of file
......@@ -24,13 +24,13 @@ namespace cta { namespace objectstore {
// ArchiveQueue full specialisations for ContainerTraits.
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::c_containerTypeName = "ArchiveQueueToTransfer";
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::c_containerTypeName = "ArchiveQueueToTransferForUser";
template<>
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::c_identifierType = "tapepool";
const std::string ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::c_identifierType = "tapepool";
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
......@@ -38,7 +38,7 @@ getContainerSummary(Container& cont) -> ContainerSummary {
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
log::LogContext& lc) -> PoppedElementsBatch
{
......@@ -54,7 +54,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::
getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
PoppedElementsSummary ret;
ret.bytes = poppedElement.bytes;
......@@ -63,7 +63,7 @@ getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
}
template<>
void ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>::PoppedElementsBatch::
void ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PoppedElementsBatch::
addToLog(log::ScopedParamContainer &params) {
params.add("bytes", summary.bytes)
.add("files", summary.files);
......
......@@ -73,7 +73,7 @@ void ArchiveRequest::addJob(uint32_t copyNumber,
checkPayloadWritable();
auto *j = m_payload.add_jobs();
j->set_copynb(copyNumber);
j->set_status(serializers::ArchiveJobStatus::AJS_ToTransfer);
j->set_status(serializers::ArchiveJobStatus::AJS_ToTransferForUser);
j->set_tapepool(tapepool);
j->set_owner(initialOwner);
j->set_archivequeueaddress("");
......@@ -94,22 +94,26 @@ JobQueueType ArchiveRequest::getJobQueueType(uint32_t copyNumber) {
for (auto &j: m_payload.jobs()) {
if (j.copynb() == copyNumber) {
switch (j.status()) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
return JobQueueType::JobsToTransfer;
case serializers::ArchiveJobStatus::AJS_ToTransferForUser:
return JobQueueType::JobsToTransferForUser;
case serializers::ArchiveJobStatus::AJS_ToTransferForRepack:
return JobQueueType::JobsToTransferForRepack;
case serializers::ArchiveJobStatus::AJS_Complete:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion.");
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer:
// We should report a success...
return JobQueueType::JobsToReportToUser;
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure:
// We should report a failure. The report queue can be shared.
return JobQueueType::JobsToReportToUser;
case serializers::ArchiveJobStatus::AJS_Failed:
return JobQueueType::FailedJobs;
case serializers::ArchiveJobStatus::AJS_Abandoned:
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Abandoned jobs are not queueable. They are finished and pend siblings completion.");
case serializers::ArchiveJobStatus::AJS_ToReportToRepackForSuccess:
return JobQueueType::JobsToReportToRepackForSuccess;
case serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure:
return JobQueueType::JobsToReportToRepackForFailure;
}
}
}
......@@ -140,7 +144,7 @@ auto ArchiveRequest::addTransferFailure(uint32_t copyNumber,
return determineNextStep(copyNumber, JobEvent::TransferFailed, lc);
} else {
EnqueueingNextStep ret;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToTransfer;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToTransferForUser;
// Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter
// case, the job will remain owned by this session and get garbage collected.
if (j.retrieswithinmount() >= j.maxretrieswithinmount())
......@@ -379,8 +383,8 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
auto * jl = m_payload.mutable_jobs();
bool anythingGarbageCollected=false;
using serializers::ArchiveJobStatus;
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransfer, ArchiveJobStatus::AJS_ToReportForTransfer,
ArchiveJobStatus::AJS_ToReportForFailure, ArchiveJobStatus::AJS_Failed});
std::set<ArchiveJobStatus> statusesImplyingQueueing ({ArchiveJobStatus::AJS_ToTransferForUser, ArchiveJobStatus::AJS_ToReportToUserForTransfer,
ArchiveJobStatus::AJS_ToReportToUserForFailure, ArchiveJobStatus::AJS_Failed});
for (auto j=jl->begin(); j!=jl->end(); j++) {
auto owner=j->owner();
auto status=j->status();
......@@ -643,7 +647,7 @@ ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTran
auto * jl = payload.mutable_jobs();
bool otherJobsToTransfer = false;
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() != copyNumber && j->status() == serializers::ArchiveJobStatus::AJS_ToTransfer)
if (j->copynb() != copyNumber && j->status() == serializers::ArchiveJobStatus::AJS_ToTransferForUser)
otherJobsToTransfer = true;
}
for (auto j=jl->begin(); j!=jl->end(); j++) {
......@@ -657,7 +661,7 @@ ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTran
} else {
// We will report success with this job as it is the last to transfer and no report (for failure)
// happened.
j->set_status(serializers::ArchiveJobStatus::AJS_ToReportForTransfer);
j->set_status(serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer);
retRef.m_doReportTransferSuccess = true;
}
oh.set_payload(payload.SerializePartialAsString());
......@@ -713,10 +717,10 @@ std::string ArchiveRequest::getJobOwner(uint32_t copyNumber) {
JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& status) {
using serializers::ArchiveJobStatus;
switch(status) {
case ArchiveJobStatus::AJS_ToTransfer:
return JobQueueType::JobsToTransfer;
case ArchiveJobStatus::AJS_ToReportForTransfer:
case ArchiveJobStatus::AJS_ToReportForFailure:
case ArchiveJobStatus::AJS_ToTransferForUser:
return JobQueueType::JobsToTransferForUser;
case ArchiveJobStatus::AJS_ToReportToUserForTransfer:
case ArchiveJobStatus::AJS_ToReportToUserForFailure:
return JobQueueType::JobsToReportToUser;
case ArchiveJobStatus::AJS_Failed:
return JobQueueType::FailedJobs;
......@@ -730,11 +734,11 @@ JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& s
//------------------------------------------------------------------------------
std::string ArchiveRequest::statusToString(const serializers::ArchiveJobStatus& status) {
switch(status) {
case serializers::ArchiveJobStatus::AJS_ToTransfer:
case serializers::ArchiveJobStatus::AJS_ToTransferForUser:
return "ToTransfer";
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer:
return "ToReportForTransfer";
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
case serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure:
return "ToReportForFailure";
case serializers::ArchiveJobStatus::AJS_Complete:
return "Complete";
......@@ -792,7 +796,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE
// Check status compatibility with event.
switch (jobEvent) {
case JobEvent::TransferFailed:
if (*currentStatus != ArchiveJobStatus::AJS_ToTransfer) {
if (*currentStatus != ArchiveJobStatus::AJS_ToTransferForUser) {
// Wrong status, but the context leaves no ambiguity. Just warn.
log::ScopedParamContainer params(lc);
params.add("event", eventToString(jobEvent))
......@@ -802,7 +806,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE
}
break;
case JobEvent::ReportFailed:
if (*currentStatus != ArchiveJobStatus::AJS_ToReportForFailure && *currentStatus != ArchiveJobStatus::AJS_ToReportForTransfer) {
if (*currentStatus != ArchiveJobStatus::AJS_ToReportToUserForFailure && *currentStatus != ArchiveJobStatus::AJS_ToReportToUserForTransfer) {
// Wrong status, but end status will be the same anyway.
log::ScopedParamContainer params(lc);
params.add("event", eventToString(jobEvent))
......@@ -819,7 +823,7 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE
if (!m_payload.reportdecided()) {
m_payload.set_reportdecided(true);
ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportForFailure;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure;
} else {
ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer;
ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed;
......
......@@ -69,6 +69,7 @@ add_library (ctaobjectstore SHARED
ArchiveQueueToReportAlgorithms.cpp
ArchiveQueueFailedAlgorithms.cpp
ArchiveQueueToReportToRepackForSuccessAlgorithms.cpp
ArchiveQueueToReportToRepackForFailureAlgorithms.cpp
ArchiveQueueToTransferForRepackAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
......@@ -76,6 +77,8 @@ add_library (ctaobjectstore SHARED
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
RetrieveQueueToReportToRepackForFailureAlgorithms.cpp
RetrieveQueueToTransferForRepackAlgorithms.cpp
JobQueueType.cpp
Sorter.cpp
ArchiveRequest.cpp
......
......@@ -329,7 +329,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
// Get the list of vids for non failed tape files.
std::set<std::string> candidateVids;
for (auto & j: rr->dumpJobs()) {
if(j.status==RetrieveJobStatus::RJS_ToTransfer) {
if(j.status==RetrieveJobStatus::RJS_ToTransferForUser) {
candidateVids.insert(rr->getArchiveFile().tapeFiles.at(j.copyNb).vid);
}
}
......@@ -358,7 +358,7 @@ void GarbageCollector::OwnedObjectSorter::sortFetchedObjects(Agent& agent, std::
otherObjects.emplace_back(new GenericObject(rr->getAddressIfSet(), objectStore));
break;
}
retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransfer)].emplace_back(rr);
retrieveQueuesAndRequests[std::make_tuple(vid, JobQueueType::JobsToTransferForUser)].emplace_back(rr);
log::ScopedParamContainer params3(lc);
// Find copyNb for logging
size_t copyNb = std::numeric_limits<size_t>::max();
......@@ -408,7 +408,7 @@ void GarbageCollector::OwnedObjectSorter::lockFetchAndUpdateArchiveJobs(Agent& a
requestsList.pop_front();
}
utils::Timer t;
typedef ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransfer> AqAlgos;
typedef ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> AqAlgos;
AqAlgos aqcl(objectStore, agentReference);
decltype(aqcl)::InsertedElement::list jobsToAdd;
for (auto & ar: currentJobBatch) {
......
......@@ -341,7 +341,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
re.fetch();
std::stringstream tapePoolName;
tapePoolName << "TapePool" << i;
tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransfer);
tpAddr[i] = re.addOrGetArchiveQueueAndCommit(tapePoolName.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransferForUser);
cta::objectstore::ArchiveQueue aq(tpAddr[i], be);
}
// Create the various ATFR's, stopping one step further each time.
......@@ -463,7 +463,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
std::list<std::string> tapePools = { "TapePool0", "TapePool1" };
for (auto & tp: tapePools) {
// Empty queue
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::JobQueueType::JobsToTransfer), be);
cta::objectstore::ArchiveQueue aq(re.getArchiveQueueAddress(tp, cta::objectstore::JobQueueType::JobsToTransferForUser), be);
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
std::list<std::string> ajtr;
......@@ -473,7 +473,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
aq.removeJobsAndCommit(ajtr);
aql.release();
// Remove queues from root
re.removeArchiveQueueAndCommit(tp, cta::objectstore::JobQueueType::JobsToTransfer, lc);
re.removeArchiveQueueAndCommit(tp, cta::objectstore::JobQueueType::JobsToTransferForUser, lc);
}
ASSERT_NO_THROW(re.removeAgentRegisterAndCommit(lc));
......@@ -533,7 +533,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
re.fetch();
std::stringstream vid;
vid << "Tape" << i;
tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransfer);
tAddr[i] = re.addOrGetRetrieveQueueAndCommit(vid.str(), agentRef, cta::objectstore::JobQueueType::JobsToTransferForUser);
cta::objectstore::RetrieveQueue rq(tAddr[i], be);
}
// Create the various ATFR's, stopping one step further each time.
......@@ -646,7 +646,7 @@ TEST(ObjectStore, GarbageCollectorRetrieveRequest) {
std::list<std::string> retrieveQueues = { "Tape0", "Tape1" };