From 50109db543fa7dabd63752d1ca534ca309ea2a80 Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@hotmail.fr>
Date: Thu, 17 Jan 2019 10:17:42 -0500
Subject: [PATCH] First version of expandRepackRequest + unit test

---
 common/CMakeLists.txt                         |   1 +
 .../RetrieveFileQueueCriteria.cpp             |  32 ++++++
 .../RetrieveFileQueueCriteria.hpp             |   2 +
 common/dataStructures/RetrieveRequest.cpp     |   8 +-
 common/dataStructures/RetrieveRequest.hpp     |   1 +
 objectstore/RetrieveRequest.cpp               |  11 ++
 objectstore/RetrieveRequest.hpp               |   3 +
 objectstore/cta.proto                         |   1 +
 scheduler/Scheduler.cpp                       |  57 +++++++--
 scheduler/Scheduler.hpp                       |  16 +++
 scheduler/SchedulerTest.cpp                   | 108 ++++++++++++++++++
 11 files changed, 226 insertions(+), 14 deletions(-)
 create mode 100644 common/dataStructures/RetrieveFileQueueCriteria.cpp

diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index faa6b7393b..0d415cb679 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -53,6 +53,7 @@ set (COMMON_LIB_SRC_FILES
   dataStructures/RepackInfo.cpp
   dataStructures/RequesterGroupMountRule.cpp
   dataStructures/RequesterMountRule.cpp
+  dataStructures/RetrieveFileQueueCriteria.cpp
   dataStructures/RetrieveJob.cpp
   dataStructures/RetrieveRequest.cpp
   dataStructures/SecurityIdentity.cpp
diff --git a/common/dataStructures/RetrieveFileQueueCriteria.cpp b/common/dataStructures/RetrieveFileQueueCriteria.cpp
new file mode 100644
index 0000000000..df8980f6f6
--- /dev/null
+++ b/common/dataStructures/RetrieveFileQueueCriteria.cpp
@@ -0,0 +1,32 @@
+/*
+ * The CERN Tape Archive (CTA) project
+ * Copyright (C) 2015  CERN
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "RetrieveFileQueueCriteria.hpp"
+namespace cta {
+namespace common {
+namespace dataStructures {
+    
+RetrieveFileQueueCriteria& RetrieveFileQueueCriteria::operator=(const RetrieveFileQueueCriteria& other){
+    if(this != &other){
+        this->archiveFile = other.archiveFile;
+        this->mountPolicy = other.mountPolicy;
+    }
+    return *this;
+}
+
+}}}
\ No newline at end of file
diff --git a/common/dataStructures/RetrieveFileQueueCriteria.hpp b/common/dataStructures/RetrieveFileQueueCriteria.hpp
index 9390556424..9e1dc809bc 100644
--- a/common/dataStructures/RetrieveFileQueueCriteria.hpp
+++ b/common/dataStructures/RetrieveFileQueueCriteria.hpp
@@ -42,6 +42,8 @@ struct RetrieveFileQueueCriteria {
    * The mount policy.
    */
   MountPolicy mountPolicy;
+  
+  RetrieveFileQueueCriteria &operator=(const RetrieveFileQueueCriteria& other);
 
 }; // struct RetrieveFileQueueCriteria
 
diff --git a/common/dataStructures/RetrieveRequest.cpp b/common/dataStructures/RetrieveRequest.cpp
index 651e4112d4..35154d638f 100644
--- a/common/dataStructures/RetrieveRequest.cpp
+++ b/common/dataStructures/RetrieveRequest.cpp
@@ -27,7 +27,7 @@ namespace dataStructures {
 //------------------------------------------------------------------------------
 // constructor
 //------------------------------------------------------------------------------
-RetrieveRequest::RetrieveRequest(): archiveFileID(0) {}
+RetrieveRequest::RetrieveRequest(): archiveFileID(0),isRepack(false) {}
 
 //------------------------------------------------------------------------------
 // operator==
@@ -37,7 +37,8 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const {
       && archiveFileID==rhs.archiveFileID
       && dstURL==rhs.dstURL
       && diskFileInfo==rhs.diskFileInfo
-      && creationLog==rhs.creationLog;
+      && creationLog==rhs.creationLog
+      && isRepack == rhs.isRepack;
 }
 
 //------------------------------------------------------------------------------
@@ -55,7 +56,8 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) {
      << " archiveFileID=" << obj.archiveFileID
      << " dstURL=" << obj.dstURL
      << " diskFileInfo=" << obj.diskFileInfo
-     << " creationLog=" << obj.creationLog << ")";
+     << " creationLog=" << obj.creationLog 
+     << " isRepack=" << obj.isRepack<<")";
   return os;
 }
 
