From f5584da65cbdc633a4101dbd88cfdb487680c791 Mon Sep 17 00:00:00 2001
From: Cedric CAFFY <cedric.caffy@hotmail.fr>
Date: Tue, 19 Mar 2019 11:32:28 -0400
Subject: [PATCH] Full support of failed RetrieveRequests from Repack -
 Queueing into the RetrieveQueueToReportToRepackForFailure - Reporting of the
 failed RetrieveJobs (adds statistics + deletion of the failed subrequests)
 Associated unit test: expandRepackRequestFailedRetrieve
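
Failed retrieve subrequests created during a repack are now queued in the
RetrieveQueueToReportToRepackForFailure. A new report batch,
RepackRetrieveFailureReportBatch, pops them from that queue, updates the
failure statistics of the owning repack request, marks the subrequests for
deletion and deletes them from the objectstore.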

---
 objectstore/RepackRequest.cpp   |  12 +--
 scheduler/OStoreDB/OStoreDB.cpp | 146 +++++++++++++++++++++++++++++++-
 scheduler/OStoreDB/OStoreDB.hpp |  11 +++
 scheduler/SchedulerTest.cpp     |  28 +++++-
 4 files changed, 185 insertions(+), 12 deletions(-)

diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp
index 449e7798fc..c0448e8f7a 100644
--- a/objectstore/RepackRequest.cpp
+++ b/objectstore/RepackRequest.cpp
@@ -251,7 +251,7 @@ void RepackRequest::reportRetriveSuccesses(SubrequestStatistics::List& retrieveS
   }
   if (didUpdate) {
     m_payload.mutable_subrequests()->Clear();
-    for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
+    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
 }
 
@@ -279,7 +279,7 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa
   }
   if (didUpdate) {
     m_payload.mutable_subrequests()->Clear();
-    for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
+    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
 }
 
@@ -307,7 +307,7 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu
   }
   if (didUpdate) {
     m_payload.mutable_subrequests()->Clear();
-    for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
+    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
 }
 
@@ -330,12 +330,12 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai
         didUpdate = true;
       }
     } catch (std::out_of_range &) {
-      throw exception::Exception("In RepackRequest::reportRetriveFailures(): got a report for unknown fSeq");
+      throw exception::Exception("In RepackRequest::reportArchiveFailures(): got a report for unknown fSeq");
     }
   }
   if (didUpdate) {
     m_payload.mutable_subrequests()->Clear();
-    for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
+    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
 }
 
@@ -361,7 +361,7 @@ void RepackRequest::reportSubRequestsForDeletion(std::list<uint64_t>& fSeqs) {
   }
   if (didUpdate) {
     m_payload.mutable_subrequests()->Clear();
-    for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add());
+    for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
 }
 
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 35ad70b4ad..96093dd530 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -1526,6 +1526,9 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextRepackRep
   try {
     return getNextSuccessfulRetrieveRepackReportBatch(lc);
   } catch (NoRepackReportBatchFound &) {}
+  try {
+    return getNextFailedRetrieveRepackReportBatch(lc);
+  } catch (NoRepackReportBatchFound &) {}
   try {
     return getNextSuccessfulArchiveRepackReportBatch(lc);
   } catch (NoRepackReportBatchFound &) {}
@@ -1576,6 +1579,50 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfu
   throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no report found.");
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::getNextFailedRetrieveRepackReportBatch()
+//------------------------------------------------------------------------------
+std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextFailedRetrieveRepackReportBatch(log::LogContext &lc){
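+  // Pop a batch of failed-retrieve subrequests from a RetrieveQueueToReportToRepackForFailure
+  // queue, using the generic container algorithms.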
+  typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForFailure> CaRqtrtrff;
+  CaRqtrtrff algo(this->m_objectStore,*m_agentReference);
+  // Decide from which queue we are going to pop.
+  RootEntry re(m_objectStore);
+  re.fetchNoLock();
+  while (true) {
+    auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReportToRepackForFailure);
+    if (queueList.empty()) throw NoRepackReportBatchFound("In OStoreDB::getNextFailedRetrieveRepackReportBatch(): no queue found.");
+
+    // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round.
+    CaRqtrtrff::PopCriteria criteria;
+    criteria.files = c_repackReportBatchSize;
+    auto jobs = algo.popNextBatch(queueList.front().vid, criteria, lc);
+    if (jobs.elements.empty()) continue;
+    std::unique_ptr<RepackRetrieveFailureReportBatch> privateRet;
+    privateRet.reset(new RepackRetrieveFailureReportBatch(m_objectStore, *this));
+    std::set<std::string> repackRequestAddresses;
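+    // Transfer the popped jobs into the batch's subrequest list and collect the addresses
+    // of the owning repack requests (a report queue serves a single repack request).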
+    for (auto &j: jobs.elements) {
+      privateRet->m_subrequestList.emplace_back(RepackRetrieveFailureReportBatch::SubrequestInfo());
+      auto & sr = privateRet->m_subrequestList.back();
+      sr.repackInfo = j.repackInfo;
+      sr.archiveFile = j.archiveFile;
+      sr.subrequest.reset(j.retrieveRequest.release());
+      repackRequestAddresses.insert(j.repackInfo.repackRequestAddress);
+    }
+    // As we are popping from a single report queue, all requests should concern only one repack request.
+    if (repackRequestAddresses.size() != 1) {
+      std::stringstream err;
+      err << "In OStoreDB::getNextFailedRetrieveRepackReportBatch(): reports for several repack requests in the same queue. ";
+      for (auto & rr: repackRequestAddresses) { err << rr << " "; }
+      throw exception::Exception(err.str());
+    }
+    privateRet->m_repackRequest.setAddress(*repackRequestAddresses.begin());
+    
+    return std::unique_ptr<SchedulerDatabase::RepackReportBatch>(privateRet.release());
+  }
+  throw NoRepackReportBatchFound("In OStoreDB::getNextFailedRetrieveRepackReportBatch(): no report found.");
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::getNextSuccessfulArchiveRepackReportBatch()
 //------------------------------------------------------------------------------
@@ -1707,6 +1754,7 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
         log::ScopedParamContainer params(lc);
         params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID)
               .add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet());
+        timingList.addToLog(params);
         lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(), turned successful retrieve request in archive request.");
         successfullyTransformedSubrequests.push_back(SuccessfullyTranformedRequest{
           std::make_shared<objectstore::ArchiveRequest>(
@@ -1782,19 +1830,24 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
           log::ScopedParamContainer params(lc);
           params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
                 .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet());
+          timingList.addToLog(params);
           lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): deleted retrieve request after failure to transform in archive request.");
         } catch (cta::exception::Exception & ex) {
           // Log the failure to delete.
           log::ScopedParamContainer params(lc);
           params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
                 .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet())
-                .add("excepitonMsg", ex.getMessageValue());
+                .add("exceptionMsg", ex.getMessageValue());
           lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): async deletion of retrieve request failed on wait().");
         }
       }
       timingList.insertAndReset("asyncDeleteRetrieveWaitTime", t);
       m_oStoreDb.m_agentReference->removeBatchFromOwnership(retrieveRequestsToUnown, m_oStoreDb.m_objectStore);
       timingList.insertAndReset("removeDeletedRetrieveFromOwnershipTime", t);
+      log::ScopedParamContainer params(lc);
+      params.add("agentAddress",m_oStoreDb.m_agentReference->getAgentAddress());
+      timingList.addToLog(params);
+      lc.log(log::INFO,"In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): successfully removed retrieve requests from the agent's ownership.");
     }
   }
   
@@ -1813,6 +1866,95 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
   }
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::RepackRetrieveFailureReportBatch::report()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackRetrieveFailureReportBatch::report(log::LogContext& lc) {
+  // We have a batch of popped failed retrieve requests to report. We first record them in the
+  // repack request (statistics update), then delete the failed subrequests from the objectstore.
+  utils::Timer t;
+  log::TimingList timingList;
+  
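+  // fSeqs of the failed subrequests, to be marked for deletion in the repack request.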
+  std::list<uint64_t> fSeqsToDelete;
+  // 1) Update the repack request's failure statistics and mark the failed subrequests
+  // for deletion. The repack request is protected against double reporting.
+  {
+    // Prepare the report
+    objectstore::RepackRequest::SubrequestStatistics::List ssl;
+    for (auto &rr: m_subrequestList) {
+      ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
+      ssl.back().bytes = rr.archiveFile.fileSize;
+      ssl.back().files = 1;
+      ssl.back().fSeq = rr.repackInfo.fSeq;
+      fSeqsToDelete.push_back(rr.repackInfo.fSeq);
+    }
+    // Record it.
+    timingList.insertAndReset("failureStatsPrepareTime", t);
+    objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+    timingList.insertAndReset("failureStatsLockTime", t);
+    m_repackRequest.fetch();
+    timingList.insertAndReset("failureStatsFetchTime", t);
+    m_repackRequest.reportSubRequestsForDeletion(fSeqsToDelete);
+    timingList.insertAndReset("failureStatsReportSubRequestsForDeletionTime", t);
+    m_repackRequest.reportRetriveFailures(ssl);
+    timingList.insertAndReset("failureStatsUpdateTime", t);
+    m_repackRequest.commit();
+    timingList.insertAndReset("failureStatsCommitTime", t);
+    
+    // 2) Delete all the failed RetrieveRequests.
+    struct AsyncDeleteAndReq {
+      SubrequestInfo & subrequestInfo;
+      std::unique_ptr<RetrieveRequest::AsyncJobDeleter> deleter;
+    };
+    // List of requests to remove from ownership.
+    std::list<std::string> retrieveRequestsToUnown;
+    // List of the asynchronous deleters of the subrequests.
+    std::list<AsyncDeleteAndReq> asyncDeleterAndReqs;
+    
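+    // Launch an asynchronous deletion for each failed subrequest, keeping the deleters
+    // so we can wait on them in a second pass.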
+    for (auto &fs: m_subrequestList) {
+      retrieveRequestsToUnown.push_back(fs.subrequest->getAddressIfSet());
+      try {
+        asyncDeleterAndReqs.push_back({fs, std::unique_ptr<RetrieveRequest::AsyncJobDeleter>(fs.subrequest->asyncDeleteJob())});
+      } catch (cta::exception::Exception &ex) {
+        // Log the failure to delete.
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", fs.archiveFile.archiveFileID)
+              .add("subrequestAddress", fs.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackRetrieveFailureReportBatch::report(): failed to asyncDelete() retrieve request.");
+      }
+    }
+    
+    timingList.insertAndReset("asyncDeleteRetrieveLaunchTime", t);
+    for (auto &adar: asyncDeleterAndReqs) {
+      try {
+        adar.deleter->wait();
+        // Log the deletion
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
+              .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet());
+        timingList.addToLog(params);
+        lc.log(log::INFO, "In OStoreDB::RepackRetrieveFailureReportBatch::report(): deleted retrieve request after multiple failures");
+        timingList.clear();
+      } catch (cta::exception::Exception & ex) {
+        // Log the failure to delete.
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
+              .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackRetrieveFailureReportBatch::report(): async deletion of retrieve request failed on wait().");
+      }
+    }
+    timingList.insertAndReset("asyncDeleteRetrieveWaitTime", t);
+    m_oStoreDb.m_agentReference->removeBatchFromOwnership(retrieveRequestsToUnown, m_oStoreDb.m_objectStore);
+    timingList.insertAndReset("removeDeletedRetrieveFromOwnershipTime", t);
+    log::ScopedParamContainer params(lc);
+    timingList.addToLog(params);
+    params.add("agentAddress",m_oStoreDb.m_agentReference->getAgentAddress());
+    lc.log(log::INFO,"In OStoreDB::RepackRetrieveFailureReportBatch::report(): successfully removed retrieve requests from the agent's ownership.");
+  }
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::RepackRequest::getLastExpandedFSeq()
 //------------------------------------------------------------------------------
@@ -3172,7 +3314,7 @@ void OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerD
       }
     }
   }
-  // 2) Queue the repack requests for repack.
+  // 2) Queue the retrieve requests for repack.
   for (auto & repackRequestQueue: jobsToRequeueForRepackMap) {
     typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> RQTRTRFSAlgo;
     RQTRTRFSAlgo::InsertedElement::list insertedRequests;
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 244fda8631..570f9796bb 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -434,6 +434,16 @@ public:
     SubrequestInfo::List m_subrequestList;
   };
   
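+  // Report batch for failed retrieve subrequests of a repack request: updates the
+  // repack request's failure statistics and deletes the subrequests from the objectstore.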
+  class RepackRetrieveFailureReportBatch: public RepackReportBatch {
+    friend class OStoreDB;
+    RepackRetrieveFailureReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):
+      RepackReportBatch(backend, oStoreDb) {}
+  public:
+    void report(log::LogContext& lc) override;
+  private:
+    typedef RepackReportBatch::SubrequestInfo<objectstore::RetrieveRequest> SubrequestInfo;
+    SubrequestInfo::List m_subrequestList;
+  };
+  
   class RepackArchiveSuccessesReportBatch: public RepackReportBatch {
     friend class OStoreDB;
     RepackArchiveSuccessesReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):
@@ -450,6 +460,7 @@ private:
   CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound);
   const size_t c_repackReportBatchSize = 500;
   std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc);
+  std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextFailedRetrieveRepackReportBatch(log::LogContext& lc);
   std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulArchiveRepackReportBatch(log::LogContext& lc);
 public:
 
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 89675c4283..45a45b92ce 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -1686,7 +1686,7 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
   cta::objectstore::Backend& backend = schedulerDB.getBackend();
   setupDefaultCatalogue();
   
-    
+
 #ifdef STDOUT_LOGGING
   log::StdoutLogger dl("dummy", "unitTest");
 #else
@@ -1825,7 +1825,7 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
       it++;
     }
     std::unique_ptr<cta::RetrieveJob> failedJobUniqPtr = std::move(*(executedJobs.begin()));
-    rrp.reportFailedJob(std::move(failedJobUniqPtr),cta::exception::Exception("FailedJob"));
+    rrp.reportFailedJob(std::move(failedJobUniqPtr),cta::exception::Exception("FailedJob expandRepackRequestFailedRetrieve"));
    
     rrp.setDiskDone();
     rrp.setTapeDone();
@@ -1860,7 +1860,7 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
 
         rrp.startThreads();
         
-        rrp.reportFailedJob(std::move(retrieveJob),cta::exception::Exception("FailedJob"));
+        rrp.reportFailedJob(std::move(retrieveJob),cta::exception::Exception("FailedJob for unit test expandRepackRequestFailedRetrieve"));
         
         rrp.setDiskDone();
         rrp.setTapeDone();
@@ -1878,7 +1878,7 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
         cta::objectstore::ScopedExclusiveLock sel(re);
         re.fetch();
 
-        //Get the retrieveQueueToReportToRepackForSuccess
+        //Get the retrieveQueueToReportToRepackForFailure
         // The queue is named after the repack request: we need to query the repack index
         objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
         ri.fetchNoLock();
@@ -1896,6 +1896,26 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) {
           ASSERT_EQ(archiveFileSize,job.size);
         }
       }
+      
+      {
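+        // Pop and report the pending repack report batches; reporting the failed-retrieve
+        // batch updates the repack request's statistics and deletes the failed subrequests.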
+        Scheduler::RepackReportBatch reports = scheduler.getNextRepackReportBatch(lc);
+        reports.report(lc);
+        reports = scheduler.getNextRepackReportBatch(lc);
+        reports.report(lc);
+      }
+      {
+        //After the reporting, the RetrieveQueueToReportToRepackForFailure should not exist anymore.
+        cta::objectstore::RootEntry re(backend);
+        cta::objectstore::ScopedExclusiveLock sel(re);
+        re.fetch();
+
+        //Get the retrieveQueueToReportToRepackForFailure
+        // The queue is named after the repack request: we need to query the repack index
+        objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend());
+        ri.fetchNoLock();
+        
+        ASSERT_THROW(re.getRetrieveQueueAddress(ri.getRepackRequestAddress(vid),cta::objectstore::JobQueueType::JobsToReportToRepackForFailure),cta::exception::Exception);
+      }
     }
   }
 }
-- 
GitLab