From a346db14d063cc20eaf340483c68bbf44ba7289a Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@cern.ch>
Date: Mon, 9 Sep 2019 17:44:37 +0200
Subject: [PATCH] Changed systemtest of Repack disabled tape

Added activable logs to the RetrieveQueueStatistics cache for debugging
---
 .../orchestration/tests/repack_systemtest.sh  | 12 ++++++---
 .../tests/repack_systemtest_wrapper.sh        | 17 ++++++++----
 objectstore/Helpers.cpp                       | 27 ++++++++++++++++++-
 objectstore/Helpers.hpp                       |  7 +++++
 scheduler/OStoreDB/OStoreDB.cpp               |  5 +++-
 scheduler/OStoreDB/OStoreDB.hpp               |  2 +-
 scheduler/Scheduler.cpp                       |  9 ++++---
 scheduler/SchedulerDatabase.hpp               |  6 ++++-
 8 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/continuousintegration/orchestration/tests/repack_systemtest.sh b/continuousintegration/orchestration/tests/repack_systemtest.sh
index 22652c3288..88285bf76b 100755
--- a/continuousintegration/orchestration/tests/repack_systemtest.sh
+++ b/continuousintegration/orchestration/tests/repack_systemtest.sh
@@ -18,6 +18,7 @@ eosinstance : the name of the ctaeos instance to be used (default ctaeos)
 timeout : the timeout in seconds to wait for the repack to be done
 -a : Launch a repack just add copies workflow
 -m : Launch a repack just move workflow
+-d : Force a repack on a disabled tape (adds --disabled to the repack add command)
 EOF
 exit 1
 }
@@ -40,7 +41,8 @@ then
   usage
 fi;
 
-while getopts "v:e:b:t:amg" o; do
+DISABLED_TAPE_FLAG=""
+while getopts "v:e:b:t:amgd" o; do
   case "${o}" in
     v)
       VID_TO_REPACK=${OPTARG}
@@ -63,6 +65,9 @@ while getopts "v:e:b:t:amg" o; do
     g)
       GENERATE_REPORT=0
       ;;
+    d)
+      DISABLED_TAPE_FLAG="--disabledtape"
+      ;;
     *)
       usage
       ;;
@@ -105,7 +110,8 @@ echo "Marking the tape ${VID_TO_REPACK} as full before Repacking it"
 admin_cta tape ch --vid ${VID_TO_REPACK} --full true
 
 echo "Launching repack request for VID ${VID_TO_REPACK}, bufferURL = ${FULL_REPACK_BUFFER_URL}"
-admin_cta re add --vid ${VID_TO_REPACK} ${REPACK_OPTION} --bufferurl ${FULL_REPACK_BUFFER_URL}
+
+admin_cta repack add --vid ${VID_TO_REPACK} ${REPACK_OPTION} --bufferurl ${FULL_REPACK_BUFFER_URL} ${DISABLED_TAPE_FLAG}
 
 SECONDS_PASSED=0
 while test 0 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '.[0] | select(.status == "Complete" or .status == "Failed")' | wc -l`; do
@@ -118,7 +124,7 @@ while test 0 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '.[0]
     exit 1
   fi
 done
-if test 1 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '.[0] | select(.status == "Failed")' | wc -l`; then
+if test 1 = `admin_cta --json repack ls --vid ${VID_TO_REPACK} | jq -r '[.[0] | select (.status == "Failed")] | length'`; then
     echo "Repack failed for tape ${VID_TO_REPACK}."
     exit 1
 fi
diff --git a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
index ea5a7c60fc..97c8415c60 100755
--- a/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
+++ b/continuousintegration/orchestration/tests/repack_systemtest_wrapper.sh
@@ -84,19 +84,26 @@ kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REP
 
 echo "Launching a Repack Request on a disabled tape"
 VID_TO_REPACK=$(getFirstVidContainingFiles)
-if [ "$VID_TO_REPACK" != "null" ] 
+
+if [ "$VID_TO_REPACK" != "null" ]
 then
   echo "Marking the tape ${VID_TO_REPACK} as disabled"
   kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape ch --disabled true --vid ${VID_TO_REPACK}
-  echo "Wating 15 seconds so that the RetrieveQueueStatisticsCache is flushed"
-  sleep 15
-  echo "Launching the repack request test on VID ${VID_TO_REPACK}
-  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} || echo "OK" && exit 1
+  echo "Waiting 20 seconds so that the RetrieveQueueStatisticsCache is flushed"
+  sleep 20
+  echo "Launching the repack request test on VID ${VID_TO_REPACK}"
+  kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} && echo "The repack request is Complete instead of Failed, it should be failed as the tape is disabled" && exit 1 || echo "Test OK"
 else
   echo "No vid found to repack"
   exit 1
 fi;
 