diff --git a/common/dataStructures/RetrieveRequest.hpp b/common/dataStructures/RetrieveRequest.hpp
index d962583585..21d851c13b 100644
--- a/common/dataStructures/RetrieveRequest.hpp
+++ b/common/dataStructures/RetrieveRequest.hpp
@@ -48,6 +48,7 @@ struct RetrieveRequest {
   std::string errorReportURL;
   DiskFileInfo diskFileInfo;
   EntryLog creationLog;
+  bool isRepack;
 
 }; // struct RetrieveRequest
 
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index eaca44a916..707237ed19 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -57,6 +57,7 @@ void RetrieveRequest::initialize() {
   ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>::initialize();
   m_payload.set_failurereportlog("");
   m_payload.set_failurereporturl("");
+  m_payload.set_isrepack(false);
   // This object is good to go (to storage)
   m_payloadInterpreted = true;
 }
@@ -850,4 +851,14 @@ void RetrieveRequest::setJobStatus(uint64_t copyNumber, const serializers::Retri
   throw exception::Exception("In RetrieveRequest::setJobStatus(): job not found.");
 }
 
+bool RetrieveRequest::isRepack(){
+  checkPayloadReadable();
+  return m_payload.isrepack();
+}
+
+void RetrieveRequest::setIsRepack(const bool isRepack){
+  checkPayloadWritable();
+  m_payload.set_isrepack(isRepack);
+}
+
 }} // namespace cta::objectstore
diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp
index 4806f41d83..1012f1936a 100644
--- a/objectstore/RetrieveRequest.hpp
+++ b/objectstore/RetrieveRequest.hpp
@@ -93,6 +93,9 @@ public:
     //! The copy number to enqueue. It could be different from the updated one in mixed success/failure scenario.
     serializers::RetrieveJobStatus nextStatus;
   };
+  bool isRepack();
+  void setIsRepack(const bool isRepack);
+  
 private:
   /*!
    * Determine and set the new status of the job.
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index 56b4f9b625..3aa848463d 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -369,6 +369,7 @@ message RetrieveRequest {
   repeated RetrieveJob jobs = 9154;
   required string failurereporturl = 9155;
   required string failurereportlog = 9156;
+  required bool isrepack = 9157; //In protobuf, default values for bool is false
 }
 
 message ValueCountPair {
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 9e2eae9441..d637f496ea 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -183,8 +183,13 @@ void Scheduler::queueRetrieve(
   using utils::midEllipsis;
   utils::Timer t;
   // Get the queue criteria
-  const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
-    m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
+  common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
+  if(!request.isRepack){
+    queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
+  } else {
+    //Repack does not need policy
+    queueCriteria.archiveFile = m_catalogue.getArchiveFileById(request.archiveFileID);
+  }
   auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
   std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
   auto schedulerDbTime = t.secs();
@@ -220,15 +225,23 @@ void Scheduler::queueRetrieve(
     spc.add(tc.str(), tf.second);
   }
   spc.add("selectedVid", selectedVid)
-     .add("policyName", queueCriteria.mountPolicy.name)
-     .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
-     .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
-     .add("policyPriority", queueCriteria.mountPolicy.retrievePriority)
      .add("catalogueTime", catalogueTime)
      .add("schedulerDbTime", schedulerDbTime);
+  if(!request.isRepack){
+      spc.add("policyName", queueCriteria.mountPolicy.name)
+     .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed)
+     .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
+     .add("policyPriority", queueCriteria.mountPolicy.retrievePriority);
+  }
   lc.log(log::INFO, "Queued retrieve request");
 }
 
+void Scheduler::queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
+  std::list<uint64_t> copyNbs, log::LogContext &lc)
+{
+
+}
+
 //------------------------------------------------------------------------------
 // deleteArchive
 //------------------------------------------------------------------------------
@@ -385,15 +398,37 @@ std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
     return nullptr;
 }
 
+const std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const{
+  std::ostringstream strStream;
+  strStream<<"repack:/"<<dfi.path;
+  return strStream.str();
+}
+
 //------------------------------------------------------------------------------
 // expandRepackRequest
 //------------------------------------------------------------------------------
 void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) {
-  typedef cta::common::dataStructures::RetrieveRequest RetrieveRequest;
-  RetrieveRequest retrieveRequest;
-  retrieveRequest.dstURL = "";
-  this->queueRetrieve("ExpandRepack",retrieveRequest,lc);
-  throw exception::Exception("In Scheduler::expandRepackRequest(): not implemented");
+  uint64_t fseq = c_defaultFseqForRepack;
+  std::list<common::dataStructures::ArchiveFile> files;
+  std::list<uint64_t> copyNbs;
+  auto vid = repackRequest->getRepackInfo().vid;
+  while(true) {
+    files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack);
+    for(auto &archiveFile : files)
+    {
+      cta::common::dataStructures::RetrieveRequest retrieveRequest;
+      retrieveRequest.archiveFileID = archiveFile.archiveFileID;
+      retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
+      retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo);
+      retrieveRequest.isRepack = true;
+      //retrieveRequest.requester = repackRequest->
+      queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc);
+    }
+    if (files.size()) {
+      auto & tf=files.back().tapeFiles;
+      fseq = std::find_if(tf.cbegin(), tf.cend(), [vid](decltype(*(tf.cbegin())) &f){ return f.second.vid == vid; })->second.fSeq + 1;
+    } else break;
+  }
 }
 
 
diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp
index ea310d590f..0d99c5638c 100644
--- a/scheduler/Scheduler.hpp
+++ b/scheduler/Scheduler.hpp
@@ -142,6 +142,17 @@ public:
   void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
     log::LogContext &lc);
   
+  
+  /**
+   * Queue a retrieve request specific for Repack
+   * @param instanceName
+   * @param request
+   * @param copyNbs
+   * @param lc
+   */
+  void queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request,
+  std::list<uint64_t> copyNbs, log::LogContext &lc);
+  
   /** 
    * Delete an archived file or a file which is in the process of being archived.
    * Throws a UserError exception in case of wrong request parameters (ex. unknown file id)
@@ -282,6 +293,9 @@ public:
 
   /*============== Actual mount scheduling and queue status reporting ========*/
 private:
