From 0a998704d9c978c419c4a13c16448bae050c0030 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Fri, 8 Mar 2019 22:41:45 +0100
Subject: [PATCH] Partially implemented retrieve reporting for repack.

Changed the reporting structures in the repack request to better handle reporting
(we will report one archive job per rearchived copy number, which is potentially
several per fSeq).

Implemented getNextRepackReportBatch()/getNextSuccessfulRetrieveRepackReportBatch()
for retrieve successes and partially implemented the report() function.
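
The per-copyNb accounting introduced here can be summarised with a minimal,
standalone C++ sketch (simplified names and types for illustration, not the
actual objectstore classes):

  #include <cstdint>
  #include <iostream>
  #include <set>

  struct SubRequestPointer {
    bool retrieveAccounted = false;             // one retrieve per fSeq
    std::set<uint32_t> archiveCopyNbsAccounted; // one archive job per copyNb
  };

  struct RepackStats {
    uint64_t archivedFiles = 0;
    uint64_t archivedBytes = 0;
  };

  // Account an archive success exactly once per (fSeq, copyNb), so a report
  // retried after a process failure cannot double count.
  bool accountArchiveSuccess(SubRequestPointer &p, RepackStats &stats,
                             uint32_t copyNb, uint64_t bytes) {
    if (p.archiveCopyNbsAccounted.count(copyNb)) return false; // already seen
    p.archiveCopyNbsAccounted.insert(copyNb);
    stats.archivedFiles += 1;
    stats.archivedBytes += bytes;
    return true;
  }

  int main() {
    SubRequestPointer p;
    RepackStats stats;
    accountArchiveSuccess(p, stats, 1, 1000); // copy 1 of this fSeq
    accountArchiveSuccess(p, stats, 2, 1000); // copy 2 (new copy): also counted
    accountArchiveSuccess(p, stats, 1, 1000); // retried report: ignored
    std::cout << stats.archivedFiles << " files, " << stats.archivedBytes
              << " bytes" << std::endl; // prints: 2 files, 2000 bytes
    return 0;
  }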
---
 objectstore/RepackRequest.cpp   |  54 ++++----
 objectstore/RepackRequest.hpp   |   5 +-
 objectstore/RetrieveRequest.cpp |   2 +
 objectstore/RetrieveRequest.hpp |   5 +-
 objectstore/cta.proto           |  13 +-
 scheduler/OStoreDB/OStoreDB.cpp | 218 ++++++++++++++++++++++++++++++++
 scheduler/OStoreDB/OStoreDB.hpp |  50 +++++++-
 7 files changed, 314 insertions(+), 33 deletions(-)

diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp
index f35f687df9..449e7798fc 100644
--- a/objectstore/RepackRequest.cpp
+++ b/objectstore/RepackRequest.cpp
@@ -29,6 +29,13 @@ namespace cta { namespace objectstore {
 RepackRequest::RepackRequest(const std::string& address, Backend& os):
   ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os, address) { }
 
+//------------------------------------------------------------------------------
+// RepackRequest::RepackRequest()
+//------------------------------------------------------------------------------
+RepackRequest::RepackRequest(Backend& os):
+  ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os) { }
+
+
 //------------------------------------------------------------------------------
 // RepackRequest::RepackRequest()
 //------------------------------------------------------------------------------
@@ -146,10 +153,10 @@ void RepackRequest::setBufferURL(const std::string& bufferURL) {
 void RepackRequest::RepackSubRequestPointer::serialize(serializers::RepackSubRequestPointer& rsrp) {
   rsrp.set_address(address);
   rsrp.set_fseq(fSeq);
-  rsrp.set_retrieveaccounted(retrieveAccounted);
-  rsrp.set_archiveaccounted(archiveAccounted);
-  rsrp.set_failureaccounted(failureAccounted);
-  rsrp.set_subrequestdeleted(subrequestDeleted);
+  rsrp.set_retrieve_accounted(retrieveAccounted);
+  rsrp.mutable_archive_copynb_accounted()->Clear();
+  for (auto cna: archiveCopyNbsAccounted) { rsrp.mutable_archive_copynb_accounted()->Add(cna); }
+  rsrp.set_subrequest_deleted(subrequestDeleted);
 }
 
 //------------------------------------------------------------------------------
@@ -158,10 +165,10 @@ void RepackRequest::RepackSubRequestPointer::serialize(serializers::RepackSubReq
 void RepackRequest::RepackSubRequestPointer::deserialize(const serializers::RepackSubRequestPointer& rsrp) {
   address = rsrp.address();
   fSeq = rsrp.fseq();
-  retrieveAccounted = rsrp.retrieveaccounted();
-  archiveAccounted = rsrp.archiveaccounted();
-  failureAccounted = rsrp.failureaccounted();
-  subrequestDeleted = rsrp.subrequestdeleted();
+  retrieveAccounted = rsrp.retrieve_accounted();
+  archiveCopyNbsAccounted.clear();
+  for (auto acna: rsrp.archive_copynb_accounted()) { archiveCopyNbsAccounted.insert(acna); }
+  subrequestDeleted = rsrp.subrequest_deleted();
 }
 
 //------------------------------------------------------------------------------
@@ -190,7 +197,8 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint64_t> fSeqs, AgentRe
       auto & p = pointerMap[fs];
       p.address = retInfo.address;
       p.fSeq = fs;
-      p.archiveAccounted = p.retrieveAccounted = p.failureAccounted = p.subrequestDeleted = false;
+      p.retrieveAccounted = p.subrequestDeleted = false;
+      p.archiveCopyNbsAccounted.clear();
       newElementCreated = true;
     }
     ret.emplace(retInfo);
@@ -256,13 +264,13 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa
   // Read the map
   for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
   bool didUpdate = false;
-  for (auto & rs: retrieveFailures) {
+  for (auto & rf: retrieveFailures) {
     try {
-      auto & p = pointerMap.at(rs.fSeq);
-      if (!p.failureAccounted) {
-        p.failureAccounted = true;
-        m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rs.bytes);
-        m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rs.files);
+      auto & p = pointerMap.at(rf.fSeq);
+      if (!p.retrieveAccounted) {
+        p.retrieveAccounted = true;
+        m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rf.bytes);
+        m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rf.files);
         didUpdate = true;
       }
     } catch (std::out_of_range &) {
@@ -287,8 +295,8 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu
   for (auto & as: archiveSuccesses) {
     try {
       auto & p = pointerMap.at(as.fSeq);
-      if (!p.archiveAccounted) {
-        p.archiveAccounted = true;
+      if (!p.archiveCopyNbsAccounted.count(as.copyNb)) {
+        p.archiveCopyNbsAccounted.insert(as.copyNb);
         m_payload.set_archivedbytes(m_payload.archivedbytes() + as.bytes);
         m_payload.set_archivedfiles(m_payload.archivedfiles() + as.files);
         didUpdate = true;
@@ -312,13 +320,13 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai
   // Read the map
   for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp);
   bool didUpdate = false;
-  for (auto & rs: archiveFailures) {
+  for (auto & af: archiveFailures) {
     try {
-      auto & p = pointerMap.at(rs.fSeq);
-      if (!p.failureAccounted) {
-        p.failureAccounted = true;
-        m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + rs.bytes);
-        m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + rs.files);
+      auto & p = pointerMap.at(af.fSeq);
+      if (!p.archiveCopyNbsAccounted.count(af.copyNb)) {
+        p.archiveCopyNbsAccounted.insert(af.copyNb);
+        m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + af.bytes);
+        m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + af.files);
         didUpdate = true;
       }
     } catch (std::out_of_range &) {
diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp
index 8513caa2af..c0d28598d1 100644
--- a/objectstore/RepackRequest.hpp
+++ b/objectstore/RepackRequest.hpp
@@ -66,8 +66,7 @@ private:
     std::string address;
     uint64_t fSeq;
     bool retrieveAccounted;
-    bool archiveAccounted;
-    bool failureAccounted;
+    std::set<uint32_t> archiveCopyNbsAccounted;
     bool subrequestDeleted;
     typedef std::map<uint64_t, RepackSubRequestPointer> Map;
     void serialize (serializers::RepackSubRequestPointer & rsrp);
@@ -83,6 +82,8 @@ public:
     uint64_t fSeq;
     uint64_t files = 1;
     uint64_t bytes;
+    /// CopyNb is needed to record archive job statistics (we can have several archive jobs for the same fSeq).
+    uint32_t copyNb = 0;
     typedef std::list<SubrequestStatistics> List;
     bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; }
   };
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index a46aa7358d..7b288bc5e1 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -529,6 +529,7 @@ void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) {
     }
     m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL);
     m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress);
+    m_payload.mutable_repack_info()->set_fseq(repackInfo.fSeq);
   }
 }
 
@@ -789,6 +790,7 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string
               ri.fileBufferURL = payload.repack_info().file_buffer_url();
               ri.isRepack = true;
               ri.repackRequestAddress = payload.repack_info().repack_request_address();
+              ri.fSeq = payload.repack_info().fseq();
             }
             // TODO serialization of payload maybe not necessary
             oh.set_payload(payload.SerializePartialAsString());
diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp
index 330642d080..003af3fe3f 100644
--- a/objectstore/RetrieveRequest.hpp
+++ b/objectstore/RetrieveRequest.hpp
@@ -56,7 +56,7 @@ public:
     uint32_t copyNb;
     serializers::RetrieveJobStatus status;
   };
-  // An asynchronous job ownership updating class.
+  // An asynchronous request deleting class.
   class AsyncJobDeleter {
     friend class RetrieveRequest;
   public:
@@ -148,6 +148,7 @@ public:
     std::map<uint32_t, std::string> archiveRouteMap;
     std::set<uint32_t> copyNbsToRearchive;
     std::string repackRequestAddress;
+    uint64_t fSeq;
     std::string fileBufferURL;
   };
   void setRepackInfo(const RepackInfo & repackInfo);
@@ -165,6 +166,7 @@ public:
       for (auto cntr: copyNbsToRearchive) rrri.mutable_copy_nbs_to_rearchive()->Add(cntr);
       rrri.set_file_buffer_url(fileBufferURL);
       rrri.set_repack_request_address(repackRequestAddress);
+      rrri.set_fseq(fSeq);
     }
     
     void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) {
@@ -173,6 +175,7 @@ public:
       for(auto &cntr: rrri.copy_nbs_to_rearchive()) { copyNbsToRearchive.insert(cntr); }
       fileBufferURL = rrri.file_buffer_url();
       repackRequestAddress = rrri.repack_request_address();
+      fSeq = rrri.fseq();
     }
   };
 private:
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index ed5b3414ea..08c040577c 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -377,6 +377,7 @@ message RetrieveRequestRepackInfo {
   repeated uint32 copy_nbs_to_rearchive = 9510;
   required string repack_request_address = 9520;
   required string file_buffer_url = 9530;
+  required uint64 fseq = 9540;
 }
 
 message RetrieveRequest {
@@ -491,15 +492,15 @@ enum RepackRequestStatus {
 // sub request should be interpreted as an unfulfilled creation intent (deleted=false) and create the
 // missing sub request or the completion of the request (which can happen anytime after sub request
 // creation).
-// Likewise, the "accounted" booleans will prevent double counting in case a report (for success or failure)
-// need to be retried after a process failure.
+// Likewise, the "accounted" booleans or copyNb sets prevent double counting in case a report (for success or failure)
+// needs to be retried after a process failure. The same flag covers both success and failure reports. Archive needs a
+// set of copyNbs, as one repack sub request can lead to several archive jobs (when we create new copies); see the example below.
 message RepackSubRequestPointer {
   required string address = 10500;
   required uint64 fseq = 10510;
-  required bool retrieveaccounted = 10530;
-  required bool archiveaccounted = 10534;
-  required bool failureaccounted = 10537;
-  required bool subrequestdeleted = 10540;
+  required bool retrieve_accounted = 10530;
+  repeated uint32 archive_copynb_accounted = 10534;
+  required bool subrequest_deleted = 10540;
 }
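+// Illustrative example (hypothetical values, proto text format): a sub request whose
+// retrieve has been accounted and whose copy 1 has been re-archived, while copy 2 is
+// still pending, would carry:
+//   address: "repackSubRequestAddress" fseq: 42 retrieve_accounted: true
+//   archive_copynb_accounted: 1 subrequest_deleted: false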
 
 message RepackRequest {
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index e400149d6e..ccd0907a45 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -1523,9 +1523,227 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRepackJobToEx
 // OStoreDB::getNextRepackJobToExpand()
 //------------------------------------------------------------------------------
 std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextRepackReportBatch(log::LogContext& lc) {
+  try {
+    return getNextSuccessfulRetrieveRepackReportBatch(lc);
+  } catch (NoRepackReportBatchFound &) {}
   return nullptr;
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::getNextSuccessfulRetrieveRepackReportBatch()
+//------------------------------------------------------------------------------
+std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc) {
+  typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> Carqtrtrfs;
+  Carqtrtrfs algo(this->m_objectStore, *m_agentReference);
+  // Decide from which queue we are going to pop.
+  RootEntry re(m_objectStore);
+  re.fetchNoLock();
+  while(true) {
+    auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReportToRepackForSuccess);
+    if (queueList.empty()) throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no queue found.");
+
+    // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round.
+    Carqtrtrfs::PopCriteria criteria;
+    criteria.files = c_repackReportBatchSize;
+    auto jobs = algo.popNextBatch(queueList.front().vid, criteria, lc);
+    if(jobs.elements.empty()) continue;
+    std::unique_ptr<RepackRetrieveSuccessesReportBatch> privateRet;
+    privateRet.reset(new RepackRetrieveSuccessesReportBatch(m_objectStore, *this));
+    std::set<std::string> repackRequestAddresses;
+    for(auto &j : jobs.elements)
+    {
+      privateRet->m_subrequestList.emplace_back(RepackRetrieveSuccessesReportBatch::SubrequestInfo());
+      auto & sr = privateRet->m_subrequestList.back();
+      sr.repackInfo = j.repackInfo;
+      sr.archiveFile = j.archiveFile;
+      sr.subrequest.reset(j.retrieveRequest.release());
+      repackRequestAddresses.insert(j.repackInfo.repackRequestAddress);
+    }
+    // As we are popping from a single report queue, all requests should concern only one repack request.
+    if (repackRequestAddresses.size() != 1) {
+      std::stringstream err;
+      err << "In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): reports for several repack requests in the same queue. ";
+      for (auto & rr: repackRequestAddresses) { err << rr << " "; }
+      throw exception::Exception(err.str());
+    }
+    privateRet->m_repackRequest.setAddress(*repackRequestAddresses.begin());
+    
+    return std::unique_ptr<SchedulerDatabase::RepackReportBatch>(privateRet.release());
+  }
+  throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no report found.");
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackRetrieveSuccessesReportBatch::report()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) {
+  // We have a batch of popped requests to report. We will first record them in the repack request (update statistics),
+  // then transform the requests (retrieve into archive) and finally queue the archive jobs in the right queues.
+  // As usual, there are many opportunities for failure.
+  utils::Timer t;
+  log::TimingList timingList;
+  
+  // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
+  // before the next step.
+  {
+    // Prepare the report
+    objectstore::RepackRequest::SubrequestStatistics::List ssl;
+    for (auto &rr: m_subrequestList) {
+      ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
+      ssl.back().bytes = rr.archiveFile.fileSize;
+      ssl.back().files = 1;
+      ssl.back().fSeq = rr.repackInfo.fSeq;
+    }
+    // Record it.
+    timingList.insertAndReset("successStatsPrepareTime", t);
+    objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+    timingList.insertAndReset("successStatsLockTime", t);
+    m_repackRequest.fetch();
+    timingList.insertAndReset("successStatsFetchTime", t);
+    m_repackRequest.reportRetriveSuccesses(ssl);
+    timingList.insertAndReset("successStatsUpdateTime", t);
+    m_repackRequest.commit();
+    timingList.insertAndReset("successStatsCommitTime", t);
+  }
+  
+  // 2) Asynchronously transform the retrieve requests into archive requests.
+  // From this point on, failing to transform is counted as a failure to archive.
+  {
+    objectstore::RepackRequest::SubrequestStatistics::List failedArchiveSSL;
+    std::list<SubrequestInfo *> failedSubrequests;
+    struct AsyncTransformerAndReq {
+      SubrequestInfo & subrequestInfo;
+      std::unique_ptr<objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer> transformer;
+    };
+    std::list<AsyncTransformerAndReq> asyncTransformsAndReqs;
+    for (auto &rr: m_subrequestList) {
+      try {
+        asyncTransformsAndReqs.push_back({
+          rr, 
+          std::unique_ptr<objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer>(
+            rr.subrequest->asyncTransformToArchiveRequest(*m_oStoreDb.m_agentReference)
+          )
+        });
+      } catch (exception::Exception & ex) {
+        // We failed to archive the file (to create the request, in fact). So all the copyNbs 
+        // can be counted as failed.
+        for (auto cnbtr: rr.repackInfo.copyNbsToRearchive) {
+          failedArchiveSSL.push_back(objectstore::RepackRequest::SubrequestStatistics());
+          auto & fassl = failedArchiveSSL.back();
+          fassl.bytes = rr.archiveFile.fileSize;
+          fassl.files = 1;
+          fassl.fSeq = rr.repackInfo.fSeq;
+          fassl.copyNb = cnbtr;
+        }
+        // We will need to delete the request too.
+        failedSubrequests.push_back(&rr);
+        // Log the error
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", rr.archiveFile.archiveFileID)
+              .add("subrequestAddress", rr.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): failed to asyncTransformToArchiveRequest().");
+      }
+    }
+    timingList.insertAndReset("asyncTransformLaunchTime", t);
+
+    // 2.b) Deal with transformation results (and log the outcome of each transformation).
+    for (auto &atar: asyncTransformsAndReqs) {
+      try {
+        atar.transformer->wait();
+        // Log the transformation
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID)
+              .add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet());
+        lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): turned successful retrieve request into an archive request.");
+      } catch (exception::Exception & ex) {
+        // We failed to archive the file (to create the request, in fact). So all the copyNbs 
+        // can be counted as failed.
+        for (auto cnbtr: atar.subrequestInfo.repackInfo.copyNbsToRearchive) {
+          failedArchiveSSL.push_back(objectstore::RepackRequest::SubrequestStatistics());
+          auto & fassl = failedArchiveSSL.back();
+          fassl.bytes = atar.subrequestInfo.archiveFile.fileSize;
+          fassl.files = 1;
+          fassl.fSeq = atar.subrequestInfo.repackInfo.fSeq;
+          fassl.copyNb = cnbtr;
+        }
+        // We will need to delete the request too.
+        failedSubrequests.push_back(&atar.subrequestInfo);
+        // Log the error
+        log::ScopedParamContainer params(lc);
+        params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID)
+              .add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet())
+              .add("exceptionMsg", ex.getMessageValue());
+        lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): async transformation failed on wait().");
+      }
+    }
+    timingList.insertAndReset("asyncTransformCompletionTime", t);
+    
+    // 2.c) Deal with transformation failures (and delete the failed requests):
+    // - record the failure statistics in the repack request
+    // - async delete the failed requests
+    // - wait for the deletions to complete
+    if (failedSubrequests.size()) {
+      // Record the stats (before deleting the requests, otherwise we could leak some counting
+      // in case of failure).
+      objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+      timingList.insertAndReset("failureStatsLockTime", t);
+      m_repackRequest.fetch();
+      timingList.insertAndReset("failureStatsFetchTime", t);
+      m_repackRequest.reportArchiveFailures(failedArchiveSSL);
+      timingList.insertAndReset("failureStatsUpdateTime", t);
+      m_repackRequest.commit();
+      timingList.insertAndReset("failureStatsCommitTime", t);
+      // And now delete the requests
+      struct AsyncDeleteAndReq {
+        SubrequestInfo & subrequestInfo;
+        std::unique_ptr<RetrieveRequest::AsyncJobDeleter> deleter;
+      };
+      std::list<std::string> retrieveRequestsToUnown;
+      std::list<AsyncDeleteAndReq> asyncDeleterAndReqs;
+      for (auto &fs: failedSubrequests) {
+        // This is the end of error handling. If we fail to delete a request, so be it.
+        retrieveRequestsToUnown.push_back(fs->subrequest->getAddressIfSet());
+        try {
+          asyncDeleterAndReqs.push_back({*fs, 
+              std::unique_ptr<RetrieveRequest::AsyncJobDeleter>(fs->subrequest->asyncDeleteJob())});
+        } catch (cta::exception::Exception &ex) {
+          // Log the failure to delete.
+          log::ScopedParamContainer params(lc);
+          params.add("fileId", fs->archiveFile.archiveFileID)
+                .add("subrequestAddress", fs->subrequest->getAddressIfSet())
+                .add("excepitonMsg", ex.getMessageValue());
+          lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): failed to asyncDelete() retrieve request.");
+        }
+      }
+      timingList.insertAndReset("asyncDeleteRetrieveLaunchTime", t);
+      for (auto &adar: asyncDeleterAndReqs) {
+        try {
+          adar.deleter->wait();
+          // Log the deletion
+          log::ScopedParamContainer params(lc);
+          params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
+                .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet());
+          lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): deleted retrieve request after failure to transform it into an archive request.");
+        } catch (cta::exception::Exception & ex) {
+          // Log the failure to delete.
+          log::ScopedParamContainer params(lc);
+          params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID)
+                .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet())
+                .add("excepitonMsg", ex.getMessageValue());
+          lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): async deletion of retrieve request failed on wait().");
+        }
+      }
+      timingList.insertAndReset("asyncDeleteRetrieveWaitTime", t);
+      m_oStoreDb.m_agentReference->removeBatchFromOwnership(retrieveRequestsToUnown, m_oStoreDb.m_objectStore);
+      timingList.insertAndReset("removeDeletedRetrieveFromOwnershipTime", t);
+    }
+  }
+  
+  // 3) We now just need to queue the freshly created archive jobs into their respective queues.
+  // XXX: TODO
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::RepackRequest::getLastExpandedFSeq()
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 5638b95b33..7dff16c9e8 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -396,8 +396,56 @@ public:
    * @return a unique_ptr holding the RepackRequest
    */
   std::unique_ptr<SchedulerDatabase::RepackRequest> getNextRepackJobToExpand() override;
+ 
+  class RepackRetrieveSuccessesReportBatch;
+  class RepackRetrieveFailureReportBatch;
+  class RepackArchiveSuccessesReportBatch;
+  class RepackArchiveFailureReportBatch;
+  friend class RepackRetrieveSuccessesReportBatch;
+  friend class RepackRetrieveFailureReportBatch;
+  friend class RepackArchiveSuccessesReportBatch;
+  friend class RepackArchiveFailureReportBatch;
+  /**
+   * Base class handling the commonalities between the repack report batch classes.
+   */
+  class RepackReportBatch: public SchedulerDatabase::RepackReportBatch {
+    friend class OStoreDB;
+    friend class RepackRetrieveSuccessesReportBatch;
+    friend class RepackRetrieveFailureReportBatch;
+    friend class RepackArchiveSuccessesReportBatch;
+    friend class RepackArchiveFailureReportBatch;
+    RepackReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb): m_repackRequest(backend), m_oStoreDb(oStoreDb) {}
+  protected:
+    objectstore::RepackRequest m_repackRequest;
+    OStoreDB & m_oStoreDb;
+    template <class SR> 
+    struct SubrequestInfo {
+      /// CopyNb is only useful for archive requests where we want to distinguish several jobs.
+      uint32_t archivedCopyNb = 0;
+      std::shared_ptr<SR> subrequest;
+      common::dataStructures::ArchiveFile archiveFile;
+      objectstore::RetrieveRequest::RepackInfo repackInfo;
+      typedef std::list<SubrequestInfo> List;
+    };
+  };
   
-  std::unique_ptr<RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override;
+  class RepackRetrieveSuccessesReportBatch: public RepackReportBatch {
+    friend class OStoreDB;
+    RepackRetrieveSuccessesReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):
+      RepackReportBatch(backend,oStoreDb) {}
+  public:
+    void report(log::LogContext& lc) override;
+  private:
+    typedef RepackReportBatch::SubrequestInfo<objectstore::RetrieveRequest> SubrequestInfo;
+    SubrequestInfo::List m_subrequestList;
+  };
+  
+  std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override;
+private:
+  CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound);
+  const size_t c_repackReportBatchSize = 500;
+  std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc);
+public:
 
   /* === Drive state handling  ============================================== */
   /**
-- 
GitLab