+echo
+echo "Launching the repack request test on VID ${VID_TO_REPACK} with the --disabledtape flag"
+kubectl -n ${NAMESPACE} exec client -- bash /root/repack_systemtest.sh -v ${VID_TO_REPACK} -b ${REPACK_BUFFER_URL} -d || exit 1
+
+echo "Reclaiming tape ${VID_TO_REPACK}"
+kubectl -n ${NAMESPACE} exec ctacli -- cta-admin tape reclaim --vid ${VID_TO_REPACK}
 
 NB_FILES=1152
 kubectl -n ${NAMESPACE} exec client -- bash /root/client_ar.sh -n ${NB_FILES} -s ${FILE_SIZE_KB} -p 100 -d /eos/ctaeos/preprod -v -A || exit 1
diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp
index 71f8623db3..5486920ca7 100644
--- a/objectstore/Helpers.cpp
+++ b/objectstore/Helpers.cpp
@@ -378,18 +378,27 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
       // If an update is in progress, we wait on it, and get the result after.
       // We have to release the global lock while doing so.
       if (g_retrieveQueueStatistics.at(v).updating) {
+        logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"g_retrieveQueueStatistics.at(v).updating");
         // Cache is updating, we wait on update.
         auto updateFuture = g_retrieveQueueStatistics.at(v).updateFuture;
         grqsmLock.unlock();
         updateFuture.wait();
         grqsmLock.lock();
         if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape)) {
+          logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape)");
           candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
         }
       } else {
         // We have a cache hit, check it's not stale.
-        if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge > time(nullptr))
+        time_t timeSinceLastUpdate = time(nullptr) - g_retrieveQueueStatistics.at(v).updateTime;
+        if (timeSinceLastUpdate > c_retrieveQueueCacheMaxAge){
+          logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+")> c_retrieveQueueCacheMaxAge ("
+                  +std::to_string(c_retrieveQueueCacheMaxAge)+"), cache needs to be updated");
           throw std::out_of_range("");
+        }
+        
+        logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(v),"Cache is not updated, timeSinceLastUpdate ("+std::to_string(timeSinceLastUpdate)+
+        ") <= c_retrieveQueueCacheMaxAge ("+std::to_string(c_retrieveQueueCacheMaxAge)+")");
         // We're lucky: cache hit (and not stale)
         if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled || (g_retrieveQueueStatistics.at(v).tapeStatus.disabled && forceDisabledTape))
           candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
@@ -427,6 +436,8 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
         throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus.");
       g_retrieveQueueStatistics[v].stats = queuesStats.front();
       g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v);
+      g_retrieveQueueStatistics[v].updateTime = time(nullptr);
+      logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[v]);
       // Signal to potential waiters
       updatePromise.set_value();
       // Update our own candidate list if needed.
@@ -472,6 +483,7 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_
     g_retrieveQueueStatistics.at(vid).stats.filesQueued=files;
     g_retrieveQueueStatistics.at(vid).stats.bytesQueued=bytes;
     g_retrieveQueueStatistics.at(vid).stats.currentPriority = priority;
+    logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(vid));
   } catch (std::out_of_range &) {
     // The entry is missing. We just create it.
     g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes;
@@ -482,6 +494,7 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_
     g_retrieveQueueStatistics[vid].tapeStatus.full=false;
     g_retrieveQueueStatistics[vid].updating = false;
     g_retrieveQueueStatistics[vid].updateTime = time(nullptr);
+    logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[vid]);
   }
 }
 
@@ -709,4 +722,16 @@ void Helpers::removeRepackRequestToIndex(const std::string& vid, Backend& backen
   ri.commit();
 }
 
+void Helpers::logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic, std::string message){
+  #ifdef HELPERS_CACHE_UPDATE_LOGGING
+    std::ofstream logFile(HELPERS_CACHE_UPDATE_LOGGING_FILE, std::ofstream::app);
+    std::time_t end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    // Chomp newline in the end
+    std::string date=std::ctime(&end_time);
+    date.erase(std::remove(date.begin(), date.end(), '\n'), date.end());
+    logFile << date << " pid=" << ::getpid() << " tid=" << syscall(SYS_gettid) << " message=" << message << " entryCreation="<< entryCreation <<" vid=" 
+            << tapeStatistic.tapeStatus.vid << " disabled=" << tapeStatistic.tapeStatus.disabled << " filesQueued=" << tapeStatistic.stats.filesQueued <<  std::endl;
+  #endif //HELPERS_CACHE_UPDATE_LOGGING
+}
+
 }} // namespace cta::objectstore.
diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp
index 635bfb75d9..c1c44ace29 100644
--- a/objectstore/Helpers.hpp
+++ b/objectstore/Helpers.hpp
@@ -28,6 +28,12 @@
 #include <string>
 #include <set>
 #include <future>
+#include <fstream>
+#include <syscall.h>
+
+//Activate or not helper cache update for debugging
+//#define HELPERS_CACHE_UPDATE_LOGGING
+#define HELPERS_CACHE_UPDATE_LOGGING_FILE "/var/tmp/cta-helpers-update-cache.log"
 
 /**
  * A collection of helper functions for commonly used multi-object operations
@@ -118,6 +124,7 @@ private:
   static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics;
   /** Time between cache updates */
   static const time_t c_retrieveQueueCacheMaxAge = 10;
