Commit 0fb7666f authored by Eric Cano's avatar Eric Cano
Browse files

Added support for honoring the force down bit on the drive.

If the drive is set to force down mode, we stop finding new jobs and let the current session drain.
parent 88d625e9
......@@ -1553,6 +1553,18 @@ auto OStoreDB::ArchiveMount::getNextJob(log::LogContext &logContext) -> std::uni
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
// First, check we should not forcibly go down. In such an occasion, we just find noting to do.
// Get drive register
{
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
ScopedSharedLock drl(dr);
dr.fetch();
auto drs = dr.getDriveState(mountInfo.drive);
if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) {
logContext.log(log::INFO, "In OStoreDB::ArchiveMount::getNextJob(): returning no job as we are forcibly going down.");
return nullptr;
}
}
auto aql = re.dumpArchiveQueues();
rel.release();
std::string aqAddress;
......@@ -2040,12 +2052,24 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getNextJob()
//------------------------------------------------------------------------------
auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
auto OStoreDB::RetrieveMount::getNextJob(log::LogContext & logContext) -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
// Find the next file to retrieve
// Get the tape pool and then tape
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
// First, check we should not forcibly go down. In such an occasion, we just find noting to do.
// Get drive register
{
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
ScopedSharedLock drl(dr);
dr.fetch();
auto drs = dr.getDriveState(mountInfo.drive);
if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) {
logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJob(): returning no job as we are forcibly going down.");
return nullptr;
}
}
auto rql = re.dumpRetrieveQueues();
rel.release();
std::string rqAddress;
......
......@@ -156,7 +156,7 @@ public:
objectstore::AgentReference & m_agentReference;
public:
const MountInfo & getMountInfo() override;
std::unique_ptr<RetrieveJob> getNextJob() override;
std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
};
......
......@@ -68,11 +68,11 @@ std::string cta::RetrieveMount::getMountTransactionId() const{
//------------------------------------------------------------------------------
// getNextJob
//------------------------------------------------------------------------------
std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob(log::LogContext & logContext) {
if (!m_sessionRunning)
throw SessionNotRunning("In RetrieveMount::getNextJob(): trying to get job from complete/not started session");
// Try and get a new job from the DB
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob().release());
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob(logContext).release());
if (!dbJob.get())
return std::unique_ptr<cta::RetrieveJob>();
// We have something to retrieve: prepare the response
......
......@@ -118,7 +118,7 @@ namespace cta {
* @return A unique_ptr to the next archive job or NULL if there are no more
* archive jobs left for this tape mount.
*/
virtual std::unique_ptr<RetrieveJob> getNextJob();
virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext);
/**
* Destructor.
......
......@@ -328,7 +328,7 @@ public:
uint64_t mountId;
} mountInfo;
virtual const MountInfo & getMountInfo() = 0;
virtual std::unique_ptr<RetrieveJob> getNextJob() = 0;
virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual ~RetrieveMount() {}
......
......@@ -507,10 +507,10 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get());
std::unique_ptr<cta::RetrieveJob> retrieveJob;
retrieveJob.reset(retrieveMount->getNextJob().release());
retrieveJob.reset(retrieveMount->getNextJob(lc).release());
ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
retrieveJob->complete();
retrieveJob.reset(retrieveMount->getNextJob().release());
retrieveJob.reset(retrieveMount->getNextJob(lc).release());
ASSERT_EQ((cta::RetrieveJob*)NULL, retrieveJob.get());
}
}
......
......@@ -33,7 +33,7 @@ namespace cta {
~MockRetrieveMount() throw() {
}
std::unique_ptr<cta::RetrieveJob> getNextJob() override {
std::unique_ptr<cta::RetrieveJob> getNextJob(log::LogContext & logContext) override {
getJobs++;
if(m_jobs.empty()) {
return std::unique_ptr<cta::RetrieveJob>();
......
......@@ -40,10 +40,10 @@
namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext &) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -35,10 +35,10 @@
namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -208,7 +208,7 @@ bool RecallTaskInjector::synchronousFetch()
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob();
std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob(m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
......@@ -292,7 +292,7 @@ void RecallTaskInjector::WorkerThread::run()
uint64_t files=0;
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob();
std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob(m_parent.m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
......
......@@ -130,10 +130,10 @@ namespace unitTests
};
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
};
TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment