From 920af0616be38129b3a0c401a3fb7e40d86667f7 Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@cern.ch>
Date: Wed, 29 Jan 2020 11:01:43 +0100
Subject: [PATCH] No retrieve mount is triggered when a tape is disabled. If
 Repack --disabledtape Retrieve jobs are in the queue, the Retrieve mount will
 be triggered and the user retrieve jobs will be popped as well as the Repack
 Retrieve jobs

---
 scheduler/OStoreDB/OStoreDB.cpp |  26 +++++-
 scheduler/Scheduler.cpp         |  29 ++++---
 scheduler/SchedulerTest.cpp     | 145 +++++++++++++++++++++++++++++++-
 3 files changed, 181 insertions(+), 19 deletions(-)

diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 2644501683..2284db028a 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -302,7 +302,24 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
     // If there are files queued, we create an entry for this retrieve queue in the
     // mount candidates list.
     auto rqSummary = rqueue.getJobsSummary();
-    if (rqSummary.jobs) {
+    bool isPotentialMount = false;
+    auto vidToTapeMap = m_catalogue.getTapesByVid({rqp.vid});
+    if(vidToTapeMap.at(rqp.vid).disabled){
+      //Check if there are Repack Retrieve requests with forceDisabledTape flag in the queue
+      for(auto &job: rqueue.dumpJobs()){
+        cta::objectstore::RetrieveRequest rr(job.address,this->m_objectStore);
+        rr.fetchNoLock();
+        if(rr.getRepackInfo().forceDisabledTape){
+          //At least one Retrieve job is a Repack Retrieve job with the tape disabled flag,
+          //we have a potential mount.
+          isPotentialMount = true;
+          break;
+        }
+      }
+    } else {
+      isPotentialMount = true;
+    }
+    if (rqSummary.jobs && isPotentialMount) {
       // Check if we have activities and if all the jobs are covered by one or not (possible mixed case).
       bool jobsWithoutActivity = true;
       if (rqSummary.activityCounts.size()) {
@@ -370,7 +387,8 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
         }
       }
     } else {
-      tmdi.queueTrimRequired = true;
+      if(!rqSummary.jobs)
+        tmdi.queueTrimRequired = true;
     }
     auto processingTime = t.secs(utils::Timer::resetCounter);
     log::ScopedParamContainer params (logContext);
@@ -3694,8 +3712,8 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
     // Compute the necessary space in each targeted disk system.
     std::map<std::string, uint64_t> spaceMap;
     for (auto &j: jobs.elements)
-        if (j.diskSystemName)
-          diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
+      if (j.diskSystemName)
+        diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
     // Get the existing reservation map from drives (including this drive's previous pending reservations).
     auto previousDrivesReservations = getExistingDrivesReservations();
     typedef std::pair<std::string, uint64_t> Res;
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index d2ce43d500..78a3739206 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -940,21 +940,22 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
   // The library information is not know for the tapes involved in retrieves. We 
   // need to query the catalogue now about all those tapes.
   // Build the list of tapes.
-  std::set<std::string> tapeSet;
+  std::set<std::string> retrieveTapeSet;
   for (auto &m:mountInfo->potentialMounts) {
-    if (m.type==common::dataStructures::MountType::Retrieve) tapeSet.insert(m.vid);
+    if (m.type==common::dataStructures::MountType::Retrieve) retrieveTapeSet.insert(m.vid);
   }
-  if (tapeSet.size()) {
-    auto tapesInfo=m_catalogue.getTapesByVid(tapeSet);
+  common::dataStructures::VidToTapeMap retrieveTapesInfo;
+  if (retrieveTapeSet.size()) {
+    retrieveTapesInfo=m_catalogue.getTapesByVid(retrieveTapeSet);
     getTapeInfoTime = timer.secs(utils::Timer::resetCounter);
     for (auto &m:mountInfo->potentialMounts) {
       if (m.type==common::dataStructures::MountType::Retrieve) {
-        m.logicalLibrary=tapesInfo[m.vid].logicalLibraryName;
-        m.tapePool=tapesInfo[m.vid].tapePoolName;
-        m.vendor = tapesInfo[m.vid].vendor;
-        m.mediaType = tapesInfo[m.vid].mediaType;
-        m.vo = tapesInfo[m.vid].vo;
-        m.capacityInBytes = tapesInfo[m.vid].capacityInBytes;
+        m.logicalLibrary=retrieveTapesInfo[m.vid].logicalLibraryName;
+        m.tapePool=retrieveTapesInfo[m.vid].tapePoolName;
+        m.vendor = retrieveTapesInfo[m.vid].vendor;
+        m.mediaType = retrieveTapesInfo[m.vid].mediaType;
+        m.vo = retrieveTapesInfo[m.vid].vo;
+        m.capacityInBytes = retrieveTapesInfo[m.vid].capacityInBytes;
       }
     }
   }
@@ -965,11 +966,11 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
   // We cannot filter the archives yet
   for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
     if (m->type == common::dataStructures::MountType::Retrieve && m->logicalLibrary != logicalLibraryName) {
-      m = mountInfo->potentialMounts.erase(m);
+        m = mountInfo->potentialMounts.erase(m);
     } else {
       m++;
+      }
     }
-  }
   
   // With the existing mount list, we can now populate the potential mount list
   // with the per tape pool existing mount statistics.
@@ -1039,9 +1040,9 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
       params.add("mountType", common::dataStructures::toString(m->type))
             .add("existingMounts", existingMounts)
             .add("bytesQueued", m->bytesQueued)
-            .add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
+            .add("minBytesToWarrantMount", minBytesToWarrantAMount)
             .add("filesQueued", m->filesQueued)
-            .add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
+            .add("minFilesToWarrantMount", minFilesToWarrantAMount)
             .add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
             .add("minArchiveRequestAge", m->minRequestAge)
             .add("existingMounts", existingMounts)
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index fbc92487fd..b799b74a03 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -2737,6 +2737,149 @@ TEST_P(SchedulerTest, expandRepackRequestDisabledTape) {
   }
 }
 
+TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) {
+  using namespace cta;
+  using namespace cta::objectstore;
+  unitTests::TempDirectory tempDirectory;
+  auto &catalogue = getCatalogue();
+  auto &scheduler = getScheduler();
+  auto &schedulerDB = getSchedulerDB();
+
+  cta::objectstore::Backend& backend = schedulerDB.getBackend();
+  setupDefaultCatalogue();
+#ifdef STDOUT_LOGGING
+  log::StdoutLogger dl("dummy", "unitTest");
+#else
+  log::DummyLogger dl("", "");
+#endif
+  log::LogContext lc(dl);
+  
+  //Create an agent to represent this test process
+  cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl);
+  cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend);
+  agent.initialize();
+  agent.setTimeout_us(0);
+  agent.insertAndRegisterSelf(lc);
+  
+  const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
+  const bool disabledValue = false;
+  const bool fullValue = true;
+  const bool readOnlyValue = false;
+  const std::string comment = "Create tape";
+  cta::common::dataStructures::SecurityIdentity admin;
+  admin.username = "admin_user_name";
+  admin.host = "admin_host";
+  
+  //Create a logical library in the catalogue
+  const bool logicalLibraryIsDisabled = false;
+  catalogue.createLogicalLibrary(admin, s_libraryName, logicalLibraryIsDisabled, "Create logical library");
+  
+  std::ostringstream ossVid;
+  ossVid << s_vid << "_" << 1;
+  std::string vid = ossVid.str();
+  catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes,
+    disabledValue, fullValue, readOnlyValue, comment);
+  
+  //Create a storage class in the catalogue
+  common::dataStructures::StorageClass storageClass;
+  storageClass.diskInstance = s_diskInstance;
+  storageClass.name = s_storageClassName;
+  storageClass.nbCopies = 2;
+  storageClass.comment = "Create storage class";
+
+  const std::string tapeDrive = "tape_drive";
+  const uint64_t nbArchiveFilesPerTape = 10;
+  const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
+  
+  //Simulate the writing of 10 files in 1 tape in the catalogue
+  std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
+  {
+    uint64_t archiveFileId = 1;
+    std::string currentVid = vid;
+    for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) {
+      std::ostringstream diskFileId;
+      diskFileId << (12345677 + archiveFileId);
+      std::ostringstream diskFilePath;
+      diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j;
+      auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
+      auto & fileWritten = *fileWrittenUP;
+      fileWritten.archiveFileId = archiveFileId++;
+      fileWritten.diskInstance = storageClass.diskInstance;
+      fileWritten.diskFileId = diskFileId.str();
+      fileWritten.diskFilePath = diskFilePath.str();
+      fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID;
+      fileWritten.diskFileGid = PUBLIC_GID;
+      fileWritten.size = archiveFileSize;
+      fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234");
+      fileWritten.storageClassName = s_storageClassName;
+      fileWritten.vid = currentVid;
+      fileWritten.fSeq = j;
+      fileWritten.blockId = j * 100;
+      fileWritten.size = archiveFileSize;
+      fileWritten.copyNb = 1;
+      fileWritten.tapeDrive = tapeDrive;
+      tapeFilesWrittenCopy1.emplace(fileWrittenUP.release());
+    }
+    //update the DB tape
+    catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
+    tapeFilesWrittenCopy1.clear();
+  }
+  //Test the queueing of the Retrieve Request and try to mount after having disabled the tape
+  scheduler.waitSchedulerDbSubthreadsComplete();
+  {
+    std::string diskInstance="disk_instance";
+    cta::common::dataStructures::RetrieveRequest rReq;
+    rReq.archiveFileID=1;
+    rReq.requester.name = s_userName;
+    rReq.requester.group = "someGroup";
+    rReq.dstURL = "dst_url";
+    scheduler.queueRetrieve(diskInstance, rReq, lc);
+    scheduler.waitSchedulerDbSubthreadsComplete();
+  }
+  //disabled the tape
+  catalogue.setTapeDisabled(admin,vid,true);
+  
+  //No mount should be returned by getNextMount
+  ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, "drive0", lc));
+  
+  //enable the tape
+  catalogue.setTapeDisabled(admin,vid, false);
+  
+  //A mount should be returned by getNextMount
+  ASSERT_NE(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc));
+  
+  //disable the tape
+  catalogue.setTapeDisabled(admin,vid, true);
+  ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName,"drive0",lc));
+  
+  //Queue a Repack Request with --disabledtape flag set to force Retrieve Mount for disabled tape
+  scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly, common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,true, lc);
+  scheduler.waitSchedulerDbSubthreadsComplete();
+
+  log::TimingList tl;
+  utils::Timer t;
+
+  scheduler.promoteRepackRequestsToToExpand(lc);
+  scheduler.waitSchedulerDbSubthreadsComplete();
+
+  auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand();
+
+  scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc);
+  scheduler.waitSchedulerDbSubthreadsComplete();
+
+  /*
+   * Test expected behaviour for NOW:
+   * The getNextMount should return a mount as the tape is disabled and there are repack --disabledtape retrieve jobs in it
+   * We will then get the Repack AND USER jobs from the getNextJobBatch
+   */
+  auto nextMount = scheduler.getNextMount(s_libraryName,"drive0",lc);
+  ASSERT_NE(nullptr,nextMount);
+  std::unique_ptr<cta::RetrieveMount> retrieveMount;
+  retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(nextMount.release()));
+  auto jobBatch = retrieveMount->getNextJobBatch(20,20*archiveFileSize,lc);
+  ASSERT_EQ(11,jobBatch.size()); //1 user job + 10 Repack jobs = 11 jobs in the batch
+}
+
 TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) {
   using namespace cta;
 
@@ -3967,7 +4110,7 @@ TEST_P(SchedulerTest, repackRetrieveRequestsFailToFetchDiskSystem){
   }
 }
 
-
+  
 #undef TEST_MOCK_DB
 #ifdef TEST_MOCK_DB
 static cta::MockSchedulerDatabaseFactory mockDbFactory;
-- 
GitLab