Commit b5534086 authored by Eric Cano's avatar Eric Cano
Browse files

Added retrying for failed archives.

parent b94a73ee
......@@ -96,10 +96,8 @@ void cta::objectstore::ArchiveToFileRequest::setJobFailureLimits(uint16_t copyNu
throw NoSuchJob("In ArchiveToFileRequest::setJobFailureLimits(): job not found");
}
auto cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId)
-> FailuresCount {
FailuresCount ret;
bool cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber,
uint64_t mountId) {
checkPayloadWritable();
auto * jl = m_payload.mutable_jobs();
// Find the job and update the number of failures (and return the new count)
......@@ -113,10 +111,14 @@ auto cta::objectstore::ArchiveToFileRequest::addJobFailure(uint16_t copyNumber,
}
j->set_totalretries(j->totalretries() + 1);
}
j->set_status(serializers::AJS_PendingMount);
ret.failuresWithinMount = j->retrieswithinmount();
ret.totalFailures = j->totalretries();
return ret;
if (j->totalretries() >= j->maxtotalretries()) {
j->set_status(serializers::AJS_Failed);
finishIfNecessary();
return true;
} else {
j->set_status(serializers::AJS_PendingMount);
return false;
}
}
throw NoSuchJob ("In ArchiveToFileRequest::addJobFailure(): could not find job");
}
......@@ -341,7 +343,6 @@ void cta::objectstore::ArchiveToFileRequest::setJobOwner(
throw NoSuchJob("In ArchiveToFileRequest::setJobOwner: no such job");
}
bool cta::objectstore::ArchiveToFileRequest::finishIfNecessary() {
checkPayloadWritable();
// This function is typically called after changing the status of one job
......
......@@ -45,11 +45,7 @@ public:
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
bool setJobSuccessful(uint16_t copyNumber); //< returns true if this is the last job
// Pair of failure counters reported for a single job.
struct FailuresCount {
uint16_t failuresWithinMount; //< value of the job's retrieswithinmount counter
uint16_t totalFailures; //< value of the job's totalretries counter
};
FailuresCount addJobFailure(uint16_t copyNumber, uint64_t sessionId);
bool addJobFailure(uint16_t copyNumber, uint64_t sessionId); //< returns true if the job failed
serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
// Handling of the consequences of a job status change for the entire request.
// This function returns true if the request got finished.
......
......@@ -64,7 +64,7 @@ void cta::ArchiveJob::complete() {
// failed
//------------------------------------------------------------------------------
/**
 * Reports that this archive job has failed.
 *
 * The handling is delegated to the database-side job through m_dbJob->fail().
 * This function must not throw unconditionally: an unconditional throw placed
 * before the delegation would make the call to m_dbJob->fail() unreachable.
 *
 * @param ex The exception describing the failure (not used by this function).
 */
void cta::ArchiveJob::failed(const cta::exception::Exception &ex) {
  m_dbJob->fail();
}
//------------------------------------------------------------------------------
......
......@@ -1652,8 +1652,19 @@ void OStoreDB::ArchiveJob::fail() {
// Lock the archive request. Fail the job.
objectstore::ScopedExclusiveLock atfrl(m_atfr);
m_atfr.fetch();
m_atfr.addJobFailure(m_copyNb, m_mountId);
// Return the job to its original tape pool's queue
// Add a job failure. If the job is failed, we will delete it.
if (m_atfr.addJobFailure(m_copyNb, m_mountId)) {
// The job will not be retried. Either another job for the same request is
// queued and keeps the request referenced or the request has been deleted.
// In any case, we can forget it.
objectstore::ScopedExclusiveLock al(m_agent);
m_agent.fetch();
m_agent.removeFromOwnership(m_atfr.getAddressIfSet());
m_agent.commit();
m_jobOwned = false;
return;
}
// The job still has a chance, return it to its original tape pool's queue
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
......@@ -1680,6 +1691,7 @@ void OStoreDB::ArchiveJob::fail() {
m_agent.fetch();
m_agent.removeFromOwnership(m_atfr.getAddressIfSet());
m_agent.commit();
m_jobOwned = false;
return;
}
}
......@@ -1734,7 +1746,7 @@ OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress,
/**
 * Failure handling for retrieve jobs. Not implemented: this function always
 * throws.
 *
 * @throws NotImplemented always, with a message naming this function.
 */
void OStoreDB::RetrieveJob::fail() {
  throw NotImplemented("In OStoreDB::RetrieveJob::fail(): not implemented");
}
}
OStoreDB::RetrieveJob::~RetrieveJob() {
if (m_jobOwned) {
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
namespace cta {

/**
 * Status values of a retrieve job.
 *
 * Each enumerator has an explicitly assigned numeric value; Failed (99) is
 * deliberately set apart from the consecutive values 0 to 3.
 */
enum class RetrieveJobStatus {
  LinkingToTape =  0,
  Pending       =  1,
  Selected      =  2,
  Complete      =  3,
  Failed        = 99
};

} // namespace cta
......@@ -2492,6 +2492,88 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
}
}
// Queues a single-copy archive request, then emulates a tape server that fails
// every job it is given. The job must be handed out again after each failure
// until it has been failed nbFailuresBeforeGivingUp times, after which the
// mount must have no job left to hand out.
TEST_P(SchedulerTest, retry_archive_until_max_reached) {
  using namespace cta;

  Scheduler &scheduler = getScheduler();

  ASSERT_NO_THROW(scheduler.setOwner(s_adminOnAdminHost, "/", s_user));

  // Storage class with a single copy.
  const std::string storageClassName = "TestStorageClass";
  const uint16_t nbCopies = 1;
  const std::string storageClassComment = "Storage-class comment";
  ASSERT_NO_THROW(scheduler.createStorageClass(s_adminOnAdminHost, storageClassName,
    nbCopies, storageClassComment));

  // Directory that will hold the archived file.
  const std::string dirPath = "/grandparent";
  const uint16_t mode = 0777;
  ASSERT_NO_THROW(scheduler.createDir(s_userOnUserHost, dirPath, mode));
  ASSERT_NO_THROW(scheduler.setDirStorageClass(s_userOnUserHost, dirPath,
    storageClassName));

  // Tape pool and its mount criteria.
  const std::string tapePoolName = "TestTapePool";
  const uint16_t nbPartialTapes = 1;
  const std::string tapePoolComment = "Tape-pool comment";
  ASSERT_NO_THROW(scheduler.createTapePool(s_adminOnAdminHost, tapePoolName,
    nbPartialTapes, tapePoolComment));
  MountCriteriaByDirection mcbd(MountCriteria(1,1,0,1), MountCriteria(1,1,0,1));
  ASSERT_NO_THROW(scheduler.setTapePoolMountCriteria(tapePoolName, mcbd));

  // Logical library, checked right after its creation.
  const std::string libraryName = "TestLogicalLibrary";
  const std::string libraryComment = "Library comment";
  ASSERT_NO_THROW(scheduler.createLogicalLibrary(s_adminOnAdminHost, libraryName,
    libraryComment));
  {
    std::list<LogicalLibrary> libraries;
    ASSERT_NO_THROW(libraries = scheduler.getLogicalLibraries(
      s_adminOnAdminHost));
    ASSERT_EQ(1u, libraries.size());

    LogicalLibrary logicalLibrary;
    ASSERT_NO_THROW(logicalLibrary = libraries.front());
    ASSERT_EQ(libraryName, logicalLibrary.name);
    ASSERT_EQ(libraryComment, logicalLibrary.creationLog.comment);
  }

  // One tape in the library, belonging to the tape pool.
  const std::string vid = "TestVid";
  const uint64_t capacityInBytes = 12345678;
  const std::string tapeComment = "Tape comment";
  ASSERT_NO_THROW(scheduler.createTape(s_adminOnAdminHost, vid, libraryName,
    tapePoolName, capacityInBytes, tapeComment));

  // Route copy number 1 of the storage class to the tape pool.
  const uint16_t copyNb = 1;
  const std::string archiveRouteComment = "Archive-route comment";
  ASSERT_NO_THROW(scheduler.createArchiveRoute(s_adminOnAdminHost, storageClassName,
    copyNb, tapePoolName, archiveRouteComment));

  // Queue the archive request for a single remote file.
  std::list<std::string> remoteFiles;
  remoteFiles.push_back(s_remoteFileRawPath1);
  const std::string archiveFile = "/grandparent/parent_file";
  ASSERT_NO_THROW(scheduler.queueArchiveRequest(s_userOnUserHost, remoteFiles, archiveFile));

  {
    // Emulate a tape server by asking for a mount and then a file
    std::unique_ptr<cta::TapeMount> mount;
    ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(libraryName, "drive0").release()));
    ASSERT_TRUE(nullptr != mount.get());
    ASSERT_EQ(cta::MountType::ARCHIVE, mount->getMountType());
    std::unique_ptr<cta::ArchiveMount> archiveMount;
    ASSERT_NO_THROW(archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())));
    ASSERT_TRUE(nullptr != archiveMount.get());

    // The job is expected to be handed out, and failed, this many times.
    // NOTE(review): this value has to agree with the retry limit configured
    // for the job outside of this test -- confirm if that limit changes.
    const int nbFailuresBeforeGivingUp = 5;
    for (int i = 0; i < nbFailuresBeforeGivingUp; ++i) {
      std::unique_ptr<cta::ArchiveJob> archiveJob;
      ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release()));
      ASSERT_TRUE(nullptr != archiveJob.get());
      ASSERT_NO_THROW(archiveJob->failed(cta::exception::Exception("Archive failed")));
    }

    // Then the request should be gone
    std::unique_ptr<cta::ArchiveJob> archiveJob;
    ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release()));
    ASSERT_TRUE(nullptr == archiveJob.get());
  }
}
TEST_P(SchedulerTest, retrieve_non_existing_file) {
using namespace cta;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment