From 7d6fdae9164fa7e2d3def2074ba7decf36e3d2c3 Mon Sep 17 00:00:00 2001
From: mvelosob <miguel.veloso.barros@cern.ch>
Date: Wed, 6 Oct 2021 17:24:07 +0200
Subject: [PATCH] Fix holding of scheduler global lock while looping queues

Remove the looping through the retrieve queue jobs while holding the global lock in OStoreDB::fetchMountInfo.
This was done when a tape was disabled, since we mount disabled tapes if they have pending retrieve requests.
Instead we now mount the tape if the queue object info as in the mount policy map a mount policy whose name
starts with "repack".

With this change, cta-admin now forces repacks added with the --disabledtape to have a mount policy
prefixed with repack (without the --disabledtape flag, the mount policy can have any name)

Also Removed the criteria that a disabled/broken tape is mounted if the queue is filled with deleted
retrieved jobs. Eventually the maxrequestage for the queue will expire, the tape will be mounted
and the queue flushed
---
 ReleaseNotes.md                               |  3 ++
 .../orchestration/tests/prepare_tests.sh      |  9 ++++
 .../tests/repack_systemtest_wrapper.sh        | 20 ++++----
 objectstore/RetrieveQueue.cpp                 | 14 ++++++
 objectstore/RetrieveQueue.hpp                 |  3 ++
 scheduler/OStoreDB/OStoreDB.cpp               | 46 ++++++++-----------
 scheduler/SchedulerTest.cpp                   | 38 +++++++++++++--
 xroot_plugins/XrdSsiCtaRequestMessage.cpp     |  7 +++
 8 files changed, 98 insertions(+), 42 deletions(-)

diff --git a/ReleaseNotes.md b/ReleaseNotes.md
index 95e9c5dd28..369ae7f943 100644
--- a/ReleaseNotes.md
+++ b/ReleaseNotes.md
@@ -7,6 +7,9 @@
 ### Bug fixes
 - cta/CTA#1013 reportType uninitialized
 - cta/CTA#1035 log configuration options on frontend startup
+- cta/CTA#1042 Do not iterate over retrieve queues when holding global scheduler lock
+  - Repacks on a disabled tape must now use a mount policy whose name starts with repack
+  - There is no longer an empty mount when a disabled/broken tape queue is full of deleted requests
 
 # v4.3-1
 
diff --git a/continuousintegration/orchestration/tests/prepare_tests.sh b/continuousintegration/orchestration/tests/prepare_tests.sh
index 369b30f379..680967a29a 100755
--- a/continuousintegration/orchestration/tests/prepare_tests.sh
+++ b/continuousintegration/orchestration/tests/prepare_tests.sh
@@ -215,6 +215,15 @@ kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin --json version | jq
      --instance ${EOSINSTANCE}                                        \
      --name powerusers                                                  \
      --mountpolicy ctasystest --comment "ctasystest"
+###
+# This mount policy is for repack: IT MUST CONTAIN THE `repack` STRING IN IT TO ALLOW MOUNTING DISABLED TAPES
+kubectl --namespace ${NAMESPACE} exec ctacli -- cta-admin mountpolicy add    \
+    --name repack_ctasystest                                                 \
+    --archivepriority 2                                               \
+    --minarchiverequestage 1                                          \
+    --retrievepriority 2                                              \
+    --minretrieverequestage 1                                         \
+    --comment "repack mountpolicy for ctasystest"
 
 echo "Labeling tapes:"
   # add all tapes
