From dabbdd104f6562385c2b2e36fb66de9b7af0c7db Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Wed, 15 Nov 2017 10:22:53 +0100
Subject: [PATCH] Moved the archive job reporting loop from client into the
 mount.

This allowed the archive requeuest to be removed from ownership bulk.
Also adapted unit tests and related Mocks (mainly MockArchiveMount).
Removed unused functions from job level updating.
---
 scheduler/ArchiveJob.cpp                      |  67 +----------
 scheduler/ArchiveJob.hpp                      |  23 +---
 scheduler/ArchiveMount.cpp                    | 109 +++++++++++-------
 scheduler/ArchiveMount.hpp                    |   4 +-
 scheduler/OStoreDB/OStoreDB.cpp               |  43 ++++++-
 scheduler/OStoreDB/OStoreDB.hpp               |  14 ++-
 scheduler/SchedulerDatabase.hpp               |   7 +-
 scheduler/SchedulerDatabaseTest.cpp           |  12 +-
 scheduler/SchedulerTest.cpp                   |   8 +-
 scheduler/testingMocks/MockArchiveJob.hpp     |   7 +-
 scheduler/testingMocks/MockArchiveMount.hpp   |  32 +++++
 .../daemon/MigrationReportPacker.cpp          |   2 +-
 .../daemon/MigrationReportPackerTest.cpp      |  14 +--
 13 files changed, 178 insertions(+), 164 deletions(-)

diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp
index ea8b52c02a..595b08a028 100644
--- a/scheduler/ArchiveJob.cpp
+++ b/scheduler/ArchiveJob.cpp
@@ -41,45 +41,12 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
   tapeFile(tapeFile) {}
 
 //------------------------------------------------------------------------------
-// asyncSetJobSucceed
+// asyncReportComplete
 //------------------------------------------------------------------------------
-void cta::ArchiveJob::asyncSetJobSucceed() {
-  m_dbJob->asyncSucceed();
-}
-
-//------------------------------------------------------------------------------
-// asyncSetJobSucceed
-//------------------------------------------------------------------------------
-void cta::ArchiveJob::asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob> >& jobs) {
-  // Call succeed on all jobs
-  for (auto & j: jobs) {
-    j->asyncSetJobSucceed();
-  }
-}
-
-
-//------------------------------------------------------------------------------
-// asyncSetJobsSucceed
-//------------------------------------------------------------------------------
-void cta::ArchiveJob::asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs) {
-  // We need a handle on the mount (all jobs are supposed to come from the same mount.
-  // It will be provided indirectly by a non-static member function of one job (if any).
-  if (jobs.size()) {
-    jobs.front()->asyncSucceedAndWaitJobsBatch(jobs);
-  }
-}
-
-//------------------------------------------------------------------------------
-// checkAndReportComplete
-//------------------------------------------------------------------------------
-bool cta::ArchiveJob::checkAndAsyncReportComplete() {
-  if (m_dbJob->checkSucceed()) {
-    m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
-    m_reporter->asyncReportArchiveFullyComplete();
-    m_reporterTimer.reset();
-    return true;
-  }
-  return false;
+void cta::ArchiveJob::asyncReportComplete() {
+  m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
+  m_reporter->asyncReportArchiveFullyComplete();
+  m_reporterTimer.reset();
 }
 
 //------------------------------------------------------------------------------
@@ -89,30 +56,6 @@ double cta::ArchiveJob::reportTime() {
   return m_reporterTimer.secs();
 }
 
-//------------------------------------------------------------------------------
-// ArchiveJob::writeToCatalogue
-//------------------------------------------------------------------------------
-void cta::ArchiveJob::writeToCatalogue() {
-  catalogue::TapeFileWritten fileReport;
-  fileReport.archiveFileId = archiveFile.archiveFileID;
-  fileReport.blockId = tapeFile.blockId;
-  fileReport.checksumType = tapeFile.checksumType;
-  fileReport.checksumValue = tapeFile.checksumValue;
-  fileReport.compressedSize = tapeFile.compressedSize;
-  fileReport.copyNb = tapeFile.copyNb;
-  fileReport.diskFileId = archiveFile.diskFileId;
-  fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
-  fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
-  fileReport.diskFilePath = archiveFile.diskFileInfo.path;
-  fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
-  fileReport.diskInstance = archiveFile.diskInstance;
-  fileReport.fSeq = tapeFile.fSeq;
-  fileReport.size = archiveFile.fileSize;
-  fileReport.storageClassName = archiveFile.storageClass;
-  fileReport.tapeDrive = m_mount.getDrive();
-  fileReport.vid = tapeFile.vid;
-  m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport});
-}
 //------------------------------------------------------------------------------
 // ArchiveJob::validateAndGetTapeFileWritten
 //------------------------------------------------------------------------------
diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp
index 629cfb86bc..5255983600 100644
--- a/scheduler/ArchiveJob.hpp
+++ b/scheduler/ArchiveJob.hpp
@@ -72,29 +72,15 @@ public:
   CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet);
   CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch);
   
-  /**
-   * Indicates that the job was successful and updates the backend store 
-   * asynchronously. 
-   */
-  virtual void asyncSetJobSucceed();
-  
   /**
    * Start an asynchronous update for a batch of jobs and then make sure they complete.
    */
   static void asyncSetJobsBatchSucceed(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
-  
-protected:
-  /**
-   * Backend asynchronous batch of jobs update implementation. The default implementation
-   * can be overridden for efficiency.
-   */
-  virtual void asyncSucceedAndWaitJobsBatch(std::list<std::unique_ptr<cta::ArchiveJob>> & jobs);
 public:
   /**
-   * Wait if the job was updated in the backend store asynchronously. 
-   * @return true if the archive was also sent to client asynchronously.
+   * Launch a report to the user.
    */
-  virtual bool checkAndAsyncReportComplete();
+  virtual void asyncReportComplete();
   
   /**
    * Get the report time (in seconds).
@@ -108,11 +94,6 @@ public:
    */
   virtual void validate();
   
-  /**
-   * Update the catalog with the archive request.
-   */
-  virtual void writeToCatalogue();
-  
   /**
    * Validate that archiveFile and tapeFile fields are set correctly for archive
    * request.
diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp
index 9f254e9586..1c27486fbd 100644
--- a/scheduler/ArchiveMount.cpp
+++ b/scheduler/ArchiveMount.cpp
@@ -123,78 +123,106 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(
 //------------------------------------------------------------------------------
 // reportJobsBatchWritten
 //------------------------------------------------------------------------------
-void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs,
+void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs,
   cta::log::LogContext& logContext) {
   std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
   std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
   std::unique_ptr<cta::ArchiveJob> job;
   try{
+    uint64_t files=0;
+    uint64_t bytes=0;
+    double catalogueTime=0;
+    double schedulerDbTime=0;
+    double clientReportingTime=0;
     while(!successfulArchiveJobs.empty()) {
       // Get the next job to report and make sure we will not attempt to process it twice.
       job = std::move(successfulArchiveJobs.front());
       successfulArchiveJobs.pop();
       if (!job.get()) continue;        
       tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
+      files++;
+      bytes+=job->archiveFile.fileSize;
       validatedSuccessfulArchiveJobs.emplace_back(std::move(job));      
       job.reset(nullptr);
     }
-    
+    utils::Timer t;
     // Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
     updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
+    catalogueTime=t.secs(utils::Timer::resetCounter);
     {
       cta::log::ScopedParamContainer params(logContext);
-      params.add("tapeFilesWritten", tapeFilesWritten.size());
-      logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");   
+      params.add("tapeFilesWritten", tapeFilesWritten.size())
+            .add("files", files)
+            .add("bytes", bytes)
+            .add("catalogueTime", catalogueTime);
+      logContext.log(cta::log::DEBUG,"Catalog updated for batch of jobs");   
     }
-    for (auto &job: validatedSuccessfulArchiveJobs) {
-      job->asyncSetJobSucceed();
+    
+    // Now get the db mount to mark the jobs as successful.
+    // Extract the db jobs from the scheduler jobs.
+    std::list<cta::SchedulerDatabase::ArchiveJob *> validatedSuccessfulDBArchiveJobs;
+    for (auto &schJob: validatedSuccessfulArchiveJobs) {
+      validatedSuccessfulDBArchiveJobs.emplace_back(schJob->m_dbJob.get());
     }
     
-    // Note:  former content of ReportFlush::checkAndAsyncReportCompletedJobs
-    std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
-
-    for (auto &job: validatedSuccessfulArchiveJobs){
+    // We can now pass this list for the dbMount to process.
+    // The dbMount will indicate the list of jobs that need to the reported to
+    // the client (the complete ones) in the report set.
+    std::set<cta::SchedulerDatabase::ArchiveJob *> jobsToReport = 
+        m_dbMount->setJobBatchSuccessful(validatedSuccessfulDBArchiveJobs, logContext);
+    schedulerDbTime=t.secs(utils::Timer::resetCounter);
+    // We have the list of files to report to the user and the that just needed 
+    // an update.
+    for (auto &job: validatedSuccessfulArchiveJobs) {
       cta::log::ScopedParamContainer params(logContext);
       params.add("fileId", job->archiveFile.archiveFileID)
             .add("diskInstance", job->archiveFile.diskInstance)
             .add("diskFileId", job->archiveFile.diskFileId)
             .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
-      logContext.log(cta::log::DEBUG,
-        "In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()"
-        " check for async backend update finished");
-      if(job->checkAndAsyncReportComplete()) { 
-        params.add("reportURL", job->reportURL());
-        reportedArchiveJobs.emplace_back(std::move(job));
-        logContext.log(cta::log::INFO,"Sent to the client a full file archival");
+      if (jobsToReport.count(job->m_dbJob.get())) {
+        logContext.log(cta::log::DEBUG,
+          "In ArchiveMount::reportJobsBatchWritten(): archive request complete. Will launch async report to user.");
+        job->asyncReportComplete();
       } else {
-        logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
+        logContext.log(cta::log::DEBUG,
+          "In ArchiveMount::reportJobsBatchWritten(): Recorded the partial migration of a file.");
       }
     }
-
-    for (const auto &job: reportedArchiveJobs){
-      try {
-        job->waitForReporting();
-        cta::log::ScopedParamContainer params(logContext);
-        params.add("fileId", job->archiveFile.archiveFileID)
-              .add("diskInstance", job->archiveFile.diskInstance)
-              .add("diskFileId", job->archiveFile.diskFileId)
-              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-              .add("reportURL", job->reportURL())
-              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-              .add("reportTime", job->reportTime());
-        logContext.log(cta::log::INFO,"Reported to the client a full file archival");
-      } catch(cta::exception::Exception &ex) {
-        cta::log::ScopedParamContainer params(logContext);
+    
+    // Now gather the result of the reporting to client.
+    for (auto &job: validatedSuccessfulArchiveJobs) {
+      if (jobsToReport.count(job->m_dbJob.get())) {
+        try {
+          job->waitForReporting();
+          cta::log::ScopedParamContainer params(logContext);
           params.add("fileId", job->archiveFile.archiveFileID)
                 .add("diskInstance", job->archiveFile.diskInstance)
                 .add("diskFileId", job->archiveFile.diskFileId)
-                .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
-                .add("errorMessage", ex.getMessage().str());
-          logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
+                .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
+                .add("reportURL", job->reportURL())
+                .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
+                .add("reportTime", job->reportTime());
+          logContext.log(cta::log::INFO,"Reported to the client a full file archival");
+        } catch(cta::exception::Exception &ex) {
+          cta::log::ScopedParamContainer params(logContext);
+            params.add("fileId", job->archiveFile.archiveFileID)
+                  .add("diskInstance", job->archiveFile.diskInstance)
+                  .add("diskFileId", job->archiveFile.diskFileId)
+                  .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path).add("reportURL", job->reportURL())
+                  .add("errorMessage", ex.getMessage().str());
+            logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
+        }
       }
     }
-           
-    logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");    
+    clientReportingTime=t.secs();
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("files", files)
+          .add("bytes", bytes)
+          .add("catalogueTime", catalogueTime)
+          .add("schedulerDbTime", schedulerDbTime)
+          .add("clientReportingTime", clientReportingTime)
+          .add("totalTime", catalogueTime  + schedulerDbTime + clientReportingTime);
+    logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchWritten(): recorded a batch of archive jobs in metadata.");
   } catch(const cta::exception::Exception& e){
     cta::log::ScopedParamContainer params(logContext);
     params.add("exceptionMessageValue", e.getMessageValue());
@@ -205,7 +233,7 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A
             .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
             .add("reportURL", job->reportURL());
     }
-    const std::string msg_error="An exception was caught trying to call reportMigrationResults";
+    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception";
     logContext.log(cta::log::ERR, msg_error);
     throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
   } catch(const std::exception& e){
@@ -217,7 +245,7 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A
             .add("diskFileId", job->archiveFile.diskFileId)
             .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
     }
-    const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
+    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception";
     logContext.log(cta::log::ERR, msg_error);
     throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
   }
@@ -267,4 +295,3 @@ void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daem
 void cta::ArchiveMount::setTapeFull() {
   m_catalogue.noSpaceLeftOnTape(m_dbMount->getMountInfo().vid);
 }
-
diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp
index 0af070f07f..81e53a4b20 100644
--- a/scheduler/ArchiveMount.hpp
+++ b/scheduler/ArchiveMount.hpp
@@ -32,7 +32,7 @@
 namespace cta {
   /**
    * The class driving a retrieve mount.
-   * The class only has private constructors as it is instanciated by
+   * The class only has private constructors as it is instantiated by
    * the Scheduler class.
    */
   class ArchiveMount: public TapeMount {
@@ -131,7 +131,7 @@ namespace cta {
      * @param successfulArchiveJobs the jobs to report
      * @param logContext
      */
-    void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext);
+    virtual void reportJobsBatchWritten (std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, cta::log::LogContext &logContext);
     
     /**
      * Returns the tape pool of the tape to be mounted.
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 9146f6e8e6..f820989558 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2547,6 +2547,41 @@ void OStoreDB::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver:
   m_oStoreDB.updateDriveStatistics(driveInfo, inputs, lc);
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::ArchiveMount::castFromSchedDBJob()
+//------------------------------------------------------------------------------
+OStoreDB::ArchiveJob * OStoreDB::ArchiveMount::castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job) {
+  OStoreDB::ArchiveJob * ret = dynamic_cast<OStoreDB::ArchiveJob *> (job);
+  if (!ret) {
+    std::string unexpectedType = typeid(*job).name();
+    throw cta::exception::Exception(std::string("In OStoreDB::ArchiveMount::castFromSchedDBJob(): unexpected archive job type while casting: ")+
+        unexpectedType);
+  }
+  return ret;
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::ArchiveMount::asyncSetJobBatchSuccessful()
+//------------------------------------------------------------------------------
+std::set<cta::SchedulerDatabase::ArchiveJob*> OStoreDB::ArchiveMount::setJobBatchSuccessful(
+  std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext & lc) {
+  std::set<cta::SchedulerDatabase::ArchiveJob*> ret;
+  std::list<std::string> ajToUnown;
+  // We will asynchronously report the archive jobs (which MUST be OStoreDBJobs).
+  // We let the exceptions through as failing to report is fatal.
+  for (auto & sDBJob: jobsBatch) {
+    castFromSchedDBJob(sDBJob)->asyncSucceed();
+  }
+  for (auto & sDBJob: jobsBatch) {
+    if (castFromSchedDBJob(sDBJob)->waitAsyncSucceed())
+      ret.insert(sDBJob);
+    ajToUnown.push_back(castFromSchedDBJob(sDBJob)->m_archiveRequest.getAddressIfSet());
+  }
+  m_oStoreDB.m_agentReference->removeBatchFromOwnership(ajToUnown, m_oStoreDB.m_objectStore);
+  return ret;
+}
+
+
 //------------------------------------------------------------------------------
 // OStoreDB::ArchiveJob::fail()
 //------------------------------------------------------------------------------
@@ -2612,7 +2647,7 @@ void OStoreDB::ArchiveJob::asyncSucceed() {
 //------------------------------------------------------------------------------
 // OStoreDB::ArchiveJob::checkSucceed()
 //------------------------------------------------------------------------------
-bool OStoreDB::ArchiveJob::checkSucceed() {  
+bool OStoreDB::ArchiveJob::waitAsyncSucceed() {  
   m_jobUpdate->wait();
   log::LogContext lc(m_oStoreDB.m_logger);
   log::ScopedParamContainer params(lc);
@@ -2623,11 +2658,7 @@ bool OStoreDB::ArchiveJob::checkSucceed() {
   }
   // We no more own the job (which could be gone)
   m_jobOwned = false;
-  // Remove ownership from agent
-  const std::string atfrAddress = m_archiveRequest.getAddressIfSet();
-  m_oStoreDB.m_agentReference->removeFromOwnership(atfrAddress, m_oStoreDB.m_objectStore);
-  params.add("agentObject", m_oStoreDB.m_agentReference->getAgentAddress());
-  lc.log(log::DEBUG, "Removed job from ownership");
+  // Ownership removal will be done globally by the caller.
   return m_jobUpdate->m_isLastJob;
 }
 
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 925f798e09..992f6d8db4 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -115,6 +115,7 @@ public:
   void trimEmptyQueues(log::LogContext& lc) override;
 
   /* === Archive Mount handling ============================================= */
+  class ArchiveJob;
   class ArchiveMount: public SchedulerDatabase::ArchiveMount {
     friend class TapeMountDecisionInfo;
   private:
@@ -123,11 +124,16 @@ public:
   public:
     CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp);
     const MountInfo & getMountInfo() override;
-    std::list<std::unique_ptr<ArchiveJob> > getNextJobBatch(uint64_t filesRequested, 
+    std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextJobBatch(uint64_t filesRequested, 
       uint64_t bytesRequested, log::LogContext& logContext) override;
     void complete(time_t completionTime) override;
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
     void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
+  private:
+    OStoreDB::ArchiveJob * castFromSchedDBJob(SchedulerDatabase::ArchiveJob * job);
+  public:
+    std::set<cta::SchedulerDatabase::ArchiveJob*> setJobBatchSuccessful(
+      std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext & lc) override;
   };
   friend class ArchiveMount;
   
@@ -138,8 +144,10 @@ public:
     CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
     CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
     void fail(log::LogContext & lc) override;
-    void asyncSucceed() override;
-    bool checkSucceed() override;
+  private:
+    void asyncSucceed();
+    bool waitAsyncSucceed();
+  public:
     void bumpUpTapeFileCount(uint64_t newFileCount) override;
     ~ArchiveJob() override;
   private:
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index bac987e632..f9e3062ed7 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -172,6 +172,8 @@ public:
     virtual void complete(time_t completionTime) = 0;
     virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
     virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
+    virtual std::set<cta::SchedulerDatabase::ArchiveJob *> setJobBatchSuccessful(
+      std::list<cta::SchedulerDatabase::ArchiveJob *> & jobsBatch, log::LogContext & lc) = 0;
     virtual ~ArchiveMount() {}
     uint32_t nbFilesCurrentlyOnTape;
   };
@@ -180,16 +182,13 @@ public:
    * The class to handle the DB-side of a tape job.
    */
   class ArchiveJob {
+    friend class ArchiveMount;
   public:
     std::string srcURL;
     std::string archiveReportURL;
     cta::common::dataStructures::ArchiveFile archiveFile;
     cta::common::dataStructures::TapeFile tapeFile;
     virtual void fail(log::LogContext & lc) = 0;
-    /// Indicates a success to the DB. 
-    virtual void asyncSucceed() = 0;
-    /// Check a succeed job status. If this is the last job, return true.
-    virtual bool checkSucceed() = 0;
     virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0;
     virtual ~ArchiveJob() {}
   };
diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp
index d2d73c510d..57fc412ea0 100644
--- a/scheduler/SchedulerDatabaseTest.cpp
+++ b/scheduler/SchedulerDatabaseTest.cpp
@@ -203,10 +203,10 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
   while (!done) {
     auto aj = am->getNextJobBatch(1,1,lc);
     if (aj.size()) {
+      std::list <cta::SchedulerDatabase::ArchiveJob *> jobBatch;
+      jobBatch.emplace_back(std::move(aj.front()).get());
       count++;
-      //std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
-      aj.front()->asyncSucceed();
-      aj.front()->checkSucceed();
+      am->setJobBatchSuccessful(jobBatch, lc);
     }
     else
       done = true;
@@ -290,10 +290,10 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
   while (!done) {
     auto aj = am->getNextJobBatch(1,1,lc);
     if (aj.size()) {
+      std::list <cta::SchedulerDatabase::ArchiveJob *> jobBatch;
+      jobBatch.emplace_back(aj.front().get());
       count++;
-      //std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
-      aj.front()->asyncSucceed();
-      aj.front()->checkSucceed();
+      am->setJobBatchSuccessful(jobBatch, lc);
     }
     else
       done = true;
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index a9070717b6..fe45504f62 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -452,7 +452,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
     std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
     ASSERT_NE((cta::ArchiveJob*)NULL, archiveJobBatch.front().get());
-    auto * archiveJob = archiveJobBatch.front().get();
+    std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front());
     archiveJob->tapeFile.blockId = 1;
     archiveJob->tapeFile.fSeq = 1;
     archiveJob->tapeFile.checksumType = "ADLER32";
@@ -460,9 +460,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize;
     archiveJob->tapeFile.copyNb = 1;
     archiveJob->validate();
-    archiveJob->writeToCatalogue();
-    archiveJob->asyncSetJobSucceed();
-    archiveJob->checkAndAsyncReportComplete();
+    std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch;
+    sDBarchiveJobBatch.emplace(std::move(archiveJob));
+    archiveMount->reportJobsBatchWritten(sDBarchiveJobBatch, lc);
     archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
     ASSERT_EQ(0, archiveJobBatch.size());
     archiveMount->complete();
diff --git a/scheduler/testingMocks/MockArchiveJob.hpp b/scheduler/testingMocks/MockArchiveJob.hpp
index 53df1ac01b..0f57a2e312 100644
--- a/scheduler/testingMocks/MockArchiveJob.hpp
+++ b/scheduler/testingMocks/MockArchiveJob.hpp
@@ -38,14 +38,11 @@ namespace cta {
       failures++;
     }
     
-    virtual void asyncSetJobSucceed() override {
+    virtual void reportJobSucceeded() {
       completes++;
     }
-    virtual bool checkAndAsyncReportComplete() override {
-      return false;
-    }    
+    
     virtual void validate() override  {}
-    virtual void writeToCatalogue() override {}
     virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten() override {
       catalogue::TapeFileWritten fileReport;
       fileReport.archiveFileId = archiveFile.archiveFileID;
diff --git a/scheduler/testingMocks/MockArchiveMount.hpp b/scheduler/testingMocks/MockArchiveMount.hpp
index cc9aa79900..b059ca436c 100644
--- a/scheduler/testingMocks/MockArchiveMount.hpp
+++ b/scheduler/testingMocks/MockArchiveMount.hpp
@@ -49,6 +49,38 @@ namespace cta {
         }
       }
       
+      void reportJobsBatchWritten(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, 
+        cta::log::LogContext& logContext) override {
+        try {
+          std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
+          std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
+          std::unique_ptr<cta::ArchiveJob> job;
+          while(!successfulArchiveJobs.empty()) {
+            // Get the next job to report and make sure we will not attempt to process it twice.
+            job = std::move(successfulArchiveJobs.front());
+            successfulArchiveJobs.pop();
+            if (!job.get()) continue;        
+            tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
+            validatedSuccessfulArchiveJobs.emplace_back(std::move(job));      
+            job.reset(nullptr);
+          }
+          m_catalogue.filesWrittenToTape(tapeFilesWritten);
+          for (auto &job: validatedSuccessfulArchiveJobs) {
+            MockArchiveJob * maj = dynamic_cast<MockArchiveJob *>(job.get());
+            if (!maj) throw cta::exception::Exception("Wrong job type.");
+            maj->reportJobSucceeded();
+            logContext.log(log::INFO, "Reported to the client a full file archival.");
+          }
+          logContext.log(log::INFO, "Reported to the client that a batch of files was written on tape");
+        } catch(const cta::exception::Exception& e){
+          cta::log::ScopedParamContainer params(logContext);
+          params.add("exceptionMessageValue", e.getMessageValue());
+          const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception";
+          logContext.log(cta::log::ERR, msg_error);
+          throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
+        }
+      }
+
       void complete() override {
         completes++;
       }
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
index fbc4f9e734..fcc187c6a8 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
@@ -201,7 +201,7 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
       reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
       return;
     }
-    reportPacker.m_archiveMount->reportJobsBatchWritten(std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
+    reportPacker.m_archiveMount->reportJobsBatchWritten(reportPacker.m_successfulArchiveJobs, reportPacker.m_lc);
   } else {
     // This is an abnormal situation: we should never flush after an error!
     reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
index 908f5e8c7f..db831632cd 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
@@ -69,16 +69,7 @@ namespace unitTests {
        int & completes, int &failures):
     MockArchiveJob(am, catalogue), completesRef(completes), failuresRef(failures) {}
     
-    virtual void asyncSetJobSucceed() override {
-      completesRef++;
-    }
-    
-    virtual bool checkAndAsyncReportComplete() override {
-      return false;
-    }
-    
     virtual void validate() override {}
-    virtual void writeToCatalogue()  override {}
     virtual cta::catalogue::TapeFileWritten validateAndGetTapeFileWritten() override {
       cta::catalogue::TapeFileWritten fileReport;
       fileReport.archiveFileId = archiveFile.archiveFileID;
@@ -105,6 +96,11 @@ namespace unitTests {
     void failed(const cta::exception::Exception& ex, cta::log::LogContext & lc) override {
       failuresRef++;
     }
+    
+    void reportJobSucceeded() override {
+      completesRef++;
+    }
+
   private:
     int & completesRef;
     int & failuresRef;
-- 
GitLab