From 9bddb0a988e3c5e9c903fcbdbd8296e81b31ceb4 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Fri, 22 Mar 2019 01:36:48 +0100
Subject: [PATCH] Implemented the archive success reporting for repack.

This included extracting missing data while popping elements from the archive queue.
---
 objectstore/ArchiveQueueAlgorithms.hpp        |   4 +-
 .../ArchiveQueueToTransferAlgorithms.cpp      |   2 +-
 objectstore/ArchiveRequest.cpp                |  11 +-
 objectstore/ArchiveRequest.hpp                |   4 +-
 objectstore/RepackRequest.cpp                 |  11 ++
 scheduler/OStoreDB/OStoreDB.cpp               | 134 +++++++++++++++++-
 scheduler/OStoreDB/OStoreDB.hpp               |   5 +
 scheduler/SchedulerTest.cpp                   |   3 +-
 8 files changed, 160 insertions(+), 14 deletions(-)

diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp
index 5288959621..2cb093f97a 100644
--- a/objectstore/ArchiveQueueAlgorithms.hpp
+++ b/objectstore/ArchiveQueueAlgorithms.hpp
@@ -57,6 +57,7 @@ struct ContainerTraits<ArchiveQueue,C>
     std::string latestError;
     SchedulerDatabase::ArchiveJob::ReportType reportType;
     ArchiveRequest::RepackInfo repackInfo;
+    std::map<uint32_t, serializers::ArchiveJobStatus> archiveJobsStatusMap;
   };
   struct PoppedElementsSummary;
   struct PopCriteria {
@@ -408,7 +409,8 @@ switchElementsOwnership(PoppedElementsBatch &poppedElementBatch, const Container
       e->errorReportURL = u->get()->getArchiveErrorReportURL();
       e->srcURL = u->get()->getSrcURL();
       e->repackInfo = u->get()->getRepackInfo();
-      switch(u->get()->getJobStatus()) {
+      e->archiveJobsStatusMap = u->get()->getJobsStatusMap();
+      switch(e->archiveJobsStatusMap[e->copyNb]) {
         case serializers::ArchiveJobStatus::AJS_ToReportToUserForTransfer:
           e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
           break;
diff --git a/objectstore/ArchiveQueueToTransferAlgorithms.cpp b/objectstore/ArchiveQueueToTransferAlgorithms.cpp
index 4d9ea91c70..378d853eb8 100644
--- a/objectstore/ArchiveQueueToTransferAlgorithms.cpp
+++ b/objectstore/ArchiveQueueToTransferAlgorithms.cpp
@@ -47,7 +47,7 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria,
   for (auto &cjfq: candidateJobsFromQueue.candidates) {
     ret.elements.emplace_back(PoppedElement{cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore), cjfq.copyNb, cjfq.size,
     common::dataStructures::ArchiveFile(), "", "", "", "", SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired,
-    ArchiveRequest::RepackInfo()});
+    ArchiveRequest::RepackInfo(), {}});
     ret.summary.bytes += cjfq.size;
     ret.summary.files++;
   }
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index b7be789eb1..e2c88afc36 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -565,7 +565,10 @@ ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint32
             if (j->failurelogs_size()) {
               retRef.m_latestError = j->failurelogs(j->failurelogs_size()-1);
             }
-            retRef.m_jobStatus = j->status();
+            for (auto &j2: payload.jobs()) {
+              // Get all jobs statuses.
+              retRef.m_jobsStatusMap[j2.copynb()] = j2.status();
+            }
             oh.set_payload(payload.SerializePartialAsString());
             retRef.m_timingReport.processTime = retRef.m_timer.secs(utils::Timer::resetCounter);
             return oh.SerializeAsString();
@@ -622,10 +625,10 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
 }
 
 //------------------------------------------------------------------------------
-// ArchiveRequest::AsyncJobOwnerUpdater::getJobStatus()
+// ArchiveRequest::AsyncJobOwnerUpdater::getArchiveJobsStatusMap()
 //------------------------------------------------------------------------------
-objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater::getJobStatus() {
-  return m_jobStatus;
+std::map<uint32_t, serializers::ArchiveJobStatus> ArchiveRequest::AsyncJobOwnerUpdater::getJobsStatusMap(){
+  return m_jobsStatusMap;
 }
 
 //------------------------------------------------------------------------------
diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp
index c686502963..eb6d503b1b 100644
--- a/objectstore/ArchiveRequest.hpp
+++ b/objectstore/ArchiveRequest.hpp
@@ -121,8 +121,8 @@ public:
     const std::string & getArchiveReportURL();
     const std::string & getArchiveErrorReportURL();
     const std::string & getLastestError();
-    serializers::ArchiveJobStatus getJobStatus();
     RepackInfo getRepackInfo();
+    std::map<uint32_t, serializers::ArchiveJobStatus> getJobsStatusMap();
     // TODO: use the more general structure from utils.
     struct TimingsReport {
       double lockFetchTime = 0;
@@ -137,8 +137,8 @@ public:
     std::string m_srcURL;
     std::string m_archiveReportURL;
     std::string m_archiveErrorReportURL;
-    serializers::ArchiveJobStatus m_jobStatus;
     RepackInfo m_repackInfo;
+    std::map<uint32_t, serializers::ArchiveJobStatus> m_jobsStatusMap;
     std::string m_latestError;
     utils::Timer m_timer;
     TimingsReport m_timingReport;
diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp
index c0448e8f7a..0558e2448d 100644
--- a/objectstore/RepackRequest.cpp
+++ b/objectstore/RepackRequest.cpp
@@ -306,6 +306,14 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu
     }
   }
   if (didUpdate) {
+    // Check whether we reached the end.
+    if (m_payload.archivedfiles() + m_payload.failedtoarchivefiles() >= m_payload.totalfilestoarchive()) {
+      if (m_payload.failedtoarchivefiles()) {
+        m_payload.set_status(serializers::RepackRequestStatus::RRS_Failed);
+      } else {
+        m_payload.set_status(serializers::RepackRequestStatus::RRS_Complete);
+      }
+    }
     m_payload.mutable_subrequests()->Clear();
     for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
@@ -334,6 +342,9 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai
     }
   }
   if (didUpdate) {
+    // Check whether we reached the end.
+    if (m_payload.archivedfiles() + m_payload.failedtoarchivefiles() >= m_payload.totalfilestoarchive())
+      m_payload.set_status(serializers::RepackRequestStatus::RRS_Failed);
     m_payload.mutable_subrequests()->Clear();
     for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add());
   }
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 3f930dda64..066688abde 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -1697,6 +1697,8 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfu
       privateRet->m_subrequestList.emplace_back(RepackArchiveSuccessesReportBatch::SubrequestInfo());
       auto & sr = privateRet->m_subrequestList.back();
       sr.repackInfo = j.repackInfo;
+      sr.archivedCopyNb = j.copyNb;
+      sr.archiveJobsStatusMap = j.archiveJobsStatusMap;
       sr.archiveFile = j.archiveFile;
       sr.subrequest.reset(j.archiveRequest.release());
       repackRequestAddresses.insert(j.repackInfo.repackRequestAddress);
@@ -1878,7 +1880,6 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
           log::ScopedParamContainer params(lc);
           params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
                 .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet());
-          timingList.addToLog(params);
           lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): deleted retrieve request after failure to transform in archive request.");
         } catch (cta::exception::Exception & ex) {
           // Log the failure to delete.
@@ -1894,7 +1895,6 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
       timingList.insertAndReset("removeDeletedRetrieveFromOwnershipTime", t);
       log::ScopedParamContainer params(lc);
       params.add("agentAddress",m_oStoreDb.m_agentReference->getAgentAddress());
-      timingList.addToLog(params);
       lc.log(log::INFO,"In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): successfully removed retrieve requests from the agent's ownership.");
     }
   }
@@ -1912,6 +1912,10 @@ void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
     locks.clear();
     sorter.flushAll(lc);
   }
+  timingList.insertAndReset("archiveRequestsQueueingTime", t);
+  log::ScopedParamContainer params(lc);
+  timingList.addToLog(params);
+  lc.log(log::INFO,"In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): Processed a batch of reports.");
 }
 
 //------------------------------------------------------------------------------