+  static void logUpdateCacheIfNeeded(const bool entryCreation,const RetrieveQueueStatisticsWithTime& tapeStatistic, std::string message="");
   
 public:
   
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index b1b057272d..3e688fa8be 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2188,8 +2188,9 @@ void OStoreDB::RepackRequest::setLastExpandedFSeq(uint64_t fseq){
 //------------------------------------------------------------------------------
 // OStoreDB::RepackRequest::addSubrequests()
 //------------------------------------------------------------------------------
-void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap, uint64_t maxFSeqLowBound, const uint64_t maxAddedFSeq, const cta::SchedulerDatabase::RepackRequest::TotalStatsFiles &totalStatsFiles, log::LogContext& lc) {
+uint64_t OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap, uint64_t maxFSeqLowBound, const uint64_t maxAddedFSeq, const cta::SchedulerDatabase::RepackRequest::TotalStatsFiles &totalStatsFiles, log::LogContext& lc) {
   // We need to prepare retrieve requests names and reference them, create them, enqueue them.
+  uint64_t nbRetrieveSubrequestsCreated = 0;
   objectstore::ScopedExclusiveLock rrl (m_repackRequest);
   m_repackRequest.fetch();
   std::set<uint64_t> fSeqs;
@@ -2421,12 +2422,14 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
       is.request->fetch();
       sorter.insertRetrieveRequest(is.request, *m_oStoreDB.m_agentReference, is.activeCopyNb, lc);
     }
+    nbRetrieveSubrequestsCreated = sorter.getAllRetrieve().size();
     locks.clear();
     sorter.flushAll(lc);
   }
   
   m_repackRequest.setLastExpandedFSeq(fSeq);
   m_repackRequest.commit();
+  return nbRetrieveSubrequestsCreated;
 }
 
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index eb86b0a074..5abf330050 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -344,7 +344,7 @@ public:
   public:
     RepackRequest(const std::string &jobAddress, OStoreDB &oStoreDB) :
     m_oStoreDB(oStoreDB), m_repackRequest(jobAddress, m_oStoreDB.m_objectStore){}
-    void addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap,
+    uint64_t addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap,
       uint64_t maxFSeqLowBound, const uint64_t maxAddedFSeq, const TotalStatsFiles &totalStatsFiles,  log::LogContext& lc) override;
     void expandDone() override;
     void fail() override;
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 62a65d0e70..c2dc9c282c 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -478,7 +478,8 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
     cta::disk::DirectoryFactory dirFactory;
     dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
     if(dir->exist()){
-      filesInDirectory = dir->getFilesName();
+      //TODO : Repack tape repair workflow
+      //filesInDirectory = dir->getFilesName();
     } else {
       dir->mkdir();
     }
@@ -491,6 +492,8 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
     storageClasses = m_catalogue.getStorageClasses();
   
   repackRequest->m_dbReq->setExpandStartedAndChangeStatus();
+  uint64_t nbRetrieveSubrequestsQueued = 0;
+  
   while(archiveFilesForCatalogue.hasMore() && !stopExpansion) {
     size_t filesCount = 0;
     uint64_t maxAddedFSeq = 0;
@@ -621,7 +624,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
     // We pass this information to the db for recording in the repack request. This will allow restarting from the right
     // value in case of crash.
     try{
-      repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc);
+      nbRetrieveSubrequestsQueued = repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc);
     } catch(const cta::ExpandRepackRequestException& e){
       deleteRepackBuffer(std::move(dir));
       throw e;
@@ -645,7 +648,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
       lc.log(log::INFO,"Expansion time reached, Repack Request requeued in ToExpand queue.");
     }
   } else {
-    if(totalStatsFile.totalFilesToRetrieve == 0){
+    if(totalStatsFile.totalFilesToRetrieve == 0 || nbRetrieveSubrequestsQueued == 0){
       //If no files have been retrieve, the repack buffer will have to be deleted
       deleteRepackBuffer(std::move(dir));      
     }
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 3f77730554..f757533610 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -452,7 +452,11 @@ public:
       //TODO : userprovidedfiles and userprovidedbytes
     };
     
-    virtual void addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, 
+    /**
+     * Add Retrieve subrequests to the repack request and update its statistics
+     * @return the number of retrieve subrequests queued
+     */
+    virtual uint64_t addSubrequestsAndUpdateStats(std::list<Subrequest>& repackSubrequests, 
       cta::common::dataStructures::ArchiveRoute::FullMap & archiveRoutesMap, uint64_t maxFSeqLowBound, const uint64_t maxAddedFSeq, const TotalStatsFiles &totalStatsFiles, log::LogContext & lc) = 0;
     virtual void expandDone() = 0;
     virtual void fail() = 0;
-- 
GitLab