From 52c4e6f34b86494d584402efc5badb5019cf0538 Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@cern.ch>
Date: Wed, 28 Aug 2019 17:46:36 +0200
Subject: [PATCH] Added a check for the repack "just add copies" workflow that
 verifies that two archive routes do not point to the same tapepool

Remove the repack buffer if the repack expansion indicates that no Retrieve is necessary
Corrected the XRootdDirectory::exist() method
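
The duplicate-tapepool guard is applied both in ArchiveRequest::commit() and when
building the repack archive route map in
OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(). The snippet below is a
minimal standalone sketch of that logic, assuming a plain std::map as a
stand-in for the actual copyNb -> tapepool route map (names are illustrative,
not the real CTA objectstore types):

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <stdexcept>
    #include <string>

    // Hypothetical stand-in for the copyNb -> destination tapepool map.
    using ArchiveRouteMap = std::map<uint64_t, std::string>;

    // Throws if two different copyNbs resolve to the same destination tapepool,
    // mirroring the check introduced by this patch.
    void assertNoDuplicateTapepool(const ArchiveRouteMap & routes) {
      for (const auto & current : routes) {
        const auto nbRoutesToSameTapepool = std::count_if(routes.begin(), routes.end(),
            [&current](const auto & route) { return route.second == current.second; });
        if (nbRoutesToSameTapepool != 1) {
          throw std::runtime_error("Found the same destination tapepool for multiple copyNbs.");
        }
      }
    }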
---
 disk/DiskFile.cpp                  |  6 +--
 objectstore/ArchiveRequest.cpp     | 14 +++++++
 objectstore/ArchiveRequest.hpp     |  1 +
 objectstore/RetrieveRequest.cpp    |  8 ++++
 scheduler/OStoreDB/OStoreDB.cpp    | 19 +++++++--
 scheduler/RepackRequestManager.cpp |  5 +--
 scheduler/Scheduler.cpp            | 64 +++++++++++++++---------------
 scheduler/Scheduler.hpp            |  3 +-
 scheduler/SchedulerDatabase.hpp    |  8 +++-
 scheduler/SchedulerTest.cpp        |  7 ++--
 10 files changed, 84 insertions(+), 51 deletions(-)
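
The SchedulerDatabase.hpp change extends the test-ordering comparator so that,
for equal vids, mounts are further ordered by tapePool, keeping the expected
mount order in SchedulerTest deterministic. Below is a minimal sketch of that
strict weak ordering, using a hypothetical standalone struct rather than the
real mount-criteria type:

    #include <string>

    struct MountCandidate {
      std::string vid;
      std::string tapePool;

      // Mirrors the comparator in the patch: an alphabetically smaller
      // vid/tapePool compares as "greater", so it is treated first.
      bool operator<(const MountCandidate & other) const {
        if (vid < other.vid) return false;
        if (vid > other.vid) return true;
        if (tapePool < other.tapePool) return false; // new tie-break on tapePool
        if (tapePool > other.tapePool) return true;
        return false; // equal vid and tapePool: equivalent candidates
      }
    };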

diff --git a/disk/DiskFile.cpp b/disk/DiskFile.cpp
index 18667be230..32a31aa32c 100644
--- a/disk/DiskFile.cpp
+++ b/disk/DiskFile.cpp
@@ -774,11 +774,7 @@ bool XRootdDirectory::exist() {
   if(statStatus.errNo == XErrorCode::kXR_NotFound){
     return false;
   }
-  cta::exception::XrootCl::throwOnError(statStatus,"In XrootdDirectory::exist(): fail to determine if directory exists.");
-  if(statInfo->GetSize() !=  0){
-    return true;
-  }
-  return false;
+  return true;
 }
 
 std::set<std::string> XRootdDirectory::getFilesName(){
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index 7e52328a14..3d42dd084c 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -65,6 +65,20 @@ void ArchiveRequest::initialize() {
   m_payloadInterpreted = true;
 }
 
+void ArchiveRequest::commit(){
+  checkPayloadWritable();
+  checkPayloadReadable();
+  for(auto & job: m_payload.jobs()){
+    int nbTapepool = std::count_if(m_payload.jobs().begin(),m_payload.jobs().end(),[&job](const cta::objectstore::serializers::ArchiveJob & archiveJob){
+      return archiveJob.tapepool() == job.tapepool();
+    });
+    if(nbTapepool != 1){
+      throw cta::exception::Exception("In ArchiveRequest::commit(), cannot insert an ArchiveRequest containing archive jobs with the same destination tapepool");
+    }
+  }
+  ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>::commit();
+}
+
 //------------------------------------------------------------------------------
 // ArchiveRequest::addJob()
 //------------------------------------------------------------------------------
diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp
index 6936f967a1..8d925e8150 100644
--- a/objectstore/ArchiveRequest.hpp
+++ b/objectstore/ArchiveRequest.hpp
@@ -44,6 +44,7 @@ public:
   ArchiveRequest(Backend & os);
   ArchiveRequest(GenericObject & go);
   void initialize();
+  void commit();
   // Ownership of archive requests is managed per job. Object level owner has no meaning.
   std::string getOwner() = delete;
   void setOwner(const std::string &) = delete;
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index ee799a1592..45101ef3b5 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -489,6 +489,14 @@ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStruct
   ArchiveFileSerDeser(criteria.archiveFile).serialize(*m_payload.mutable_archivefile());
   for (auto &tf: criteria.archiveFile.tapeFiles) {
     MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy());
+    /*
+     * Explanation of these hardcoded retry limits:
+     * The hardcoded RetriesWithinMount ensures that we will try to retrieve the file
+     * at most 3 times within the same mount.
+     * The hardcoded TotalRetries ensures that we will never try more than 6 times to retrieve a file.
+     * With totalRetries = 6 and retriesWithinMount = 3, the file will be retried over at most 2 mounts
+     * (2 mounts * 3 retriesWithinMount = 6 totalRetries).
+     */
     const uint32_t hardcodedRetriesWithinMount = 3;
     const uint32_t hardcodedTotalRetries = 6;
     const uint32_t hardcodedReportRetries = 2;
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index f074a177c4..419984bb62 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2237,6 +2237,15 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
         for (auto & ar: archiveRoutesMap.at(std::make_tuple(rsr.archiveFile.diskInstance, rsr.archiveFile.storageClass))) {
           rRRepackInfo.archiveRouteMap[ar.second.copyNb] = ar.second.tapePoolName;
         }
+        //Check that we do not have the same destination tapepool for two different copyNbs
+        for(auto & currentCopyNbTapePool: rRRepackInfo.archiveRouteMap){
+          int nbTapepool = std::count_if(rRRepackInfo.archiveRouteMap.begin(),rRRepackInfo.archiveRouteMap.end(),[&currentCopyNbTapePool](const std::pair<uint64_t,std::string> & copyNbTapepool){
+            return copyNbTapepool.second == currentCopyNbTapePool.second;
+          });
+          if(nbTapepool != 1){
+            throw cta::ExpandRepackRequestException("In OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(), found the same destination tapepool for multiple copyNbs.");
+          }
+        }
       } catch (std::out_of_range &) {
         notCreatedSubrequests.emplace_back(rsr);
         failedCreationStats.files++;
@@ -4299,10 +4308,12 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
     objectstore::ArchiveRequest & ar = *sri.subrequest;
     if (moreJobsToDo) {
       try {
-        jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> (
-              ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(),
-              newStatus)), 
-            sri});
+        if(ar.exists()){
+          jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> (
+                ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(),
+                newStatus)), 
+              sri});
+        }
       } catch (cta::exception::Exception & ex) {
         // Log the error
         log::ScopedParamContainer params(lc);
diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp
index d2c385ac3e..7f7146793d 100644
--- a/scheduler/RepackRequestManager.cpp
+++ b/scheduler/RepackRequestManager.cpp
@@ -41,10 +41,7 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) {
       //We have a RepackRequest that has the status ToExpand, expand it
       try{
         m_scheduler.expandRepackRequest(repackRequest,timingList,t,lc);
-      } catch (const NoArchiveRoute& ex){
-        lc.log(log::ERR,ex.what());
-        repackRequest->fail();
-      } catch (const NoStorageClass &ex){
+      } catch (const ExpandRepackRequestException& ex){
         lc.log(log::ERR,ex.what());
         repackRequest->fail();
       } catch (const cta::exception::Exception &e){
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 691988c67b..eddebd0e81 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -450,13 +450,6 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
   auto repackInfo = repackRequest->getRepackInfo();
   
   typedef cta::common::dataStructures::RepackInfo::Type RepackType;
-  /*if (repackInfo.type != RepackType::MoveOnly && repackInfo.type != RepackType::AddCopiesOnly) {
-    log::ScopedParamContainer params(lc);
-    params.add("tapeVid", repackInfo.vid);
-    lc.log(log::ERR, "In Scheduler::expandRepackRequest(): failing repack request with unsupported (yet) type.");
-    repackRequest->fail();
-    return;
-  }*/
   
   //We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the
   //tape where the file is is located
@@ -479,10 +472,10 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
   std::stringstream dirBufferURL;
   dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/";
   std::set<std::string> filesInDirectory;
+  std::unique_ptr<cta::disk::Directory> dir;
   if(archiveFilesForCatalogue.hasMore()){
     //We only create the folder if there are some files to Repack
     cta::disk::DirectoryFactory dirFactory;
-    std::unique_ptr<cta::disk::Directory> dir;
     dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
     if(dir->exist()){
       filesInDirectory = dir->getFilesName();
@@ -544,34 +537,35 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
           common::dataStructures::StorageClass sc = *storageClassItor;
           uint64_t nbFilesAlreadyArchived = archiveFile.tapeFiles.size();
           uint64_t nbCopiesInStorageClass = sc.nbCopies;
-          if(nbFilesAlreadyArchived < nbCopiesInStorageClass){
-            uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived;
+          uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived;
+          if(filesToArchive > 0){
             totalStatsFile.totalFilesToArchive += filesToArchive;
             totalStatsFile.totalBytesToArchive += (filesToArchive * archiveFile.fileSize);
-            if(filesToArchive != 0){
-              std::set<uint64_t> copyNbsAlreadyInCTA;
-              for (auto & tc: archiveFile.tapeFiles) {
-                copyNbsAlreadyInCTA.insert(tc.copyNb);
-                if (tc.vid == repackInfo.vid) {
-                  // We make the (reasonable) assumption that the archive file only has one copy on this tape.
-                  // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
-                  // This will prevent double subrequest creation (we already have such a mechanism in case of crash and 
-                  // restart of expansion.
-                  //We found the copy of the file we want to retrieve and archive
-                  //retrieveSubRequest.fSeq = tc.fSeq;
+            std::set<uint64_t> copyNbsAlreadyInCTA;
+            for (auto & tc: archiveFile.tapeFiles) {
+              copyNbsAlreadyInCTA.insert(tc.copyNb);
+              if (tc.vid == repackInfo.vid) {
+                // We make the (reasonable) assumption that the archive file only has one copy on this tape.
+                // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
+                // This will prevent double subrequest creation (we already have such a mechanism in case of crash
+                // and restart of expansion).
+                //We found the copy of the file we want to retrieve and archive
+                //retrieveSubRequest.fSeq = tc.fSeq;
+                if(repackInfo.type == RepackType::AddCopiesOnly)
                   retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq);
-                }
-              }
-              for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){
-                if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){
-                  //We need to archive the missing copy
-                  retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first);
-                }
               }
-              if(retrieveSubRequest.copyNbsToRearchive.size() != filesToArchive){
-                throw NoArchiveRoute("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files");
+            }
+            for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){
+              if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){
+                //We need to archive the missing copy
+                retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first);
               }
-            } else {
+            }
+            if(retrieveSubRequest.copyNbsToRearchive.size() != filesToArchive){
+              throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files");
+            }
+          } else {
+            if(repackInfo.type == RepackType::AddCopiesOnly){
               //Nothing to Archive so nothing to Retrieve as well
               retrieveSubrequests.pop_back();
               continue;
@@ -579,7 +573,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
           }
         } else {
           //No storage class have been found for the current tapefile throw an exception
-          throw NoStorageClass("In Scheduler::expandRepackRequest(): No storage class have been found for the file to add copies");
+          throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): No storage class has been found for the file to add copies");
         }
       }
       
@@ -649,6 +643,12 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
       lc.log(log::INFO,"Expansion time reached, Repack Request requeued in ToExpand queue.");
     }
   } else {
+    if(totalStatsFile.totalFilesToRetrieve == 0){
+      //If no files need to be retrieved, the repack buffer has to be deleted
+      if(dir != nullptr && dir->exist()){
+        dir->rmdir();
+      }
+    }
     repackRequest->m_dbReq->expandDone();
     lc.log(log::INFO,"In Scheduler::expandRepackRequest(), repack request expanded");
   }
diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp
index 3de3649479..dc4cbb1d13 100644
--- a/scheduler/Scheduler.hpp
+++ b/scheduler/Scheduler.hpp
@@ -74,8 +74,7 @@ class RetrieveJob;
  * The scheduler is the unique entry point to the central storage for taped. It is 
  * 
  */
-CTA_GENERATE_EXCEPTION_CLASS(NoArchiveRoute);
-CTA_GENERATE_EXCEPTION_CLASS(NoStorageClass);
+CTA_GENERATE_EXCEPTION_CLASS(ExpandRepackRequestException);
 
 class Scheduler {
   
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 1c251816c6..94df1ce61a 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -596,13 +596,19 @@ public:
 	return false;
       /**
        * For the tests, we try to have the priority by 
-       * alphabetical order : vid1 should be treated before vid2,
+       * alphabetical order: vid1/tapepool1 should be treated before vid2/tapepool2,
        * so if this->vid < other.vid : then this > other.vid, so return false
        */
       if(vid < other.vid)
 	return false;
       if(vid > other.vid)
 	return true;
+      
+      if(tapePool < other.tapePool)
+	return false;
+      if(tapePool > other.tapePool)
+	return true;
+      
       return false;
     }
   };
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 37709a3f0a..4fd01500cf 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -3050,7 +3050,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) {
   {
     scheduler.waitSchedulerDbSubthreadsComplete();
     {
-      //The first mount given by the scheduler should be the vidDestination2
+      //The first mount given by the scheduler should be the vidDestination1, which belongs to tapepool1
       std::unique_ptr<cta::TapeMount> mount;
       mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
       ASSERT_NE(nullptr, mount.get());
@@ -3063,11 +3063,12 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) {
       {
         auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc);
         ASSERT_EQ(10,jobBatch.size());
-        ASSERT_EQ(vidDestination2,archiveMount->getVid());
+        ASSERT_EQ(vidDestination1,archiveMount->getVid());
       }
     }
     
     {
+      //The second mount given by the scheduler should be the vidDestination2, which belongs to tapepool2
       std::unique_ptr<cta::TapeMount> mount;
       mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
       ASSERT_NE(nullptr, mount.get());
@@ -3080,7 +3081,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) {
       {
         auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc);
         ASSERT_EQ(10,jobBatch.size());
-        ASSERT_EQ(vidDestination1,archiveMount->getVid());
+        ASSERT_EQ(vidDestination2,archiveMount->getVid());
       }
     }
   }
-- 
GitLab