From 4903ee74203c3a29fb14d718bcdf9c353a802cb1 Mon Sep 17 00:00:00 2001
From: Joao Afonso <joao.afonso@cern.ch>
Date: Tue, 20 Dec 2022 12:34:38 +0100
Subject: [PATCH] Do not retry during repack requests

---
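Reviewer note (placed between the --- marker and the diffstat, so it is not
applied by git am): the sketch below illustrates, in isolation, the retry
behaviour this patch introduces. RetryLimits and limitsFor() are hypothetical
names for this sketch, not CTA API; only the 3-per-mount/6-total (user) and
1/1 (repack) limits come from the hardcoded values in
objectstore/RetrieveRequest.cpp.

#include <cstdint>
#include <initializer_list>
#include <iostream>

struct RetryLimits {
  std::uint32_t retriesWithinMount;
  std::uint32_t totalRetries;
};

RetryLimits limitsFor(bool isRepack) {
  // Mirrors the ternaries added to RetrieveRequest::setRetrieveFileQueueCriteria():
  // repack retrieve jobs get one attempt in one mount, user retrieve jobs keep
  // up to 3 attempts per mount and 6 attempts in total (i.e. at most 2 mounts).
  return isRepack ? RetryLimits{1, 1} : RetryLimits{3, 6};
}

int main() {
  for (bool isRepack : {false, true}) {
    const RetryLimits limits = limitsFor(isRepack);
    // Worst case where every attempt fails: each mount grants up to
    // retriesWithinMount attempts; the job is dropped for good once
    // totalRetries is exhausted.
    std::uint32_t attempts = 0;
    std::uint32_t mounts = 0;
    while (attempts < limits.totalRetries) {
      ++mounts;
      for (std::uint32_t i = 0;
           i < limits.retriesWithinMount && attempts < limits.totalRetries;
           ++i) {
        ++attempts;  // simulated failed attempt
      }
    }
    std::cout << (isRepack ? "repack" : "user") << ": up to " << attempts
              << " attempt(s) over " << mounts << " mount(s)\n";
  }
  return 0;
}

This prints "user: up to 6 attempt(s) over 2 mount(s)" and "repack: up to 1
attempt(s) over 1 mount(s)", matching the arithmetic in the updated comment
block in the diff below.
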
 ReleaseNotes.md                 |   1 +
 objectstore/RetrieveRequest.cpp |  26 ++--
 scheduler/SchedulerTest.cpp     | 142 ++++++++------------------------
 3 files changed, 52 insertions(+), 117 deletions(-)

diff --git a/ReleaseNotes.md b/ReleaseNotes.md
index ec0ff572dc..a4b94e8b22 100644
--- a/ReleaseNotes.md
+++ b/ReleaseNotes.md
@@ -10,6 +10,7 @@
 - cta/CTA#239 - Add improvments to the cta-change-storage-class tool
 - cta/CTA#248 - Clean up output from cta-readtp
 - cta/CTA#251 - Increase free drive STALE threshold to 4 hours
+- cta/CTA#218 - Do not retry during repack requests
 ### Bug Fixes
 - cta/CTA#234 - Replace stoi with toUint64 in standalone cli tool
 - cta/CTA#238 - Compilation fails when using cta::common::Configuration::getConfEntInt(...)
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index 87e57e9d24..9246087f78 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -522,16 +522,24 @@ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStruct
   MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy());
   m_payload.set_mountpolicyname(criteria.mountPolicy.name);
   /*
-   * Explaination about these hardcoded retries :
-   * The hardcoded RetriesWithinMount will ensure that we will try to retrieve the file 3 times
-   * in the same mount.
-   * The hardcoded TotalRetries ensure that we will never try more than 6 times to retrieve a file.
-   * As totalretries = 6 and retrieswithinmount = 3, this will ensure that the file will be retried by maximum 2 mounts.
-   * (2 mounts * 3 retrieswithinmount = 6 totalretries)
+   * Explanation about these hardcoded retries:
+   *
+   * For user requests:
+   * - The hardcoded RetriesWithinMount ensures that we will try to retrieve the file 3 times in the same mount.
+   * - The hardcoded TotalRetries ensures that we will never try more than 6 times to retrieve a file.
+   * - As totalretries = 6 and retrieswithinmount = 3, the file will be retried in a maximum of 2 mounts.
+   *   (2 mounts * 3 retrieswithinmount = 6 totalretries)
+   *
+   * For repack requests:
+   * - The hardcoded RetriesWithinMount is 1, to ensure that we try to retrieve the file only once within a mount.
+   * - The hardcoded TotalRetries is also 1, to ensure that we do not retry the retrieve in a different mount.
+   * - As totalretries = 1 and retrieswithinmount = 1, the file will be retried in a maximum of 1 mount.
+   *   (1 mount * 1 retrieswithinmount = 1 totalretries)
    */
-  const uint32_t hardcodedRetriesWithinMount = 3;
-  const uint32_t hardcodedTotalRetries = 6;
-  const uint32_t hardcodedReportRetries = 2;
+  bool isRepack = getRepackInfo().isRepack;
+  const uint32_t hardcodedRetriesWithinMount = isRepack ? 1 : 3;
+  const uint32_t hardcodedTotalRetries = isRepack ? 1 : 6;
+  const uint32_t hardcodedReportRetries = 2;
   addJob(tf.copyNb, hardcodedRetriesWithinMount, hardcodedTotalRetries, hardcodedReportRetries);
   }
 }
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 6189cfc9d4..9efdc742b9 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -2663,81 +2663,48 @@ TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) {
   scheduler.waitSchedulerDbSubthreadsComplete();
   {
 
-    for(int i = 0; i < 5; ++i){
-      std::unique_ptr<cta::TapeMount> mount;
-      mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release());
-      ASSERT_NE(nullptr, mount.get());
-      ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
-      std::unique_ptr<cta::RetrieveMount> retrieveMount;
-      retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
-      ASSERT_NE(nullptr, retrieveMount.get());
-      std::unique_ptr<cta::RetrieveJob> retrieveJob;
-
-      //For each tape we will see if the retrieve jobs are not null
-      auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc);
-      retrieveJob.reset(jobBatch.front().release());
-      ASSERT_NE(nullptr, retrieveJob.get());
-
-      castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc);
-
-      rrp.startThreads();
-
-      rrp.reportFailedJob(std::move(retrieveJob),cta::exception::Exception("FailedJob for unit test expandRepackRequestFailedRetrieve"), lc);
-
-      rrp.setDiskDone();
-      rrp.setTapeDone();
-
-      rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting, std::nullopt, lc);
-
-      rrp.reportEndOfSession(lc);
-      rrp.waitThread();
-      ASSERT_EQ(rrp.allThreadsDone(),true);
-    }
-
-    {
-      //Verify that the job is in the RetrieveQueueToReportToRepackForFailure
-      cta::objectstore::RootEntry re(backend);
-      cta::objectstore::ScopedExclusiveLock sel(re);
-      re.fetch();
+    //Verify that the job is in the RetrieveQueueToReportToRepackForFailure
+    cta::objectstore::RootEntry re(backend);
+    cta::objectstore::ScopedExclusiveLock sel(re);
+    re.fetch();
 
-      //Get the retrieveQueueToReportToRepackForFailure
-      // The queue is named after the repack request: we need to query the repack index
-      objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
-      ri.fetchNoLock();
+    //Get the retrieveQueueToReportToRepackForFailure
+    // The queue is named after the repack request: we need to query the repack index
+    objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
+    ri.fetchNoLock();
 
-      std::string retrieveQueueToReportToRepackForFailureAddress = re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),JobQueueType::JobsToReportToRepackForFailure);
-      cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForFailureAddress,backend);
+    std::string retrieveQueueToReportToRepackForFailureAddress = re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),JobQueueType::JobsToReportToRepackForFailure);
+    cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForFailureAddress,backend);
 
-      //Fetch the queue so that we can get the retrieveRequests from it
-      cta::objectstore::ScopedExclusiveLock rql(rq);
-      rq.fetch();
+    //Fetch the queue so that we can get the retrieveRequests from it
+    cta::objectstore::ScopedExclusiveLock rql(rq);
+    rq.fetch();
 
-      ASSERT_EQ(rq.dumpJobs().size(),1);
-      for(auto& job: rq.dumpJobs()){
-        ASSERT_EQ(1,job.copyNb);
-        ASSERT_EQ(archiveFileSize,job.size);
-      }
+    ASSERT_EQ(rq.dumpJobs().size(),1);
+    for(auto& job: rq.dumpJobs()){
+      ASSERT_EQ(1,job.copyNb);
+      ASSERT_EQ(archiveFileSize,job.size);
     }
+  }
 
-    {
-      Scheduler::RepackReportBatch reports = scheduler.getNextRepackReportBatch(lc);
-      reports.report(lc);
-      reports = scheduler.getNextRepackReportBatch(lc);
-      reports.report(lc);
-    }
-    {
-      //After the reporting, the RetrieveQueueToReportToRepackForFailure should not exist anymore.
-      cta::objectstore::RootEntry re(backend);
-      cta::objectstore::ScopedExclusiveLock sel(re);
-      re.fetch();
+  {
+    Scheduler::RepackReportBatch reports = scheduler.getNextRepackReportBatch(lc);
+    reports.report(lc);
+    reports = scheduler.getNextRepackReportBatch(lc);
+    reports.report(lc);
+  }
+  {
+    //After the reporting, the RetrieveQueueToReportToRepackForFailure should not exist anymore.
+    cta::objectstore::RootEntry re(backend);
+    cta::objectstore::ScopedExclusiveLock sel(re);
+    re.fetch();
 
-      //Get the retrieveQueueToReportToRepackForFailure
-      // The queue is named after the repack request: we need to query the repack index
-      objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
-      ri.fetchNoLock();
+    //Get the retrieveQueueToReportToRepackForFailure
+    // The queue is named after the repack request: we need to query the repack index
+    objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
+    ri.fetchNoLock();
 
-      ASSERT_THROW(re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),JobQueueType::JobsToReportToRepackForFailure),cta::exception::Exception);
-    }
+    ASSERT_THROW(re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),JobQueueType::JobsToReportToRepackForFailure),cta::exception::Exception);
     }
   }
 }
@@ -3225,47 +3192,6 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) {
     }
   }
 
-  for(int i = 0; i < 5; ++i){
-    {
-      //The failed job should be queued into the ArchiveQueueToTransferForRepack
-      cta::objectstore::RootEntry re(backend);
-      re.fetchNoLock();
-
-      std::string archiveQueueToTransferForRepackAddress = re.getArchiveQueueAddress(s_tapePoolName,JobQueueType::JobsToTransferForRepack);
-      cta::objectstore::ArchiveQueue aq(archiveQueueToTransferForRepackAddress,backend);
-
-      aq.fetchNoLock();
-
-      for(auto &job: aq.dumpJobs()){
-        ASSERT_EQ(1,job.copyNb);
-        ASSERT_EQ(archiveFileSize,job.size);
-      }
-    }
-    std::unique_ptr<cta::TapeMount> mount;
-    mount.reset(scheduler.getNextMount(s_libraryName, driveName, lc).release());
-    ASSERT_NE(nullptr, mount.get());
-    ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType());
-    std::unique_ptr<cta::ArchiveMount> archiveMount;
-    archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
-    ASSERT_NE(nullptr, archiveMount.get());
-    std::unique_ptr<cta::ArchiveJob> archiveJob;
-
-    auto jobBatch = archiveMount->getNextJobBatch(1,archiveFileSize,lc);
-    archiveJob.reset(jobBatch.front().release());
-    ASSERT_NE(nullptr, archiveJob.get());
-
-    castor::tape::tapeserver::daemon::MigrationReportPacker mrp(archiveMount.get(),lc);
-    mrp.startThreads();
-
-    mrp.reportFailedJob(std::move(archiveJob),cta::exception::Exception("FailedJob expandRepackRequestFailedArchive"),lc);
-
-    castor::tape::tapeserver::drive::compressionStats compressStats;
-    mrp.reportFlush(compressStats,lc);
-    mrp.reportEndOfSession(lc);
-    mrp.reportTestGoingToEnd(lc);
-    mrp.waitThread();
-  }
-
   //Test that the failed job is queued in the ArchiveQueueToReportToRepackForFailure
   {
     cta::objectstore::RootEntry re(backend);
-- 
GitLab