diff --git a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
index 52c4264354..0db15cc727 100755
--- a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
+++ b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
@@ -81,7 +81,7 @@ roundTripRepack() {
   then
   echo
     echo "Launching the repack \"just move\" test on VID ${VID_TO_REPACK} (with backpressure)"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step1-RoundTripRepack -p -n ctasystest || exit 1
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step1-RoundTripRepack -p -n repack_ctasystest || exit 1
   else
     echo "No vid found to repack"
     exit 1
@@ -95,7 +95,7 @@ roundTripRepack() {
   then
   echo
     echo "Launching the repack \"just move\" test on VID ${VID_TO_REPACK}"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RoundTripRepack -n ctasystest  || exit 1
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RoundTripRepack -n repack_ctasystest  || exit 1
   else
     echo "No vid found to repack"
     exit 1
@@ -122,7 +122,7 @@ repackDisableTape() {
     echo "Marking the tape ${VID_TO_REPACK} as DISABLED"
     kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape ch --state DISABLED --reason "Repack disabled tape test" --vid ${VID_TO_REPACK}
     echo "Launching the repack request test on VID ${VID_TO_REPACK} without the disabled flag"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackDisabledTape -n ctasystest && echo "The repack command should have failed as the tape is disabled" && exit 1 || echo "The repack submission has failed, test OK"
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackDisabledTape -n repack_ctasystest && echo "The repack command should have failed as the tape is disabled" && exit 1 || echo "The repack submission has failed, test OK"
   else
     echo "No vid found to repack"
     exit 1
@@ -130,7 +130,7 @@ repackDisableTape() {
 
   echo
   echo "Launching the repack request test on VID ${VID_TO_REPACK} with the --disabledtape flag"
-  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -d -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackDisabledTape -n ctasystest  || exit 1
+  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -d -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackDisabledTape -n repack_ctasystest  || exit 1
 
   echo "Reclaiming tape ${VID_TO_REPACK}"
   kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REPACK}
@@ -155,7 +155,7 @@ repackJustMove() {
   then
   echo
     echo "Launching the repack test \"just move\" on VID ${VID_TO_REPACK}"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackJustMove -n ctasystest  || exit 1
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackJustMove -n repack_ctasystest  || exit 1
   else
     echo "No vid found to repack"
     exit 1
@@ -179,7 +179,7 @@ repackJustAddCopies() {
   if [ "$VID_TO_REPACK" != "null" ]
   then
     echo "Launching the repack \"just add copies\" test on VID ${VID_TO_REPACK} with all copies already on CTA"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -a -r ${BASE_REPORT_DIRECTORY}/Step$1-JustAddCopiesAllCopiesInCTA -n ctasystest
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -a -r ${BASE_REPORT_DIRECTORY}/Step$1-JustAddCopiesAllCopiesInCTA -n repack_ctasystest
   else
     echo "No vid found to repack"
     exit 1
@@ -219,7 +219,7 @@ repackCancellation() {
   then
   echo
     echo "Launching a repack request on VID ${VID_TO_REPACK}"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackCancellation -n ctasystest & 2>/dev/null
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackCancellation -n repack_ctasystest & 2>/dev/null
     pid=$!
   else
     echo "No vid found to repack"
@@ -353,7 +353,7 @@ repackMoveAndAddCopies() {
   VID_TO_REPACK=$(getFirstVidContainingFiles)
 
   echo "Launching the repack \"Move and add copies\" test on VID ${VID_TO_REPACK}"
-  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -t 600 -r ${BASE_REPORT_DIRECTORY}/Step$1-MoveAndAddCopies -n ctasystest  || exit 1
+  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -t 600 -r ${BASE_REPORT_DIRECTORY}/Step$1-MoveAndAddCopies -n repack_ctasystest  || exit 1
 
   repackLsResult=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json repack ls --vid ${VID_TO_REPACK} | jq ". [0]"`
   totalFilesToRetrieve=`echo $repackLsResult | jq -r ".totalFilesToRetrieve"`
@@ -436,7 +436,7 @@ repackTapeRepair() {
     done
 
     echo "Launching a repack request on the vid ${VID_TO_REPACK}"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackTapeRepair -n ctasystest  ||      exit 1
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackTapeRepair -n repack_ctasystest  ||      exit 1
 
     repackLsResult=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json repack ls --vid ${VID_TO_REPACK} | jq -r ". [0]"`
     userProvidedFiles=`echo $repackLsResult | jq -r ".userProvidedFiles"`
@@ -539,7 +539,7 @@ repackTapeRepairNoRecall() {
     done
 
     echo "Launching a repack request on the vid ${VID_TO_REPACK}"
-    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackTapeRepairNoRecall -n ctasystest -u ||      exit 1
+    kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -m -r ${BASE_REPORT_DIRECTORY}/Step$1-RepackTapeRepairNoRecall -n repack_ctasystest -u ||      exit 1
 
     repackLsResult=`kubectl -n ${NAMESPACE} exec ctacli -- cta-admin --json repack ls --vid ${VID_TO_REPACK} | jq -r ". [0]"`
     userProvidedFiles=`echo $repackLsResult | jq -r ".userProvidedFiles"`
diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp
index ade91a2928..a1267f06d7 100644
--- a/objectstore/RetrieveQueue.cpp
+++ b/objectstore/RetrieveQueue.cpp
@@ -670,6 +670,20 @@ auto RetrieveQueue::getCandidateSummary() -> CandidateJobList {
   return ret;
 }
 
+auto RetrieveQueue::getMountPolicyNames() -> std::list<std::string> {
+  ValueCountMapString mountPolicyNameMap(m_payload.mutable_mountpolicynamemap());
+  auto mountPolicyCountMap = mountPolicyNameMap.getMap(); 
+
+  std::list<std::string> mountPolicyNames;
+
+  for(const auto &mountPolicyCount: mountPolicyCountMap) {
+    if (mountPolicyCount.second != 0) {
+      mountPolicyNames.push_back(mountPolicyCount.first);
+    }
+  }
+  return mountPolicyNames;
+}
+
 void RetrieveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemove) {
   checkPayloadWritable();
   ValueCountMapUint64 priorityMap(m_payload.mutable_prioritymap());
diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp
index 9ed21489c0..3dfd41c45c 100644
--- a/objectstore/RetrieveQueue.hpp
+++ b/objectstore/RetrieveQueue.hpp
@@ -126,6 +126,9 @@ public:
 
   //! Return a summary of the number of jobs and number of bytes in the queue
   CandidateJobList getCandidateSummary();
+
+  //! Return the mount policy names for the queue object
+  std::list<std::string> getMountPolicyNames();
   
   void removeJobsAndCommit(const std::list<std::string> & jobsToRemove);
   // -- Generic parameters
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 956316820b..617d0b4167 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -339,38 +339,28 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
     auto vidToTapeMap = m_catalogue.getTapesByVid({rqp.vid});
     common::dataStructures::Tape::State tapeState = vidToTapeMap.at(rqp.vid).state;
     bool tapeIsDisabled = tapeState == common::dataStructures::Tape::DISABLED;
-    bool tapeIsBroken = tapeState == common::dataStructures::Tape::BROKEN;
-    if(tapeIsDisabled || tapeIsBroken){
+    bool tapeIsActive = tapeState == common::dataStructures::Tape::ACTIVE;
+    if (tapeIsActive) {
+      isPotentialMount = true;
+    }
+    else if(tapeIsDisabled){
       //In the case there are Repack Retrieve Requests with the force disabled flag set
-      //on it, we will trigger a mount.
+      //on a disabled tape, we will trigger a mount.
+      //Mount policies that begin with repack are used for repack requests with force disabled flag 
+      //set to true. We only look for those to avoid looping through the retrieve queue 
+      //while holding the global scheduler lock.
       //In the case there are only deleted Retrieve Request on a DISABLED or BROKEN tape
-      //we want to trigger a mount to flush the queue.
-      auto retrieveQueueJobs = rqueue.dumpJobs();
-      uint64_t nbJobsNotExistInQueue = 0;
-      for(auto &job: retrieveQueueJobs){
-        cta::objectstore::RetrieveRequest rr(job.address,this->m_objectStore);
-        try{
-          rr.fetchNoLock();
-          if(tapeIsDisabled && rr.getRepackInfo().forceDisabledTape){
-            //At least one Retrieve job is a Repack Retrieve job with the tape disabled flag,
-            //we have a potential mount.
-            isPotentialMount = true;
-            break;
-          }
-        } catch(const cta::objectstore::Backend::NoSuchObject & ex){
-          //In the case of a repack cancellation, the RetrieveRequest object is deleted, so we just ignore the exception
-          //it will not be a potential mount.
-          nbJobsNotExistInQueue++;
-        }
-      }
-      if(!isPotentialMount && nbJobsNotExistInQueue == retrieveQueueJobs.size()){
-        //The tape is disabled or broken, there are only jobs that have been deleted, it is a potential mount as we want to flush the queue.
+      //we will no longer trigger a mount. Eventually the oldestRequestAge will pass the configured
+      //threshold and the queue will be flushed
+
+      auto queueMountPolicyNames = rqueue.getMountPolicyNames();
+      auto mountPolicyItor = std::find_if(queueMountPolicyNames.begin(),queueMountPolicyNames.end(), [](const std::string &mountPolicyName){
+        return mountPolicyName.rfind("repack", 0) == 0; 
+      });
+      
+      if(mountPolicyItor != queueMountPolicyNames.end()){
         isPotentialMount = true;
       }
-    } else {
-      //A BROKEN tape cannot be a potential mount, only ACTIVE tape
-      if(tapeState == common::dataStructures::Tape::ACTIVE)
-        isPotentialMount = true;
     }
     if (rqSummary.jobs && (isPotentialMount || purpose == SchedulerDatabase::PurposeGetMountInfo::SHOW_QUEUES)) {
       //Getting the default mountPolicies parameters from the queue summary
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index cb3f1d6e73..eb59f12a53 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -304,6 +304,7 @@ protected:
   const std::string s_mediaType = "TestMediaType";
   const std::string s_vendor = "TestVendor";
   const std::string s_mountPolicyName = "mount_group";
+  const std::string s_repackMountPolicyName = "repack_mount_group";
   const bool s_defaultRepackDisabledTapeFlag = false;
   const bool s_defaultRepackNoRecall = false;
   const uint64_t s_minFilesToWarrantAMount = 5;
@@ -3311,9 +3312,35 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) {
   catalogue.setTapeDisabled(admin,vid,disabledReason);
   ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc));
 
-  //Queue a Repack Request with --disabledtape flag set to force Retrieve Mount for disabled tape
+  //create repack mount policy
+  const std::string mountPolicyName = s_repackMountPolicyName;
+  const uint64_t archivePriority = s_archivePriority;
+  const uint64_t minArchiveRequestAge = s_minArchiveRequestAge;
+  const uint64_t retrievePriority = s_retrievePriority;
+  const uint64_t minRetrieveRequestAge = s_minRetrieveRequestAge;
+  const std::string mountPolicyComment = "create mount group";
+
+  catalogue::CreateMountPolicyAttributes mountPolicy;
+  mountPolicy.name = mountPolicyName;
+  mountPolicy.archivePriority = archivePriority;
+  mountPolicy.minArchiveRequestAge = minArchiveRequestAge;
+  mountPolicy.retrievePriority = retrievePriority;
+  mountPolicy.minRetrieveRequestAge = minRetrieveRequestAge;
+  mountPolicy.comment = mountPolicyComment;
+
+  catalogue.createMountPolicy(s_adminOnAdminHost, mountPolicy);
+
+  auto mountPolicies = catalogue.getMountPolicies();
+
+  auto mountPolicyItor = std::find_if(mountPolicies.begin(),mountPolicies.end(), [](const common::dataStructures::MountPolicy &mountPolicy){
+        return mountPolicy.name.rfind("repack", 0) == 0; 
+  });
+
+  ASSERT_NE(mountPolicyItor, mountPolicies.end());
+
+  //Queue a Repack Request with --disabledtape flag set to force Retrieve Mount for disabled tape with repack prefix in the mount policy
   cta::SchedulerDatabase::QueueRepackRequest qrr(vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,
-    common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,true,s_defaultRepackNoRecall);
+    *mountPolicyItor,true,s_defaultRepackNoRecall);
   scheduler.queueRepack(admin,qrr, lc);
   scheduler.waitSchedulerDbSubthreadsComplete();
 
@@ -3330,7 +3357,8 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) {
 
   /*
    * Test expected behaviour for NOW:
-   * The getNextMount should return a mount as the tape is disabled and there are repack --disabledtape retrieve jobs in it
+   * The getNextMount should return a mount as the tape is disabled, there are repack --disabledtape retrieve jobs in it
+   * and the mount policy name begins with repack
    * We will then get the Repack AND USER jobs from the getNextJobBatch
    */
   auto nextMount = scheduler.getNextMount(s_libraryName,"drive0",lc);
@@ -3340,8 +3368,9 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) {
   auto jobBatch = retrieveMount->getNextJobBatch(20,20*archiveFileSize,lc);
   ASSERT_EQ(11,jobBatch.size()); //1 user job + 10 Repack jobs = 11 jobs in the batch
 }
-
+/*
 TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) {
+
   using namespace cta;
   using namespace cta::objectstore;
   unitTests::TempDirectory tempDirectory;
@@ -3469,6 +3498,7 @@ TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) {
   ASSERT_NE(nullptr,retrieveMount);
 
 }
+*/
 
 TEST_P(SchedulerTest, DISABLED_archiveReportMultipleAndQueueRetrievesWithActivities) {
   using namespace cta;
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index d98dba35f6..6f872a4b95 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -1527,6 +1527,13 @@ void RequestMessage::processRepack_Add(cta::xrd::Response &response)
 
   bool forceDisabledTape = has_flag(OptionBoolean::DISABLED);
 
+  if (forceDisabledTape) {
+   //repacks on a disabled tape must be from a mount policy whose name starts with repack
+   if (mountPolicy.name.rfind("repack", 0) != 0) { 
+      throw cta::exception::UserError("--disabledtape requires a mount policy whose name starts with repack");
+   }
+  }
+
   bool noRecall = has_flag(OptionBoolean::NO_RECALL);
 
   // Process each item in the list
-- 
GitLab