+  const uint64_t c_defaultFseqForRepack = 1;
+  const size_t c_defaultMaxNbFilesForRepack = 500;
+  
   typedef std::pair<std::string, common::dataStructures::MountType> tpType;
   /**
    * Common part to getNextMountDryRun() and getNextMount() to populate mount decision info.
@@ -292,6 +306,8 @@ private:
     std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
     double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc);
   
+  const std::string generateRetrieveDstURL(const cta::common::dataStructures::DiskFileInfo dfi) const;
+  
 public:
   /**
    * Run the mount decision logic lock free, so we have no contention in the 
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 42c3cf0c57..85add00441 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -34,6 +34,8 @@
 #include "objectstore/GarbageCollector.hpp"
 #include "objectstore/BackendRadosTestSwitch.hpp"
 #include "tests/TestsCompileTimeSwitches.hpp"
+#include "common/Timer.hpp"
+
 #ifdef STDOUT_LOGGING
 #include "common/log/StdoutLogger.hpp"
 #endif
@@ -107,6 +109,7 @@ public:
     const uint64_t nbConns = 1;
     const uint64_t nbArchiveFileListingConns = 1;
     const uint32_t maxTriesToConnect = 1;
+    //m_catalogue = cta::make_unique<catalogue::SchemaCreatingSqliteCatalogue>(m_dummyLog,"/home/cedric/db/test.db",nbConns,nbArchiveFileListingConns,maxTriesToConnect);
     m_catalogue = cta::make_unique<catalogue::InMemoryCatalogue>(m_dummyLog, nbConns, nbArchiveFileListingConns, maxTriesToConnect);
     m_scheduler = cta::make_unique<Scheduler>(*m_catalogue, *m_db, 5, 2*1000*1000);
   }
@@ -1076,6 +1079,111 @@ TEST_P(SchedulerTest, getNextRepackRequestToExpand) {
   ASSERT_EQ(nullRepackRequest,nullptr);
 }
 
+TEST_P(SchedulerTest, expandRepackRequest) {
+  using namespace cta;
+  
+  setupDefaultCatalogue();
+  
+  auto &catalogue = getCatalogue();
+  auto &scheduler = getScheduler();
+  cta::log::DummyLogger dummyLogger("dummy","dummy");
+  log::LogContext lc(dummyLogger);
+  
+  const std::string vid1 = "VID123";
+  const std::string vid2 = "VID456";
+  const std::string mediaType = "media_type";
+  const std::string vendor = "vendor";
+  const std::string logicalLibraryName = "logical_library_name";
+  const std::string tapePoolName1 = "tape_pool_name_1";
+  const std::string vo = "vo";
+  const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
+  const bool disabledValue = false;
+  const bool fullValue = false;
+  const std::string comment = "Create tape";
+  cta::common::dataStructures::SecurityIdentity admin;
+  admin.username = "admin_user_name";
+  admin.host = "admin_host";
+  
+  catalogue.createLogicalLibrary(admin, logicalLibraryName, "Create logical library");
+  catalogue.createTapePool(admin, tapePoolName1, vo, 1, true, "Create tape pool");
+  catalogue.createTape(admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName1, capacityInBytes,
+    disabledValue, fullValue, comment);
+  common::dataStructures::StorageClass storageClass;
+  storageClass.diskInstance = "disk_instance";
+  storageClass.name = "storage_class";
+  storageClass.nbCopies = 2;
+  storageClass.comment = "Create storage class";
+  catalogue.createStorageClass(admin, storageClass);
+  
+  const std::string checksumType = "checksum_type";
+  const std::string checksumValue = "checksum_value";
+  const std::string tapeDrive = "tape_drive";
+  
+  const uint64_t nbArchiveFiles = 10; // Must be a multiple of 2 fo rthis test
+  const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
+  const uint64_t compressedFileSize = archiveFileSize;
+
+  std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
+  for(uint64_t i = 1; i <= nbArchiveFiles; i++) {
+    std::ostringstream diskFileId;
+    diskFileId << (12345677 + i);
+    std::ostringstream diskFilePath;
+    diskFilePath << "/public_dir/public_file_" << i;
+
+    // Tape copy 1 written to tape
+    auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
+    auto & fileWritten = *fileWrittenUP;
+    fileWritten.archiveFileId = i;
+    fileWritten.diskInstance = storageClass.diskInstance;
+    fileWritten.diskFileId = diskFileId.str();
+    fileWritten.diskFilePath = diskFilePath.str();
+    fileWritten.diskFileUser = "public_disk_user";
+    fileWritten.diskFileGroup = "public_disk_group";
+    fileWritten.diskFileRecoveryBlob = "opaque_disk_file_recovery_contents";
+    fileWritten.size = archiveFileSize;
+    fileWritten.checksumType = checksumType;
+    fileWritten.checksumValue = checksumValue;
+    fileWritten.storageClassName = storageClass.name;
+    fileWritten.vid = vid1;
+    fileWritten.fSeq = i;
+    fileWritten.blockId = i * 100;
+    fileWritten.compressedSize = compressedFileSize;
+    fileWritten.copyNb = 1;
+    fileWritten.tapeDrive = tapeDrive;
+    tapeFilesWrittenCopy1.emplace(fileWrittenUP.release());
+  }
+  //update the DB tape
+  catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
+  
+  scheduler.queueRepack(admin,vid1,"bufferURL",common::dataStructures::RepackInfo::Type::ExpandOnly,lc);
+  scheduler.promoteRepackRequestsToToExpand(lc);
+  auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand();
+  log::TimingList tl;
+  utils::Timer t;
+  
+  scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc);
+  scheduler.waitSchedulerDbSubthreadsComplete();
+  
+  std::list<common::dataStructures::RetrieveJob> retrieveJobs = scheduler.getPendingRetrieveJobs(vid1,lc);
+  ASSERT_EQ(retrieveJobs.size(),10);
+  int i = 1;
+  for(auto retrieveJob : retrieveJobs){
+    ASSERT_EQ(retrieveJob.request.archiveFileID,i);
+    ASSERT_EQ(retrieveJob.fileSize,compressedFileSize);
+    std::stringstream ss;
+    ss<<"repack://public_dir/public_file_"<<i;
+    ASSERT_EQ(retrieveJob.request.dstURL, ss.str());
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.copyNb,1);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumType,checksumType);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.checksumValue,checksumValue);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.blockId,i*100);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.compressedSize,compressedFileSize);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.fSeq,i);
+    ASSERT_EQ(retrieveJob.tapeCopies[vid1].second.vid,vid1);
+    ++i;
+  }
+}
+
 #undef TEST_MOCK_DB
 #ifdef TEST_MOCK_DB
 static cta::MockSchedulerDatabaseFactory mockDbFactory;
-- 
GitLab