Commit 20dd06da authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Set Repack Retrieve Request status as RJS_Succeeded

parent 9fe56e88
......@@ -488,7 +488,7 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueFailed>::QueueType {
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToReport>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReport;
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToUser;
};
}} // namespace cta::objectstore
......@@ -100,10 +100,10 @@ JobQueueType ArchiveRequest::getJobQueueType(uint16_t copyNumber) {
throw JobNotQueueable("In ArchiveRequest::getJobQueueType(): Complete jobs are not queueable. They are finished and pend siblings completion.");
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
// We should report a success...
return JobQueueType::JobsToReport;
return JobQueueType::JobsToReportToUser;
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
// We should report a failure. The report queue can be shared.
return JobQueueType::JobsToReport;
return JobQueueType::JobsToReportToUser;
case serializers::ArchiveJobStatus::AJS_Failed:
return JobQueueType::FailedJobs;
case serializers::ArchiveJobStatus::AJS_Abandoned:
......@@ -715,7 +715,7 @@ JobQueueType ArchiveRequest::getQueueType(const serializers::ArchiveJobStatus& s
return JobQueueType::JobsToTransfer;
case ArchiveJobStatus::AJS_ToReportForTransfer:
case ArchiveJobStatus::AJS_ToReportForFailure:
return JobQueueType::JobsToReport;
return JobQueueType::JobsToReportToUser;
case ArchiveJobStatus::AJS_Failed:
return JobQueueType::FailedJobs;
default:
......
......@@ -80,6 +80,7 @@ add_library (ctaobjectstore SHARED
RepackRequest.cpp
RepackQueue.cpp
RepackQueuePendingAlgorithms.cpp
RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp
RepackQueueToExpandAlgorithms.cpp
RepackQueueType.cpp
BackendVFS.cpp
......
......@@ -24,10 +24,14 @@ std::string toString(JobQueueType queueType) {
switch (queueType) {
case JobQueueType::FailedJobs:
return "failedJobs";
case JobQueueType::JobsToReport:
return "jobsToReport";
case JobQueueType::JobsToReportToUser:
return "JobsToReportToUser";
case JobQueueType::JobsToTransfer:
return "jobsToTranfer";
case JobQueueType::JobsToReportToRepackForSuccess:
return "JobsToReportToRepackForSuccess";
case JobQueueType::JobsToReportToRepackForFailure:
return "JobsToReportToRepackForFailure";
default:
return "Unknown queue type.";
}
......
......@@ -21,6 +21,6 @@
#include <string>
namespace cta { namespace objectstore {
enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReport };
enum class JobQueueType { JobsToTransfer, FailedJobs, JobsToReportToUser, JobsToReportToRepackForSuccess, JobsToReportToRepackForFailure };
std::string toString(JobQueueType queueType);
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -20,6 +20,7 @@
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
#include "RetrieveQueue.hpp"
namespace cta { namespace objectstore {
......@@ -79,5 +80,4 @@ class RepackQueuePending: public RepackQueue {
class RepackQueueToExpand: public RepackQueue {
using RepackQueue::RepackQueue;
};
}} // namespace cta::objectstore
......@@ -426,11 +426,11 @@ getElementSummary(const PoppedElement& poppedElement) -> PoppedElementsSummary {
template<>
struct ContainerTraits<RepackQueue,RepackQueuePending>::QueueType{
objectstore::RepackQueueType value = objectstore::RepackQueueType::Pending;
objectstore::RepackQueueType value = objectstore::RepackQueueType::Pending;
};
template<>
struct ContainerTraits<RepackQueue,RepackQueueToExpand>::QueueType{
objectstore::RepackQueueType value = objectstore::RepackQueueType::ToExpand;
objectstore::RepackQueueType value = objectstore::RepackQueueType::ToExpand;
};
}} // namespace cta::objectstore
......@@ -160,4 +160,8 @@ public:
template<typename...Ts> RetrieveQueueFailed(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToReportToRepackForSuccess : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToReportToRepackForSuccess(Ts&...args): RetrieveQueue(args...) {}
};
}}
......@@ -471,7 +471,12 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueFailed>::QueueType{
template<>
struct ContainerTraits<RetrieveQueue,RetrieveQueueToReport>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReport;
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToUser;
};
template<>
struct ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>::QueueType{
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToReportToRepackForSuccess;
};
}} // namespace cta::objectstore
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>::c_containerTypeName = "RetrieveQueueToReportToRepackForSuccess";
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>::c_identifierType = "tapeVid";
}}
\ No newline at end of file
......@@ -152,7 +152,7 @@ queueForFailure:;
// We now need to grab the failed queue and queue the request.
RetrieveQueue rq(m_objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, JobQueueType::JobsToReport, lc);
Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(rq, rql, agentReference, bestVid, JobQueueType::JobsToReportToUser, lc);
// Enqueue the job
objectstore::MountPolicySerDeser mp;
std::list<RetrieveQueue::JobToAdd> jta;
......@@ -551,7 +551,7 @@ JobQueueType RetrieveRequest::getQueueType() {
default: break;
}
}
if (hasToReport) return JobQueueType::JobsToReport;
if (hasToReport) return JobQueueType::JobsToReportToUser;
return JobQueueType::FailedJobs;
}
......@@ -708,12 +708,14 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint16_t copyNumber, const std::string
dfi.deserialize(payload.schedulerrequest().diskfileinfo());
retRef.m_retrieveRequest.diskFileInfo = dfi;
retRef.m_retrieveRequest.dstURL = payload.schedulerrequest().dsturl();
retRef.m_retrieveRequest.isRepack = payload.isrepack();
retRef.m_retrieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl();
retRef.m_retrieveRequest.requester.name = payload.schedulerrequest().requester().name();
retRef.m_retrieveRequest.requester.group = payload.schedulerrequest().requester().group();
objectstore::ArchiveFileSerDeser af;
af.deserialize(payload.archivefile());
retRef.m_archiveFile = af;
// TODO serialization of payload maybe not necessary
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
......@@ -856,7 +858,7 @@ bool RetrieveRequest::isRepack(){
return m_payload.isrepack();
}
void RetrieveRequest::setIsRepack(const bool isRepack){
void RetrieveRequest::setIsRepack(bool isRepack){
checkPayloadWritable();
m_payload.set_isrepack(isRepack);
}
......
......@@ -94,7 +94,7 @@ public:
serializers::RetrieveJobStatus nextStatus;
};
bool isRepack();
void setIsRepack(const bool isRepack);
void setIsRepack(bool isRepack);
private:
/*!
......
......@@ -69,11 +69,11 @@ bool RootEntry::isEmpty() {
if (m_payload.has_schedulerlockpointer() &&
m_payload.schedulerlockpointer().address().size())
return false;
for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs}) {
for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs}) {
if (archiveQueuePointers(qt).size())
return false;
}
for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs}) {
for (auto &qt: {JobQueueType::JobsToTransfer, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs, JobQueueType::JobsToReportToRepackForSuccess}) {
if (retrieveQueuePointers(qt).size())
return false;
}
......@@ -105,7 +105,7 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Arch
switch(queueType) {
case JobQueueType::JobsToTransfer:
return m_payload.livearchivejobsqueuepointers();
case JobQueueType::JobsToReport:
case JobQueueType::JobsToReportToUser:
return m_payload.archivejobstoreportqueuepointers();
case JobQueueType::FailedJobs:
return m_payload.failedarchivejobsqueuepointers();
......@@ -118,7 +118,7 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Arch
switch(queueType) {
case JobQueueType::JobsToTransfer:
return m_payload.mutable_livearchivejobsqueuepointers();
case JobQueueType::JobsToReport:
case JobQueueType::JobsToReportToUser:
return m_payload.mutable_archivejobstoreportqueuepointers();
case JobQueueType::FailedJobs:
return m_payload.mutable_failedarchivejobsqueuepointers();
......@@ -131,10 +131,12 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Retr
switch(queueType) {
case JobQueueType::JobsToTransfer:
return m_payload.liveretrievejobsqueuepointers();
case JobQueueType::JobsToReport:
case JobQueueType::JobsToReportToUser:
return m_payload.retrievefailurestoreportqueuepointers();
case JobQueueType::FailedJobs:
return m_payload.failedretrievejobsqueuepointers();
case JobQueueType::JobsToReportToRepackForSuccess:
return m_payload.retrieve_queue_to_report_to_repack_for_success_pointers();
default:
throw cta::exception::Exception("In RootEntry::retrieveQueuePointers(): unknown queue type.");
}
......@@ -144,10 +146,12 @@ const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::Retr
switch(queueType) {
case JobQueueType::JobsToTransfer:
return m_payload.mutable_liveretrievejobsqueuepointers();
case JobQueueType::JobsToReport:
case JobQueueType::JobsToReportToUser:
return m_payload.mutable_retrievefailurestoreportqueuepointers();
case JobQueueType::FailedJobs:
return m_payload.mutable_failedretrievejobsqueuepointers();
case JobQueueType::JobsToReportToRepackForSuccess:
return m_payload.mutable_retrieve_queue_to_report_to_repack_for_success_pointers();
default:
throw cta::exception::Exception("In RootEntry::mutableRetrieveQueuePointers(): unknown queue type.");
}
......@@ -177,7 +181,7 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool
std::string archiveQueueNameHeader = "ArchiveQueue";
switch(queueType) {
case JobQueueType::JobsToTransfer: archiveQueueNameHeader+="ToTransfer"; break;
case JobQueueType::JobsToReport: archiveQueueNameHeader+="ToReport"; break;
case JobQueueType::JobsToReportToUser: archiveQueueNameHeader+="ToReport"; break;
case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break;
default: break;
}
......@@ -307,8 +311,9 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag
std::string retrieveQueueNameHeader = "RetrieveQueue";
switch(queueType) {
case JobQueueType::JobsToTransfer: retrieveQueueNameHeader+="ToTransfer"; break;
case JobQueueType::JobsToReport: retrieveQueueNameHeader+="ToReport"; break;
case JobQueueType::JobsToReportToUser: retrieveQueueNameHeader+="ToReport"; break;
case JobQueueType::FailedJobs: retrieveQueueNameHeader+="Failed"; break;
case JobQueueType::JobsToReportToRepackForSuccess: retrieveQueueNameHeader+="ToReportToRepackForSuccess"; break;
default: break;
}
std::string retrieveQueueAddress = agentRef.nextId(retrieveQueueNameHeader+"-"+vid);
......
......@@ -26,6 +26,7 @@
#include "AgentRegister.hpp"
#include "ArchiveQueue.hpp"
#include "common/log/DummyLogger.hpp"
#include "RetrieveQueue.hpp"
namespace unitTests {
......@@ -297,4 +298,73 @@ TEST (ObjectStore, RootEntrySchedulerGlobalLock) {
ASSERT_FALSE(re.exists());
}
TEST(ObjectStore, RetrieveQueueToReportToRepackForSuccessRootEntryTest){
cta::objectstore::BackendVFS be;
cta::objectstore::EntryLogSerDeser el("user0",
"unittesthost", time(NULL));
cta::log::DummyLogger dl("dummy", "dummyLogger");
cta::log::LogContext lc(dl);
cta::objectstore::AgentReference agr("UnitTests", dl);
cta::objectstore::Agent ag(agr.getAgentAddress(), be);
ag.initialize();
{
// Try to create the root entry and allocate the agent register
cta::objectstore::RootEntry re(be);
re.initialize();
re.insert();
cta::objectstore::ScopedExclusiveLock rel(re);
re.addOrGetAgentRegisterPointerAndCommit(agr, el, lc);
}
ag.insertAndRegisterSelf(lc);
std::string tpAddr1, tpAddr2;
{
// Create the tape vid
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
//Try to retrieve a retrieve queue address that does not exist
ASSERT_THROW(re.getRetrieveQueueAddress("VID1", cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess),
cta::objectstore::RootEntry::NoSuchRetrieveQueue);
tpAddr1 = re.addOrGetRetrieveQueueAndCommit("VID1", agr, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess);
ASSERT_FALSE(re.isEmpty());
// Check that we can read it
cta::objectstore::RetrieveQueueToReportToRepackForSuccess aq(tpAddr1, be);
cta::objectstore::ScopedSharedLock aql(aq);
ASSERT_NO_THROW(aq.fetch());
ASSERT_EQ(aq.getVid(),"VID1");
ASSERT_TRUE(aq.isEmpty());
}
{
// Create another VID
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
tpAddr2 = re.addOrGetRetrieveQueueAndCommit("VID2", agr, cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess);
ASSERT_TRUE(be.exists(tpAddr2));
}
{
// Remove the other VID
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeRetrieveQueueAndCommit("VID2", cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess, lc);
ASSERT_FALSE(be.exists(tpAddr2));
}
// Unregister the agent
cta::objectstore::ScopedExclusiveLock agl(ag);
ag.removeAndUnregisterSelf(lc);
// Delete the root entry
cta::objectstore::RootEntry re(be);
cta::objectstore::ScopedExclusiveLock lock(re);
re.fetch();
re.removeAgentRegisterAndCommit(lc);
re.removeRetrieveQueueAndCommit("VID1", cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess, lc);
ASSERT_FALSE(be.exists(tpAddr1));
re.removeIfEmpty(lc);
ASSERT_FALSE(re.exists());
}
}
......@@ -118,6 +118,7 @@ message RootEntry {
repeated ArchiveQueuePointer failedarchivejobsqueuepointers = 1062;
repeated RetrieveQueuePointer retrievefailurestoreportqueuepointers = 1063;
repeated RetrieveQueuePointer failedretrievejobsqueuepointers = 1065;
repeated RetrieveQueuePointer retrieve_queue_to_report_to_repack_for_success_pointers = 1066;
repeated ArchiveQueuePointer archivejobstoreportqueuepointers = 1068;
optional DriveRegisterPointer driveregisterpointer = 1070;
optional AgentRegisterPointer agentregisterpointer = 1080;
......@@ -336,6 +337,7 @@ enum RetrieveJobStatus {
RJS_ToTransfer = 1;
RJS_ToReportForFailure = 997;
RJS_Failed = 998;
RJS_Succeeded = 1002; //For Retrieve request created by a Repack request
}
message SchedulerRetrieveRequest {
......
......@@ -393,7 +393,7 @@ void OStoreDB::trimEmptyQueues(log::LogContext& lc) {
RootEntry re(m_objectStore);
ScopedExclusiveLock rel(re);
re.fetch();
for (auto & queueType: { JobQueueType::JobsToTransfer, JobQueueType::JobsToReport, JobQueueType::FailedJobs} ) {
for (auto & queueType: { JobQueueType::JobsToTransfer, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs} ) {
try {
auto archiveQueueList = re.dumpArchiveQueues(queueType);
for (auto & a: archiveQueueList) {
......@@ -688,7 +688,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch
RootEntry re(m_objectStore);
re.fetchNoLock();
while (true) {
auto queueList = re.dumpArchiveQueues(JobQueueType::JobsToReport);
auto queueList = re.dumpArchiveQueues(JobQueueType::JobsToReportToUser);
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
if (queueList.empty()) return ret;
// Try to get jobs from the first queue. If it is empty, it will be trimmed,
......@@ -868,6 +868,7 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
rReq->setOwner(m_agentReference->getAgentAddress());
// "Select" an arbitrary copy number. This is needed to serialize the object.
rReq->setActiveCopyNumber(criteria.archiveFile.tapeFiles.begin()->second.copyNb);
rReq->setIsRepack(rqst.isRepack);
rReq->insert();
double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
m_taskQueueSize++;
......@@ -1361,7 +1362,7 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo
RootEntry re(m_objectStore);
re.fetchNoLock();
while(true) {
auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReport);
auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReportToUser);
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret;
if (queueList.empty()) return ret;
......@@ -2344,6 +2345,59 @@ std::set<cta::SchedulerDatabase::RetrieveJob*> OStoreDB::RetrieveMount::finishSe
return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::batchSucceedRetrieveForRepack()
//------------------------------------------------------------------------------
std::set<cta::SchedulerDatabase::RetrieveJob *> OStoreDB::RetrieveMount::batchSucceedRetrieveForRepack(
std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, cta::log::LogContext & lc)
{
std::set<cta::SchedulerDatabase::RetrieveJob *> ret;
//typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> RQTRTRAlgos;
for(auto & retrieveJob : jobsBatch){
auto osdbJob = castFromSchedDBJob(retrieveJob);
osdbJob->asyncSucceedForRepack();
auto update_callback = [this,&osdbJob](const std::string &in)->std::string{
// We have a locked and fetched object, so we just need to work on its representation.
cta::objectstore::serializers::ObjectHeader oh;
if (!oh.ParseFromString(in)) {
// Use a the tolerant parser to assess the situation.
oh.ParsePartialFromString(in);
throw cta::exception::Exception(std::string("In RetrieveRequest::asyncUpdateJobOwner(): could not parse header: ")+
oh.InitializationErrorString());
}
if (oh.type() != serializers::ObjectType::RetrieveRequest_t) {
std::stringstream err;
err << "In RetrieveRequest::asyncUpdateJobOwner()::lambda(): wrong object type: " << oh.type();
throw cta::exception::Exception(err.str());
}
serializers::RetrieveRequest payload;
if (!payload.ParseFromString(oh.payload())) {
// Use a the tolerant parser to assess the situation.
payload.ParsePartialFromString(oh.payload());
throw cta::exception::Exception(std::string("In RetrieveRequest::asyncUpdateJobOwner(): could not parse payload: ")+
payload.InitializationErrorString());
}
//payload.set_status(osdbJob->selectedCopyNb,serializers::RetrieveJobStatus::RJS_Succeeded);
auto retrieveJobs = payload.mutable_jobs();
for(auto &job : *retrieveJobs){
if(job.copynb() == osdbJob->selectedCopyNb)
{
job.set_status(serializers::RetrieveJobStatus::RJS_Succeeded);
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
}
throw cta::exception::Exception("In OStoreDB::RetrieveMount::batchSucceedRetrieveForRepack::lambda(): copyNb not found");
};
std::function <std::string(const std::string &)> update = update_callback;
cta::objectstore::Backend::AsyncUpdater * updater = this->m_oStoreDB.m_objectStore.asyncUpdate(osdbJob->m_retrieveRequest.getAddressIfSet(), update);
//cta::objectstore::Backend::AsyncUpdater * updater = osdbJob->m_retrieveMount->m_oStoreDB.m_objectStore.asyncUpdate(osdbJob->m_retrieveRequest.getAddressIfSet(), update);
updater->wait();//osdbJob->m_retrieveRequest.getAddressIfSet())<<std::endl;
}
return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::setDriveStatus()
//------------------------------------------------------------------------------
......@@ -3109,6 +3163,10 @@ void OStoreDB::RetrieveJob::asyncSucceed() {
m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob());
}
void OStoreDB::RetrieveJob::asyncSucceedForRepack(){
//TODO : put the code to async change retrieve request status as RJS_Success
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::checkSucceed()
//------------------------------------------------------------------------------
......@@ -3121,4 +3179,3 @@ void OStoreDB::RetrieveJob::checkSucceed() {
}
} // namespace cta
......@@ -219,6 +219,7 @@ public:
OStoreDB::RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job);
public:
std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override;
std::set<cta::SchedulerDatabase::RetrieveJob*> batchSucceedRetrieveForRepack(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override;
};
friend class RetrieveMount;
......@@ -230,6 +231,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void asyncSucceed() override;
virtual void checkSucceed() override;
virtual void asyncSucceedForRepack() override;
void failTransfer(const std::string& failureReason, log::LogContext& lc) override;
void failReport(const std::string& failureReason, log::LogContext& lc) override;
virtual ~RetrieveJob() override;
......
......@@ -145,8 +145,9 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
// waitAndFinishSettingJobsBatchRetrieved()
//------------------------------------------------------------------------------
void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) {
std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulRetrieveJobs;
std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulRetrieveJobs; //List to ensure the destruction of the retrieve jobs at the end of this method
std::list<cta::SchedulerDatabase::RetrieveJob *> validatedSuccessfulDBRetrieveJobs;
std::list<cta::SchedulerDatabase::RetrieveJob *> retrieveRepackJobs;
std::unique_ptr<cta::RetrieveJob> job;
double waitUpdateCompletionTime=0;
double jobBatchFinishingTime=0;
......@@ -162,8 +163,13 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
if (!job.get()) continue;
files++;
bytes+=job->archiveFile.fileSize;
job->checkComplete();
validatedSuccessfulDBRetrieveJobs.emplace_back(job->m_dbJob.get());
bool isRepack = job->m_dbJob->retrieveRequest.isRepack;
if(!isRepack){
job->checkComplete();
validatedSuccessfulDBRetrieveJobs.emplace_back(job->m_dbJob.get());
} else {
retrieveRepackJobs.emplace_back(job->m_dbJob.get());
}
validatedSuccessfulRetrieveJobs.emplace_back(std::move(job));
job.reset();
}
......@@ -171,9 +177,10 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
tl.insertAndReset("waitUpdateCompletionTime",t);
// Complete the cleaning up of the jobs in the mount
m_dbMount->finishSettingJobsBatchSuccessful(validatedSuccessfulDBRetrieveJobs, logContext);
m_dbMount->batchSucceedRetrieveForRepack(retrieveRepackJobs,logContext);
jobBatchFinishingTime=t.secs();
tl.insertOrIncrement("jobBatchFinishingTime",jobBatchFinishingTime);
schedulerDbTime=jobBatchFinishingTime + waitUpdateCompletionTime;
schedulerDbTime=jobBatchFinishingTime + waitUpdateCompletionTime;
tl.insertOrIncrement("schedulerDbTime",schedulerDbTime);
{
cta::log::ScopedParamContainer params(logContext);
......
......@@ -346,6 +346,8 @@ public:
virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
virtual std::set<cta::SchedulerDatabase::RetrieveJob *> finishSettingJobsBatchSuccessful(
std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, log::LogContext & lc) = 0;
virtual std::set<cta::SchedulerDatabase::RetrieveJob *> batchSucceedRetrieveForRepack(
std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, cta::log::LogContext & lc) = 0;
virtual ~RetrieveMount() {}
uint32_t nbFilesCurrentlyOnTape;
};
......@@ -358,6 +360,7 @@ public:
uint64_t selectedCopyNb;
virtual void asyncSucceed() = 0;
virtual void checkSucceed() = 0;
virtual void asyncSucceedForRepack() = 0;
virtual void failTransfer(const std::string &failureReason, log::LogContext &lc) = 0;
virtual void failReport(const std::string &failureReason, log::LogContext &lc) = 0;
virtual ~RetrieveJob() {}
......
......@@ -1229,7 +1229,6 @@ TEST_P(SchedulerTest, expandRepackRequest) {
rrp.reportEndOfSession();
rrp.waitThread();
ASSERT_EQ(rrp.allThreadsDone(),true);
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment