From 81a4a751f9235f49937c6fd877a089fc4f60173a Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Tue, 8 Dec 2015 10:58:37 +0100 Subject: [PATCH] Added retrying for failed achives. --- objectstore/ArchiveToFileRequest.cpp | 19 ++++--- objectstore/ArchiveToFileRequest.hpp | 6 +- scheduler/ArchiveJob.cpp | 2 +- scheduler/OStoreDB/OStoreDB.cpp | 18 +++++- scheduler/RetrieveJobStatus.hpp | 29 ---------- scheduler/SchedulerTest.cpp | 82 ++++++++++++++++++++++++++++ 6 files changed, 109 insertions(+), 47 deletions(-) delete mode 100644 scheduler/RetrieveJobStatus.hpp diff --git a/objectstore/ArchiveToFileRequest.cpp b/objectstore/ArchiveToFileRequest.cpp index e890246171..92df2d9576 100644 --- a/objectstore/ArchiveToFileRequest.cpp +++ b/objectstore/ArchiveToFileRequest.cpp @@ -96,10 +96,8 @@ void cta::objectstore::ArchiveToFileRequest::setJobFailureLimits(uint16_t copyNu throw NoSuchJob("In ArchiveToFileRequest::setJobFailureLimits(): job not found"); } -auto cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber, - uint64_t mountId) - -> FailuresCount { - FailuresCount ret; +bool cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber, + uint64_t mountId) { checkPayloadWritable(); auto * jl = m_payload.mutable_jobs(); // Find the job and update the number of failures (and return the new count) @@ -113,10 +111,14 @@ auto cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber, } j->set_totalretries(j->totalretries() + 1); } - j->set_status(serializers::AJS_PendingMount); - ret.failuresWithinMount = j->retrieswithinmount(); - ret.totalFailures = j->totalretries(); - return ret; + if (j->totalretries() >= j->maxtotalretries()) { + j->set_status(serializers::AJS_Failed); + finishIfNecessary(); + return true; + } else { + j->set_status(serializers::AJS_PendingMount); + return false; + } } throw NoSuchJob ("In ArchiveToFileRequest::addJobFailure(): could not find job"); } @@ -341,7 +343,6 @@ void cta::objectstore::ArchiveToFileRequest::setJobOwner( throw NoSuchJob("In ArchiveToFileRequest::setJobOwner: no such job"); } - bool cta::objectstore::ArchiveToFileRequest::finishIfNecessary() { checkPayloadWritable(); // This function is typically called after changing the status of one job diff --git a/objectstore/ArchiveToFileRequest.hpp b/objectstore/ArchiveToFileRequest.hpp index e6ab99a332..a6d6101d5f 100644 --- a/objectstore/ArchiveToFileRequest.hpp +++ b/objectstore/ArchiveToFileRequest.hpp @@ -45,11 +45,7 @@ public: void setJobSelected(uint16_t copyNumber, const std::string & owner); void setJobPending(uint16_t copyNumber); bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job - struct FailuresCount { - uint16_t failuresWithinMount; - uint16_t totalFailures; - }; - FailuresCount addJobFailure(uint16_t copyNumber, uint64_t sessionId); + bool addJobFailure(uint16_t copyNumber, uint64_t sessionId); //< returns true the job failed serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber); // Handling of the consequences of a job status change for the entire request. // This function returns true if the request got finished. diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index c3ffa0aa66..dc221566e7 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -64,7 +64,7 @@ void cta::ArchiveJob::complete() { // failed //------------------------------------------------------------------------------ void cta::ArchiveJob::failed(const cta::exception::Exception &ex) { - throw std::runtime_error("cta::ArchiveJob::failed(): not implemented"); + m_dbJob->fail(); } //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 84489f95c3..c1a55222c7 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1652,8 +1652,19 @@ void OStoreDB::ArchiveJob::fail() { // Lock the archive request. Fail the job. objectstore::ScopedExclusiveLock atfrl(m_atfr); m_atfr.fetch(); - m_atfr.addJobFailure(m_copyNb, m_mountId); - // Return the job to its original tape pool's queue + // Add a job failure. If the job is failed, we will delete it. + if (m_atfr.addJobFailure(m_copyNb, m_mountId)) { + // The job will not be retried. Either another jobs for the same request is + // queued and keeps the request referenced or the request has been deleted. + // In any case, we can forget it. + objectstore::ScopedExclusiveLock al(m_agent); + m_agent.fetch(); + m_agent.removeFromOwnership(m_atfr.getAddressIfSet()); + m_agent.commit(); + m_jobOwned = false; + return; + } + // The job still has a chance, return it to its original tape pool's queue objectstore::RootEntry re(m_objectStore); objectstore::ScopedSharedLock rel(re); re.fetch(); @@ -1680,6 +1691,7 @@ void OStoreDB::ArchiveJob::fail() { m_agent.fetch(); m_agent.removeFromOwnership(m_atfr.getAddressIfSet()); m_agent.commit(); + m_jobOwned = false; return; } } @@ -1734,7 +1746,7 @@ OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, void OStoreDB::RetrieveJob::fail() { throw NotImplemented(""); - } +} OStoreDB::RetrieveJob::~RetrieveJob() { if (m_jobOwned) { diff --git a/scheduler/RetrieveJobStatus.hpp b/scheduler/RetrieveJobStatus.hpp deleted file mode 100644 index 935205f7a3..0000000000 --- a/scheduler/RetrieveJobStatus.hpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * The CERN Tape Retrieve (CTA) project - * Copyright (C) 2015 CERN - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -namespace cta { - enum class RetrieveJobStatus { - LinkingToTape = 0, - Pending = 1, - Selected = 2, - Complete = 3, - Failed = 99 - }; -} diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index da3dc222b2..71a5fc8418 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -2492,6 +2492,88 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { } } +TEST_P(SchedulerTest, retry_archive_until_max_reached) { + using namespace cta; + + Scheduler &scheduler = getScheduler(); + + ASSERT_NO_THROW(scheduler.setOwner(s_adminOnAdminHost, "/", s_user)); + + const std::string storageClassName = "TestStorageClass"; + const uint16_t nbCopies = 1; + const std::string storageClassComment = "Storage-class comment"; + ASSERT_NO_THROW(scheduler.createStorageClass(s_adminOnAdminHost, storageClassName, + nbCopies, storageClassComment)); + + const std::string dirPath = "/grandparent"; + const uint16_t mode = 0777; + ASSERT_NO_THROW(scheduler.createDir(s_userOnUserHost, dirPath, mode)); + ASSERT_NO_THROW(scheduler.setDirStorageClass(s_userOnUserHost, dirPath, + storageClassName)); + + const std::string tapePoolName = "TestTapePool"; + const uint16_t nbPartialTapes = 1; + const std::string tapePoolComment = "Tape-pool comment"; + ASSERT_NO_THROW(scheduler.createTapePool(s_adminOnAdminHost, tapePoolName, + nbPartialTapes, tapePoolComment)); + MountCriteriaByDirection mcbd(MountCriteria(1,1,0,1), MountCriteria(1,1,0,1)); + ASSERT_NO_THROW(scheduler.setTapePoolMountCriteria("TestTapePool", mcbd)); + + const std::string libraryName = "TestLogicalLibrary"; + const std::string libraryComment = "Library comment"; + ASSERT_NO_THROW(scheduler.createLogicalLibrary(s_adminOnAdminHost, libraryName, + libraryComment)); + { + std::list<LogicalLibrary> libraries; + ASSERT_NO_THROW(libraries = scheduler.getLogicalLibraries( + s_adminOnAdminHost)); + ASSERT_EQ(1, libraries.size()); + + LogicalLibrary logicalLibrary; + ASSERT_NO_THROW(logicalLibrary = libraries.front()); + ASSERT_EQ(libraryName, logicalLibrary.name); + ASSERT_EQ(libraryComment, logicalLibrary.creationLog.comment); + } + + const std::string vid = "TestVid"; + const uint64_t capacityInBytes = 12345678; + const std::string tapeComment = "Tape comment"; + ASSERT_NO_THROW(scheduler.createTape(s_adminOnAdminHost, vid, libraryName, + tapePoolName, capacityInBytes, tapeComment)); + + const uint16_t copyNb = 1; + const std::string archiveRouteComment = "Archive-route comment"; + ASSERT_NO_THROW(scheduler.createArchiveRoute(s_adminOnAdminHost, storageClassName, + copyNb, tapePoolName, archiveRouteComment)); + + std::list<std::string> remoteFiles; + remoteFiles.push_back(s_remoteFileRawPath1); + const std::string archiveFile = "/grandparent/parent_file"; + ASSERT_NO_THROW(scheduler.queueArchiveRequest(s_userOnUserHost, remoteFiles, archiveFile)); + + { + // Emulate a tape server by asking for a mount and then a file + std::unique_ptr<cta::TapeMount> mount; + ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(libraryName, "drive0").release())); + ASSERT_NE((cta::TapeMount*)NULL, mount.get()); + ASSERT_EQ(cta::MountType::ARCHIVE, mount.get()->getMountType()); + std::unique_ptr<cta::ArchiveMount> archiveMount; + ASSERT_NO_THROW(archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()))); + ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get()); + // The file should be retried 10 times + for (int i=0; i<5; i++) { + std::unique_ptr<cta::ArchiveJob> archiveJob; + ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); + ASSERT_NE((cta::ArchiveJob*)NULL, archiveJob.get()); + ASSERT_NO_THROW(archiveJob->failed(cta::exception::Exception("Archive failed"))); + } + // Then the request should be gone + std::unique_ptr<cta::ArchiveJob> archiveJob; + ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release())); + ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get()); + } +} + TEST_P(SchedulerTest, retrieve_non_existing_file) { using namespace cta; -- GitLab