@@ -3860,8 +3864,130 @@ objectstore::ArchiveRequest::RepackInfo OStoreDB::ArchiveJob::getRepackInfoAfter
 // OStoreDB::RepackArchiveSuccessesReportBatch::report()
 //------------------------------------------------------------------------------
 void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) {
-  //TODO : Do the reporting of RepackArchiveSuccessesReportBatch
-  throw 1;
+  // We have a batch of popped jobs to report. We will first record them in the repack requests (update statistics),
+  // and then either mark them as complete (if any sibling jobs will still require processing) or
+  // simply remove the request.
+  // Repack request will be flipped from running to successful (or failed) if we process the last job.
+  utils::Timer t;
+  log::TimingList timingList;
+  
+  // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
+  // before the next (deletions).
+  {
+    // Prepare the report
+    objectstore::RepackRequest::SubrequestStatistics::List ssl;
+    for (auto &sri: m_subrequestList) {
+      ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
+      ssl.back().bytes = sri.archiveFile.fileSize;
+      ssl.back().files = 1;
+      ssl.back().fSeq = sri.repackInfo.fSeq;
+      ssl.back().copyNb = sri.archivedCopyNb;
+    }
+    // Record it.
+    timingList.insertAndReset("successStatsPrepareTime", t);
+    objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+    timingList.insertAndReset("successStatsLockTime", t);
+    m_repackRequest.fetch();
+    timingList.insertAndReset("successStatsFetchTime", t);
+    m_repackRequest.reportArchiveSuccesses(ssl);
+    timingList.insertAndReset("successStatsUpdateTime", t);
+    m_repackRequest.commit();
+    timingList.insertAndReset("successStatsCommitTime", t);
+  }
+  
+  // 2) For each job, determine if sibling jobs are complete or not. If so, delete, else just update status and set empty owner.
+  struct Deleters {
+    std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> deleter;
+    RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo;
+    typedef std::list<Deleters> List;
+  };
+  struct JobOwnerUpdaters {
+    std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> jobOwnerUpdater;
+    RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo;
+    typedef std::list<JobOwnerUpdaters> List;
+  };
+  Deleters::List deletersList;
+  JobOwnerUpdaters::List jobOwnerUpdatersList;
+  for (auto &sri: m_subrequestList) {
+    bool moreJobsToDo = false;
+    for (auto &j: sri.archiveJobsStatusMap) {
+      if ((j.first != sri.archivedCopyNb) && 
+          (j.second != serializers::ArchiveJobStatus::AJS_Complete) && 
+          (j.second != serializers::ArchiveJobStatus::AJS_Failed)) {
+        moreJobsToDo = true;
+        break;
+      }
+    }
+    objectstore::ArchiveRequest & ar = *sri.subrequest;
+    if (moreJobsToDo) {
+      try {
+        jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> (
+              ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(),
+              serializers::ArchiveJobStatus::AJS_Complete)), 
+            sri});
+      } catch (cta::exception::Exception & ex) {
+        // Log the error
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", sri.archiveFile.archiveFileID)
+              .add("subrequestAddress", sri.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): failed to asyncUpdateJobOwner()");
+      }
+    } else {
+      try {
+        deletersList.push_back({std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter>(ar.asyncDeleteRequest()), sri});
+      } catch (cta::exception::Exception & ex) {
+        // Log the error
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", sri.archiveFile.archiveFileID)
+              .add("subrequestAddress", sri.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): failed to asyncDelete()");
+      }
+    }
+  }
+  timingList.insertAndReset("asyncUpdateOrDeleteLaunchTime", t);
+  for (auto & d: deletersList) {
+    try {
+      d.deleter->wait();
+      log::ScopedParamContainer params(lc);
+      params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
+            .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet());
+      lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): deleted request.");
+    } catch (cta::exception::Exception & ex) {
+        // Log the error
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
+              .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async deletion failed.");
+    }
+  }
+  for (auto & jou: jobOwnerUpdatersList) {
+    try {
+      jou.jobOwnerUpdater->wait();
+      log::ScopedParamContainer params(lc);
+      params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID)
+            .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet());
+      lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async updated job.");
+    } catch (cta::exception::Exception & ex) {
+      // Log the error
+      log::ScopedParamContainer params(lc);
+      params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID)
+            .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet())
+            .add("exceptionMsg", ex.getMessageValue());
+      lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async job update failed.");
+    }    
+  }
+  timingList.insertAndReset("asyncUpdateOrDeleteCompletionTime", t);
+  // 3) Just remove all jobs from ownership
+  std::list<std::string> jobsToUnown;
+  for (auto &sri: m_subrequestList) jobsToUnown.push_back(sri.subrequest->getAddressIfSet());
+  m_oStoreDb.m_agentReference->removeBatchFromOwnership(jobsToUnown, m_oStoreDb.m_objectStore);
+  timingList.insertAndReset("ownershipRemoval", t);
+  log::ScopedParamContainer params(lc);
+  timingList.addToLog(params);
+  lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): reported a batch of jobs.");
 }
 
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 7ddf585977..518e0fde11 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -419,6 +419,11 @@ public:
     struct SubrequestInfo {
       /// CopyNb is only useful for archive requests where we want to distinguish several jobs.
       uint32_t archivedCopyNb = 0;
+      /** Status map is only useful for archive requests, where we need to know other job's status to decide
+       * whether we should delete the request (all done). It's more efficient to get the information on pop
+       * in order to save a read in the most common case (only one job), and trigger immediate deletion of
+       * the request after succeeding/failing. */
+      std::map<uint32_t, objectstore::serializers::ArchiveJobStatus> archiveJobsStatusMap;
       std::shared_ptr<SR> subrequest;
       common::dataStructures::ArchiveFile archiveFile;
       typename SR::RepackInfo repackInfo;
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index ccda870be6..c7908bc1d0 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -2131,8 +2131,7 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) {
     {
       //Do the reporting of the Archive Jobs succeeded
       Scheduler::RepackReportBatch reports = scheduler.getNextRepackReportBatch(lc);
-      //TODO : uncomment and do the reporting
-      //reports.report(lc);
+      reports.report(lc);
     }
   }
 }
-- 
GitLab