From a05ad490dde17b90705dfaae149738ecd23c2ff9 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Wed, 12 Jul 2017 16:02:32 +0200
Subject: [PATCH] Added support for honoring the force down bit on the drive.

If the drive is set to force down mode, we stop finding new jobs and let the current session drain.
---
 scheduler/OStoreDB/OStoreDB.cpp               | 26 ++++++++++++++++++-
 scheduler/OStoreDB/OStoreDB.hpp               |  2 +-
 scheduler/RetrieveMount.cpp                   |  4 +--
 scheduler/RetrieveMount.hpp                   |  2 +-
 scheduler/SchedulerDatabase.hpp               |  2 +-
 scheduler/SchedulerTest.cpp                   |  4 +--
 scheduler/testingMocks/MockRetrieveMount.hpp  |  2 +-
 .../tapeserver/daemon/DiskWriteTaskTest.cpp   |  8 +++---
 .../daemon/DiskWriteThreadPoolTest.cpp        |  8 +++---
 .../tapeserver/daemon/RecallTaskInjector.cpp  |  4 +--
 .../daemon/RecallTaskInjectorTest.cpp         |  8 +++---
 11 files changed, 47 insertions(+), 23 deletions(-)

diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 2b9ecb52ec..7ed3f01920 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -1553,6 +1553,18 @@ auto OStoreDB::ArchiveMount::getNextJob(log::LogContext &logContext) -> std::uni
   objectstore::RootEntry re(m_objectStore);
   objectstore::ScopedSharedLock rel(re);
   re.fetch();
+  // First, check we should not forcibly go down. In that case, we simply find nothing to do.
+  // Get drive register
+  {
+    objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
+    ScopedSharedLock drl(dr);
+    dr.fetch();
+    auto drs = dr.getDriveState(mountInfo.drive);
+    if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) {
+      logContext.log(log::INFO, "In OStoreDB::ArchiveMount::getNextJob(): returning no job as we are forcibly going down.");
+      return nullptr;
+    }
+  }
   auto aql = re.dumpArchiveQueues();
   rel.release();
   std::string aqAddress;
@@ -2040,12 +2052,24 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
 //------------------------------------------------------------------------------
 // OStoreDB::RetrieveMount::getNextJob()
 //------------------------------------------------------------------------------
-auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
+auto OStoreDB::RetrieveMount::getNextJob(log::LogContext & logContext) -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
   // Find the next file to retrieve
   // Get the tape pool and then tape
   objectstore::RootEntry re(m_objectStore);
   objectstore::ScopedSharedLock rel(re);
   re.fetch();
+  // First, check we should not forcibly go down. In that case, we simply find nothing to do.
+  // Get drive register
+  {
+    objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
+    ScopedSharedLock drl(dr);
+    dr.fetch();
+    auto drs = dr.getDriveState(mountInfo.drive);
+    if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) {
+      logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJob(): returning no job as we are forcibly going down.");
+      return nullptr;
+    }
+  }
   auto rql = re.dumpRetrieveQueues();
   rel.release();
   std::string rqAddress;
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index ff8cd57003..102b34e5e1 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -156,7 +156,7 @@ public:
     objectstore::AgentReference & m_agentReference;
   public:
     const MountInfo & getMountInfo() override;
-    std::unique_ptr<RetrieveJob> getNextJob() override;
+    std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) override;
     void complete(time_t completionTime) override;
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
   };
diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp
index 2bf3f999d9..709b688b0b 100644
--- a/scheduler/RetrieveMount.cpp
+++ b/scheduler/RetrieveMount.cpp
@@ -68,11 +68,11 @@ std::string cta::RetrieveMount::getMountTransactionId() const{
 //------------------------------------------------------------------------------
 // getNextJob
 //------------------------------------------------------------------------------
-std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
+std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob(log::LogContext & logContext) {
   if (!m_sessionRunning)
     throw SessionNotRunning("In RetrieveMount::getNextJob(): trying to get job from complete/not started session");
   // Try and get a new job from the DB
-  std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob().release());
+  std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob(logContext).release());
   if (!dbJob.get())
     return std::unique_ptr<cta::RetrieveJob>();
   // We have something to retrieve: prepare the response
diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp
index b5273a92dd..9cf7481ced 100644
--- a/scheduler/RetrieveMount.hpp
+++ b/scheduler/RetrieveMount.hpp
@@ -118,7 +118,7 @@ namespace cta {
      * @return A unique_ptr to the next archive job or NULL if there are no more
      * archive jobs left for this tape mount.
      */
-    virtual std::unique_ptr<RetrieveJob> getNextJob();
+    virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext);
     
     /**
      * Destructor.
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index f01d0dd62f..c78c82929a 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -328,7 +328,7 @@ public:
       uint64_t mountId;
     } mountInfo;
     virtual const MountInfo & getMountInfo() = 0;
-    virtual std::unique_ptr<RetrieveJob> getNextJob() = 0;
+    virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) = 0;
     virtual void complete(time_t completionTime) = 0;
     virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
     virtual ~RetrieveMount() {}
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index c68b674d81..295849ca56 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -507,10 +507,10 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
     ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get());
     std::unique_ptr<cta::RetrieveJob> retrieveJob;
-    retrieveJob.reset(retrieveMount->getNextJob().release());
+    retrieveJob.reset(retrieveMount->getNextJob(lc).release());
     ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
     retrieveJob->complete();
-    retrieveJob.reset(retrieveMount->getNextJob().release());
+    retrieveJob.reset(retrieveMount->getNextJob(lc).release());
     ASSERT_EQ((cta::RetrieveJob*)NULL, retrieveJob.get());
   }
 }
diff --git a/scheduler/testingMocks/MockRetrieveMount.hpp b/scheduler/testingMocks/MockRetrieveMount.hpp
index 9944cf3a06..edd2295633 100644
--- a/scheduler/testingMocks/MockRetrieveMount.hpp
+++ b/scheduler/testingMocks/MockRetrieveMount.hpp
@@ -33,7 +33,7 @@ namespace cta {
     ~MockRetrieveMount() throw() {
     }
 
-    std::unique_ptr<cta::RetrieveJob> getNextJob() override {
+    std::unique_ptr<cta::RetrieveJob> getNextJob(log::LogContext & logContext) override {
       getJobs++;
       if(m_jobs.empty()) {
         return std::unique_ptr<cta::RetrieveJob>();
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
index bddacf9a6c..fc5dbfe17a 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
@@ -40,10 +40,10 @@
 
 namespace unitTests{
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
-    virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
-    virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
-    virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
-    virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
+    const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
+    std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext &) override { throw std::runtime_error("Not implemented");}
+    void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
+    void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
   };
   
   class TestingRetrieveMount: public cta::RetrieveMount {
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
index de996d6ef7..5a8fdbe95e 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
@@ -35,10 +35,10 @@
 namespace unitTests{
   
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
-    virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
-    virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
-    virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
-    virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
+    const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
+    std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
+    void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
+    void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
   };
   
   class TestingRetrieveMount: public cta::RetrieveMount {
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
index e80b66eb71..8201530251 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp
@@ -208,7 +208,7 @@ bool RecallTaskInjector::synchronousFetch()
     uint64_t files=0;
     uint64_t bytes=0;
     while(files<=m_maxFiles && bytes<=m_maxBytes) {
-      std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob();
+      std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob(m_lc);
       if(!job.get()) break;
       files++;
       bytes+=job->archiveFile.fileSize;
@@ -292,7 +292,7 @@ void RecallTaskInjector::WorkerThread::run()
       uint64_t files=0;
       uint64_t bytes=0;
       while(files<=req.filesRequested && bytes<=req.bytesRequested) {
-        std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob();
+        std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob(m_parent.m_lc);
         if(!job.get()) break;
         files++;
         bytes+=job->archiveFile.fileSize;
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
index 8031efd89a..67d7727e5a 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
@@ -130,10 +130,10 @@ namespace unitTests
   };
   
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
-    virtual const MountInfo & getMountInfo() { throw std::runtime_error("Not implemented"); }
-    virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() { throw std::runtime_error("Not implemented");}
-    virtual void complete(time_t completionTime) { throw std::runtime_error("Not implemented"); }
-    virtual void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) { throw std::runtime_error("Not implemented"); }
+    const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
+    std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
+    void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
+    void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
   };
   
   TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) {
-- 
GitLab