From 95f9943ec8b17ecefd4593c4bfc192d020f38d4e Mon Sep 17 00:00:00 2001
From: Victor Kotlyar <Victor.Kotlyar@cern.ch>
Date: Wed, 19 Jul 2017 16:53:26 +0200
Subject: [PATCH] Make archive reporting on flush handle batches of jobs.

Changed reporting to the Catalogue to use a single batch of written
files. Changed the synchronous, job-by-job reporting to the backend
into asynchronous reporting for a batch of jobs. Changed the
synchronous reporting to the EOS MGM into asynchronous reporting.
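
In sketch form, the new flush-time sequence implemented by
MigrationReportPacker::ReportFlush::proceedJobsBatch() is (simplified,
error handling omitted; all names as in this patch):

    // 1. Validate each successful job and collect its TapeFileWritten event.
    while (!successfulArchiveJobs.empty()) {
      job = std::move(successfulArchiveJobs.front());
      successfulArchiveJobs.pop();
      tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
    }
    // 2. Update the catalogue with the whole batch in a single call.
    updateCatalogueWithTapeFilesWritten(reportPacker, tapeFilesWritten, logContext);
    // 3. Launch the asynchronous object store update for every job.
    asyncUpdateBackendWithJobsSucceeded(validatedSuccessfulArchiveJobs);
    // 4. Collect the updates; the last job of each request also triggers an
    //    asynchronous report to the EOS MGM, which is then waited on.
    checkAndAsyncReportCompletedJobs(validatedSuccessfulArchiveJobs, logContext);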
---
 eos/DiskReporter.hpp                          |   1 +
 eos/DiskReporterFactory.cpp                   |   5 +-
 eos/DiskReporterFactory.hpp                   |   3 +-
 eos/EOSReporter.cpp                           |  47 ++++-
 eos/EOSReporter.hpp                           |  19 +-
 eos/NullReporter.hpp                          |   1 +
 objectstore/ArchiveRequest.cpp                |  45 +++++
 objectstore/ArchiveRequest.hpp                |  13 ++
 objectstore/Backend.hpp                       |   1 +
 objectstore/BackendRados.cpp                  |  59 ++++--
 objectstore/BackendVFS.cpp                    |  33 +++-
 objectstore/ObjectOps.hpp                     |   6 +
 scheduler/ArchiveJob.cpp                      | 108 ++++++----
 scheduler/ArchiveJob.hpp                      |  48 ++++-
 scheduler/ArchiveMount.cpp                    |  10 +-
 scheduler/ArchiveMount.hpp                    |  14 +-
 scheduler/OStoreDB/OStoreDB.cpp               |  28 +--
 scheduler/OStoreDB/OStoreDB.hpp               |   4 +-
 scheduler/SchedulerDatabase.hpp               |   6 +-
 scheduler/SchedulerDatabaseTest.cpp           |   6 +-
 scheduler/SchedulerTest.cpp                   |   5 +-
 scheduler/testingMocks/MockArchiveJob.hpp     |  37 +++-
 .../daemon/MigrationReportPacker.cpp          | 166 +++++++++++-----
 .../daemon/MigrationReportPacker.hpp          |  12 ++
 .../daemon/MigrationReportPackerTest.cpp      | 186 ++++++++++++++++--
 25 files changed, 703 insertions(+), 160 deletions(-)

diff --git a/eos/DiskReporter.hpp b/eos/DiskReporter.hpp
index affaf001de..72441f2232 100644
--- a/eos/DiskReporter.hpp
+++ b/eos/DiskReporter.hpp
@@ -23,6 +23,7 @@ namespace cta { namespace eos {
 class DiskReporter {
 public:
   virtual void reportArchiveFullyComplete() = 0;
+  virtual void asyncReportArchiveFullyComplete() = 0;
   virtual ~DiskReporter() {};
 };
 
diff --git a/eos/DiskReporterFactory.cpp b/eos/DiskReporterFactory.cpp
index a49735eb03..4be5df6936 100644
--- a/eos/DiskReporterFactory.cpp
+++ b/eos/DiskReporterFactory.cpp
@@ -24,14 +24,15 @@
 
 namespace cta { namespace eos {
 
-DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL) {
+DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL, std::promise<void> &reporterState) {
   threading::MutexLocker ml(m_mutex);
   auto regexResult = m_EosUrlRegex.exec(URL);
   if (regexResult.size()) {
-    return new EOSReporter(regexResult[1], regexResult[2]);
+    return new EOSReporter(regexResult[1], regexResult[2], reporterState);
   }
   regexResult = m_NullRegex.exec(URL);
   if (regexResult.size()) {
+    reporterState.set_value();
     return new NullReporter();
   }
   throw cta::exception::Exception(
diff --git a/eos/DiskReporterFactory.hpp b/eos/DiskReporterFactory.hpp
index 3c52eb4596..be3ffac3c7 100644
--- a/eos/DiskReporterFactory.hpp
+++ b/eos/DiskReporterFactory.hpp
@@ -23,12 +23,13 @@
 #include "common/threading/Mutex.hpp"
 
 #include <string>
+#include <future>
 
 namespace cta { namespace eos {
 
 class DiskReporterFactory {
 public:
-  DiskReporter * createDiskReporter(const std::string URL);
+  DiskReporter * createDiskReporter(const std::string URL, std::promise<void> &reporterState);
 private:
   // The typical call to give report to EOS will be:
   // xrdfs localhost query opaquefile "/eos/wfe/passwd?mgm.pcmd=event&mgm.fid=112&mgm.logid=cta&mgm.event=migrated&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0"
diff --git a/eos/EOSReporter.cpp b/eos/EOSReporter.cpp
index d48967dbf9..4905da0f39 100644
--- a/eos/EOSReporter.cpp
+++ b/eos/EOSReporter.cpp
@@ -16,13 +16,15 @@
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include <future>
+
 #include "EOSReporter.hpp"
 #include "common/exception/XrootCl.hpp"
 
 namespace cta { namespace eos {
 
-EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue):
-  m_fs(hostURL), m_query(queryValue) {}
+EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue, std::promise<void>& reporterState):
+  m_fs(hostURL), m_query(queryValue), m_reporterState(reporterState) {}
 
 
 void EOSReporter::reportArchiveFullyComplete() {
@@ -30,11 +32,48 @@ void EOSReporter::reportArchiveFullyComplete() {
   XrdCl::Buffer arg (m_query.size());
   arg.FromString(m_query);
   XrdCl::Buffer * resp = nullptr;
-  const uint16_t queryTimeout = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds
-  XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, queryTimeout);
+  XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, CTA_EOS_QUERY_TIMEOUT);
   delete (resp);
   cta::exception::XrootCl::throwOnError(status,
       "In EOSReporter::reportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()");
 } 
 
+void EOSReporter::asyncReportArchiveFullyComplete() {
+  auto qcOpaque = XrdCl::QueryCode::OpaqueFile;
+  XrdCl::Buffer arg (m_query.size());
+  arg.FromString(m_query);
+  AsyncQueryHandler *handler = new AsyncQueryHandler(m_reporterState);
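+  // Note: the handler owns itself and deletes itself (together with status and response) in HandleResponse().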
+  XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, handler, CTA_EOS_QUERY_TIMEOUT);
+  cta::exception::XrootCl::throwOnError(status,
+      "In EOSReporter::asyncReportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()");
+}
+
+//------------------------------------------------------------------------------
+//EOSReporter::AsyncQueryHandler::AsyncQueryHandler
+//------------------------------------------------------------------------------
+EOSReporter::AsyncQueryHandler::AsyncQueryHandler(std::promise<void> &handlerPromise):
+  m_handlerPromise(handlerPromise) {}
+
+//------------------------------------------------------------------------------
+//EOSReporter::AsyncQueryHandler::HandleResponse
+//------------------------------------------------------------------------------
+void EOSReporter::AsyncQueryHandler::HandleResponse(XrdCl::XRootDStatus *status,
+                                                    XrdCl::AnyObject    *response) {
+  try {
+    cta::exception::XrootCl::throwOnError(*status,
+      "In EOSReporter::AsyncQueryHandler::HandleResponse(): failed to XrdCl::FileSystem::Query()");
+    // Fulfil the promise only on success: a promise may be satisfied only once.
+    m_handlerPromise.set_value();
+  } catch (...) {
+    try {
+      // store anything thrown in the promise
+      m_handlerPromise.set_exception(std::current_exception());
+    } catch(...) {
+      // set_exception() may throw too
+    }
+  }
+  delete response;
+  delete status;
+  delete this;
+}
 }} // namespace cta::disk
diff --git a/eos/EOSReporter.hpp b/eos/EOSReporter.hpp
index c3c78de642..c35fe54909 100644
--- a/eos/EOSReporter.hpp
+++ b/eos/EOSReporter.hpp
@@ -21,15 +21,28 @@
 #include "DiskReporter.hpp"
 #include <XrdCl/XrdClFileSystem.hh>
 
-namespace cta { namespace eos {
+#include <future>
 
+namespace cta { namespace eos {
+const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds
+    
 class EOSReporter: public DiskReporter {
 public:
-  EOSReporter(const std::string & hostURL, const std::string & queryValue);
+  EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState);
   void reportArchiveFullyComplete() override;
+  void asyncReportArchiveFullyComplete() override;
 private:
   XrdCl::FileSystem m_fs;
   std::string m_query;
+  std::promise<void> &m_reporterState;
+  class AsyncQueryHandler: public XrdCl::ResponseHandler {
+  public:
+    AsyncQueryHandler(std::promise<void> &handlerPromise);
+    virtual void HandleResponse(XrdCl::XRootDStatus *status,
+                                  XrdCl::AnyObject    *response);
+  private:
+    std::promise<void> &m_handlerPromise;
+  };
 };
 
-}} // namespace cta::disk
\ No newline at end of file
+}} // namespace cta::disk
diff --git a/eos/NullReporter.hpp b/eos/NullReporter.hpp
index e4d66ebc22..86d1786a5c 100644
--- a/eos/NullReporter.hpp
+++ b/eos/NullReporter.hpp
@@ -26,6 +26,7 @@ class NullReporter: public DiskReporter {
 public:
   NullReporter() {};
   void reportArchiveFullyComplete() override {};
+  void asyncReportArchiveFullyComplete() override {};
 };
 
 }} // namespace cta::disk
\ No newline at end of file
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index fd781dacba..2601a8f66c 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -461,6 +461,51 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
   return m_srcURL;
 }
 
+ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSuccessful(const uint16_t copyNumber) {
+  std::unique_ptr<AsyncJobSuccessfulUpdater> ret(new AsyncJobSuccessfulUpdater);
+  // Passing a reference to the unique pointer led to strange behaviors.
+  auto & retRef = *ret;
+  ret->m_updaterCallback=
+    [this,copyNumber, &retRef](const std::string &in)->std::string { 
+      // We have a locked and fetched object, so we just need to work on its representation.
+      serializers::ObjectHeader oh;
+      oh.ParseFromString(in);
+      if (oh.type() != serializers::ObjectType::ArchiveRequest_t) {
+        std::stringstream err;
+        err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): wrong object type: " << oh.type();
+        throw cta::exception::Exception(err.str());
+      }
+      serializers::ArchiveRequest payload;
+      payload.ParseFromString(oh.payload());
+      auto * jl = payload.mutable_jobs();
+      for (auto j=jl->begin(); j!=jl->end(); j++) {
+        if (j->copynb() == copyNumber) {
+          j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
+          for (auto j2=jl->begin(); j2!=jl->end(); j2++) {
+            if (j2->status() != serializers::ArchiveJobStatus::AJS_Complete &&
+                j2->status() != serializers::ArchiveJobStatus::AJS_Failed) {
+              retRef.m_isLastJob = false;
+              oh.set_payload(payload.SerializePartialAsString());
+              return oh.SerializeAsString();
+            }
+          }
+          retRef.m_isLastJob = true;
+          oh.set_payload(payload.SerializePartialAsString());
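+          // Ask the backend updater, via this special exception, to delete the object instead of committing the update.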
+          throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString());
+        }
+      }
+      std::stringstream err;
+      err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): copyNb not found";
+      throw cta::exception::Exception(err.str());
+    };
+  ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback));
+  return ret.release();
+}
+
+void ArchiveRequest::AsyncJobSuccessfulUpdater::wait() {
+  m_backendUpdater->wait();
+}
+
 std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
   checkPayloadReadable();
   auto jl = m_payload.jobs();
diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp
index bf6a8f72d3..d56d3d0998 100644
--- a/objectstore/ArchiveRequest.hpp
+++ b/objectstore/ArchiveRequest.hpp
@@ -76,6 +76,19 @@ public:
   // An job owner updater factory. The owner MUST be previousOwner for the update to be executed.
   CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner);
   AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint16_t copyNumber, const std::string & owner, const std::string &previousOwner);
+
+  // An asynchronous job updating class for success.
+  class AsyncJobSuccessfulUpdater {
+    friend class ArchiveRequest;
+  public:
+    void wait();
+    bool m_isLastJob;
+  private:
+    std::function<std::string(const std::string &)> m_updaterCallback;
+    std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
+  };
+  AsyncJobSuccessfulUpdater * asyncUpdateJobSuccessful(uint16_t copyNumber);
+
   // Get a job owner
   std::string getJobOwner(uint16_t copyNumber);
   // Request management ========================================================
diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp
index a20bd81e86..7b9a5ca0db 100644
--- a/objectstore/Backend.hpp
+++ b/objectstore/Backend.hpp
@@ -109,6 +109,7 @@ public:
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit);
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
+  CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
   
   /**
    * A base class handling asynchronous sequence of lock exclusive, fetch, call user 
diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp
index 47d890e427..18f7a3187d 100644
--- a/objectstore/BackendRados.cpp
+++ b/objectstore/BackendRados.cpp
@@ -382,25 +382,48 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion
                   std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
                   au.m_name + ": "+ ex.what());
             }
-            // Execute the user's callback. Let exceptions fly through. User knows his own exceptions.
-            value=au.m_update(value);
-            try {
-              // Prepare result in buffer list.
-              au.m_radosBufferList.clear();
-              au.m_radosBufferList.append(value);
-            } catch (std::exception & ex) {
-              throw CouldNotUpdateValue(
-                  std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") +
-                  au.m_name + ": " + ex.what());
+            
+            bool updateWithDelete = false;
+            try {      
+              // Execute the user's callback.
+              value=au.m_update(value);
+            } catch (AsyncUpdateWithDelete & ex) {
+              updateWithDelete = true;
+            } catch (...) {
+              // Let any other exception fly through: it belongs to the user's callback.
+              throw;
             }
-            // Launch the write
-            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
-            auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
-            aioc->release();
-            if (rc) {
-              cta::exception::Errnum errnum (-rc, 
-                std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full(): ") + au.m_name);
-              throw Backend::CouldNotCommit(errnum.getMessageValue());
+             
+            if(updateWithDelete) {
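+              // The user callback requested deletion of the object: remove it instead of writing it back.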
+              try {
+                au.m_backend.remove(au.m_name);
+              } catch (cta::exception::Exception &ex) {
+                throw CouldNotUpdateValue(
+                    std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
+                    au.m_name + ex.what());
+              }
+              // Done!
+              ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+              au.m_job.set_value();
+            } else {
+              try {
+                // Prepare result in buffer list.
+                au.m_radosBufferList.clear();
+                au.m_radosBufferList.append(value);
+              } catch (std::exception & ex) {
+                throw CouldNotUpdateValue(
+                    std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + 
+                    au.m_name + ex.what());
+              }
+              // Launch the write
+              librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
+              auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
+              aioc->release();
+              if (rc) {
+                cta::exception::Errnum errnum (-rc,
+                  "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full(): " + au.m_name);
+                throw Backend::CouldNotCommit(errnum.getMessageValue());
+              }
             }
           } catch (...) {
             ANNOTATE_HAPPENS_BEFORE(&au.m_job);
diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp
index 3cb0a2c99c..a8680b6565 100644
--- a/objectstore/BackendVFS.cpp
+++ b/objectstore/BackendVFS.cpp
@@ -329,13 +329,32 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name,
         ANNOTATE_HAPPENS_BEFORE(&m_job);
         throw Backend::CouldNotFetch(ex.getMessageValue());
       }
-      // Let user's exceptions go through.
-      std::string postUpdateData=m_update(preUpdateData);
-      try {
-        m_backend.atomicOverwrite(m_name, postUpdateData);
-      } catch (cta::exception::Exception & ex) {
-        ANNOTATE_HAPPENS_BEFORE(&m_job);
-        throw Backend::CouldNotCommit(ex.getMessageValue());
+      
+      std::string postUpdateData;
+      bool updateWithDelete = false;
+      try {      
+        postUpdateData=m_update(preUpdateData);
+      } catch (AsyncUpdateWithDelete & ex) {
+        updateWithDelete = true;
+      } catch (...) {
+        // Let any other exception fly through: it belongs to the user's callback.
+        throw;
+      }
+      
+      if(updateWithDelete) {
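+        // The user callback requested deletion of the object: remove it instead of overwriting it.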
+        try {
+          m_backend.remove(m_name);
+        } catch (cta::exception::Exception & ex) {
+          ANNOTATE_HAPPENS_BEFORE(&m_job);
+          throw Backend::CouldNotCommit(ex.getMessageValue());
+        }
+      } else { 
+        try {
+          m_backend.atomicOverwrite(m_name, postUpdateData);
+        } catch (cta::exception::Exception & ex) {
+          ANNOTATE_HAPPENS_BEFORE(&m_job);
+          throw Backend::CouldNotCommit(ex.getMessageValue());
+        }
       }
       try {
         sl->release();
diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp
index 727b1abf1a..f028186f82 100644
--- a/objectstore/ObjectOps.hpp
+++ b/objectstore/ObjectOps.hpp
@@ -125,6 +125,12 @@ public:
     m_payloadInterpreted = false;
   }
   
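+  // Reset the in-memory state, e.g. after the backend object was deleted by an asynchronous updater.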
+  void resetValues () {
+    m_existingObject = false;
+    m_headerInterpreted = false;
+    m_payloadInterpreted = false;
+  }
+  
   void setOwner(const std::string & owner) {
     checkHeaderWritable();
     m_header.set_owner(owner);
diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp
index 10d9905f99..de37ff600d 100644
--- a/scheduler/ArchiveJob.cpp
+++ b/scheduler/ArchiveJob.cpp
@@ -41,28 +41,28 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
   tapeFile(tapeFile) {}
 
 //------------------------------------------------------------------------------
-// complete
+// asyncSetJobSucceed
 //------------------------------------------------------------------------------
-bool cta::ArchiveJob::complete() {
-  // First check that the block Id for the file has been set.
-  if (tapeFile.blockId ==
-      std::numeric_limits<decltype(tapeFile.blockId)>::max())
-    throw BlockIdNotSet("In cta::ArchiveJob::complete(): Block ID not set");
-  // Also check the checksum has been set
-  if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || 
-      tapeFile.checksumType.empty() || tapeFile.checksumValue.empty())
-    throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksums not set");
-  // And matches
-  if (archiveFile.checksumType != tapeFile.checksumType || 
-      archiveFile.checksumValue != tapeFile.checksumValue)
-    throw ChecksumMismatch(std::string("In cta::ArchiveJob::complete(): checksum mismatch!")
-            +" Archive file checksum type: "+archiveFile.checksumType
-            +" Archive file checksum value: "+archiveFile.checksumValue
-            +" Tape file checksum type: "+tapeFile.checksumType
-            +" Tape file checksum value: "+tapeFile.checksumValue);
-  // We are good to go to record the data in the persistent storage.
-  // Record the data in the archiveNS. The checksum will be validated if already
-  // present, of inserted if not.
+void cta::ArchiveJob::asyncSetJobSucceed() {
+  m_dbJob->asyncSucceed();
+}
+
+//------------------------------------------------------------------------------
+// checkAndAsyncReportComplete
+//------------------------------------------------------------------------------
+bool cta::ArchiveJob::checkAndAsyncReportComplete() {
+  if (m_dbJob->checkSucceed()) {
+    std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
+    reporter->asyncReportArchiveFullyComplete();
+    return true;
+  }
+  return false;
+}
+
+//------------------------------------------------------------------------------
+// ArchiveJob::writeToCatalogue
+//------------------------------------------------------------------------------
+void cta::ArchiveJob::writeToCatalogue() {
   catalogue::TapeFileWritten fileReport;
   fileReport.archiveFileId = archiveFile.archiveFileID;
   fileReport.blockId = tapeFile.blockId;
@@ -82,18 +82,53 @@ bool cta::ArchiveJob::complete() {
   fileReport.tapeDrive = m_mount.getDrive();
   fileReport.vid = tapeFile.vid;
   m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport});
-  //m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(), 
-  //  std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile);
-  // We will now report the successful archival to the EOS instance.
-  // if  TODO TODO
-  // We can now record the success for the job in the database.
-  // If this is the last job of the request, we also report the success to the client.
-  if (m_dbJob->succeed()) {
-    std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL));
-    reporter->reportArchiveFullyComplete();
-    return true;
-  }
-  return false;
+}
+//------------------------------------------------------------------------------
+// ArchiveJob::validateAndGetTapeFileWritten
+//------------------------------------------------------------------------------
+cta::catalogue::TapeFileWritten cta::ArchiveJob::validateAndGetTapeFileWritten() {
+  validate();
+  catalogue::TapeFileWritten fileReport;
+  fileReport.archiveFileId = archiveFile.archiveFileID;
+  fileReport.blockId = tapeFile.blockId;
+  fileReport.checksumType = tapeFile.checksumType;
+  fileReport.checksumValue = tapeFile.checksumValue;
+  fileReport.compressedSize = tapeFile.compressedSize;
+  fileReport.copyNb = tapeFile.copyNb;
+  fileReport.diskFileId = archiveFile.diskFileId;
+  fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
+  fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
+  fileReport.diskFilePath = archiveFile.diskFileInfo.path;
+  fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
+  fileReport.diskInstance = archiveFile.diskInstance;
+  fileReport.fSeq = tapeFile.fSeq;
+  fileReport.size = archiveFile.fileSize;
+  fileReport.storageClassName = archiveFile.storageClass;
+  fileReport.tapeDrive = m_mount.getDrive();
+  fileReport.vid = tapeFile.vid;
+  return fileReport;
+}
+
+//------------------------------------------------------------------------------
+// ArchiveJob::validate
+//------------------------------------------------------------------------------
+void cta::ArchiveJob::validate(){
+  // First check that the block Id for the file has been set.
+  if (tapeFile.blockId ==
+      std::numeric_limits<decltype(tapeFile.blockId)>::max())
+    throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set");
+  // Also check the checksum has been set
+  if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || 
+      tapeFile.checksumType.empty() || tapeFile.checksumValue.empty())
+    throw ChecksumNotSet("In cta::ArchiveJob::validate(): checksums not set");
+  // And matches
+  if (archiveFile.checksumType != tapeFile.checksumType || 
+      archiveFile.checksumValue != tapeFile.checksumValue)
+    throw ChecksumMismatch(std::string("In cta::ArchiveJob::validate(): checksum mismatch!")
+            +" Archive file checksum type: "+archiveFile.checksumType
+            +" Archive file checksum value: "+archiveFile.checksumValue
+            +" Tape file checksum type: "+tapeFile.checksumType
+            +" Tape file checksum value: "+tapeFile.checksumValue);
 }
 
 //------------------------------------------------------------------------------
@@ -109,3 +144,10 @@ std::string cta::ArchiveJob::reportURL() {
 void cta::ArchiveJob::failed(const cta::exception::Exception &ex,  log::LogContext & lc) {
   m_dbJob->fail(lc);
 }
+
+//------------------------------------------------------------------------------
+// waitForReporting
+//------------------------------------------------------------------------------
+void cta::ArchiveJob::waitForReporting() {
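+  // get() rethrows any exception stored in the promise by the asynchronous reporter.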
+  m_reporterState.get_future().get();
+}
diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp
index c042fe42f9..d94afd831d 100644
--- a/scheduler/ArchiveJob.hpp
+++ b/scheduler/ArchiveJob.hpp
@@ -25,6 +25,7 @@
 
 #include <stdint.h>
 #include <string>
+#include <future>
 
 namespace cta {
 
@@ -68,11 +69,37 @@ public:
   CTA_GENERATE_EXCEPTION_CLASS(BlockIdNotSet);
   CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet);
   CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch);
+  
+  /**
+   * Indicates that the job was successful and updates the backend store 
+   * asynchronously. 
+   */
+  virtual void asyncSetJobSucceed();
+  
+  /**
+   * Wait for the asynchronous backend store update of the job to complete.
+   * @return true if this was the last job of the request and the success
+   * report to the client was launched asynchronously.
+   */
+  virtual bool checkAndAsyncReportComplete();
+  
+  /**
+   * Validate that the archiveFile and tapeFile fields are set correctly for
+   * the archive request.
+   * Throws an appropriate exception if there is any problem.
+   */
+  virtual void validate();
+  
+  /**
+   * Update the catalogue with the file written event for this archive request.
+   */
+  virtual void writeToCatalogue();
+  
   /**
-   * Indicates that the job was successful and updates the backend store
-   * @return true if the archive was also reported to client.
+   * Validate that the archiveFile and tapeFile fields are set correctly for
+   * the archive request.
+   * @return The TapeFileWritten event for the catalogue update.
    */
-  virtual bool complete();
+  virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten();
   
   /**
    * Triggers a scheduler update following the failure of the job.
@@ -99,13 +126,19 @@ private:
    * The mount that generated this job
    */
   ArchiveMount &m_mount;
-  
+
   /**
    * Reference to the name server
    */
   catalogue::Catalogue &m_catalogue;
-public:
   
+  /**
+   * State for the asynchronous report to the client. 
+   */
+  std::promise<void> m_reporterState;
+      
+public:
+    
   CTA_GENERATE_EXCEPTION_CLASS(NotImplemented);
   
   /**
@@ -122,6 +155,11 @@ public:
    * The file archive result for the NS
    */
   common::dataStructures::TapeFile tapeFile;
+  
+  /**
+   * Wait until the reporterState promise is fulfilled by the reporting thread.
+   */
+  virtual void waitForReporting();
 
 }; // class ArchiveJob
 
diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp
index e3e429ec2b..93b6723504 100644
--- a/scheduler/ArchiveMount.cpp
+++ b/scheduler/ArchiveMount.cpp
@@ -77,8 +77,8 @@ uint32_t cta::ArchiveMount::getNbFiles() const {
 //------------------------------------------------------------------------------
 // createDiskReporter
 //------------------------------------------------------------------------------
-cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
-  return m_reporterFactory.createDiskReporter(URL);
+cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL, std::promise<void> &reporterState) {
+  return m_reporterFactory.createDiskReporter(URL, reporterState);
 }
 
 //------------------------------------------------------------------------------
@@ -93,6 +93,12 @@ std::string cta::ArchiveMount::getMountTransactionId() const {
 }
 
 //------------------------------------------------------------------------------
+// updateCatalogueWithTapeFilesWritten
+//------------------------------------------------------------------------------
+void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten) {
+  m_catalogue.filesWrittenToTape(tapeFilesWritten);
+}
+
+//------------------------------------------------------------------------------
 // getNextJobBatch
 //------------------------------------------------------------------------------
 std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested, 
diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp
index 9f9bde38c2..f6584249d1 100644
--- a/scheduler/ArchiveMount.hpp
+++ b/scheduler/ArchiveMount.hpp
@@ -139,9 +139,17 @@ namespace cta {
     /**
      * Creates a disk reporter for the ArchiveJob (this is a wrapper).
      * @param URL: report address
-     * @return poitner to the reporter created.
+     * @param reporterState promise to be fulfilled when the asynchronous report is done.
+     * @return pointer to the reporter created.
      */
-    eos::DiskReporter * createDiskReporter(std::string & URL);
+    eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState);
+    
+    /**
+     * Update the catalogue with a set of TapeFileWritten events.
+     *
+     * @param tapeFilesWritten The set of report events for the catalogue update.
+     */
+    void updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten); 
     
     /**
      * Destructor.
@@ -159,7 +167,7 @@ namespace cta {
      * A reference to the file catalogue.
      */
     catalogue::Catalogue & m_catalogue;
-
+    
     /**
      * Internal tracking of the session completion
      */
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 934c9ea25f..bc1148533c 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2255,24 +2255,26 @@ void OStoreDB::ArchiveJob::bumpUpTapeFileCount(uint64_t newFileCount) {
 }
 
 //------------------------------------------------------------------------------
-// OStoreDB::ArchiveJob::succeed()
+// OStoreDB::ArchiveJob::asyncSucceed()
 //------------------------------------------------------------------------------
-bool OStoreDB::ArchiveJob::succeed() {
-  // Lock the request and set the job as successful.
-  objectstore::ScopedExclusiveLock atfrl(m_archiveRequest);
-  m_archiveRequest.fetch();
-  std::string atfrAddress = m_archiveRequest.getAddressIfSet();
-  bool lastJob=m_archiveRequest.setJobSuccessful(tapeFile.copyNb);
-  if (lastJob) {
-    m_archiveRequest.remove();
-  } else {
-    m_archiveRequest.commit();
+void OStoreDB::ArchiveJob::asyncSucceed() {  
+  m_jobUpdate.reset(m_archiveRequest.asyncUpdateJobSuccessful(tapeFile.copyNb));
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::ArchiveJob::checkSucceed()
+//------------------------------------------------------------------------------
+bool OStoreDB::ArchiveJob::checkSucceed() {  
+  m_jobUpdate->wait();
+  if (m_jobUpdate->m_isLastJob) {
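+    // The request object was deleted in the backend by the asynchronous updater: forget its cached state.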
+    m_archiveRequest.resetValues();
   }
   // We no more own the job (which could be gone)
   m_jobOwned = false;
   // Remove ownership from agent
-  m_agentReference.removeFromOwnership(atfrAddress, m_objectStore);
-  return lastJob;
+  const std::string atfrAddress = m_archiveRequest.getAddressIfSet();
+  m_agentReference.removeFromOwnership(atfrAddress, m_objectStore);  
+  return m_jobUpdate->m_isLastJob;
 }
 
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index ae73b7a016..feb9706f77 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -140,8 +140,9 @@ public:
   public:
     CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
     CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
-    bool succeed() override;
     void fail(log::LogContext & lc) override;
+    void asyncSucceed() override;
+    bool checkSucceed() override;
     void bumpUpTapeFileCount(uint64_t newFileCount) override;
     ~ArchiveJob() override;
   private:
@@ -156,6 +157,7 @@ public:
     objectstore::AgentReference & m_agentReference;
     objectstore::ArchiveRequest m_archiveRequest;
     ArchiveMount & m_archiveMount;
+    std::unique_ptr<objectstore::ArchiveRequest::AsyncJobSuccessfulUpdater> m_jobUpdate;
   };
   
   /* === Retrieve Mount handling ============================================ */
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 217c0f251d..eff6e30dfc 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -185,9 +185,11 @@ public:
     std::string archiveReportURL;
     cta::common::dataStructures::ArchiveFile archiveFile;
     cta::common::dataStructures::TapeFile tapeFile;
-    /// Indicates a success to the DB. If this is the last job, return true.
-    virtual bool succeed() = 0;
     virtual void fail(log::LogContext & lc) = 0;
+    /// Asynchronously reports the job's success to the DB.
+    virtual void asyncSucceed() = 0;
+    /// Waits for and checks the asynchronous success update. If this is the last job, returns true.
+    virtual bool checkSucceed() = 0;
     virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0;
     virtual ~ArchiveJob() {}
   };
diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp
index e923a331a2..04bed21469 100644
--- a/scheduler/SchedulerDatabaseTest.cpp
+++ b/scheduler/SchedulerDatabaseTest.cpp
@@ -205,7 +205,8 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
     if (aj.size()) {
       count++;
       //std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
-      aj.front()->succeed();
+      aj.front()->asyncSucceed();
+      aj.front()->checkSucceed();
     }
     else
       done = true;
@@ -291,7 +292,8 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
     if (aj.size()) {
       count++;
       //std::cout << aj->archiveFile.diskFileInfo.path << std::endl;
-      aj.front()->succeed();
+      aj.front()->asyncSucceed();
+      aj.front()->checkSucceed();
     }
     else
       done = true;
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index eef9864aaa..6761270966 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -459,7 +459,10 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     archiveJob->tapeFile.checksumValue = "1234abcd";
     archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize;
     archiveJob->tapeFile.copyNb = 1;
-    archiveJob->complete();
+    archiveJob->validate();
+    archiveJob->writeToCatalogue();
+    archiveJob->asyncSetJobSucceed();
+    archiveJob->checkAndAsyncReportComplete();
     archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
     ASSERT_EQ(0, archiveJobBatch.size());
     archiveMount->complete();
diff --git a/scheduler/testingMocks/MockArchiveJob.hpp b/scheduler/testingMocks/MockArchiveJob.hpp
index 96375a20f7..53df1ac01b 100644
--- a/scheduler/testingMocks/MockArchiveJob.hpp
+++ b/scheduler/testingMocks/MockArchiveJob.hpp
@@ -33,14 +33,43 @@ namespace cta {
         completes(0), failures(0) {} 
       
     ~MockArchiveJob() throw() {} 
-
-    bool complete() override {
+    
+    void failed(const cta::exception::Exception& ex, log::LogContext & lc) override {
+      failures++;
+    }
+    
+    virtual void asyncSetJobSucceed() override {
       completes++;
+    }
+    virtual bool checkAndAsyncReportComplete() override {
       return false;
+    }    
+    virtual void validate() override  {}
+    virtual void writeToCatalogue() override {}
+    virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten() override {
+      catalogue::TapeFileWritten fileReport;
+      fileReport.archiveFileId = archiveFile.archiveFileID;
+      fileReport.blockId = tapeFile.blockId;
+      fileReport.checksumType = tapeFile.checksumType;
+      fileReport.checksumValue = tapeFile.checksumValue;
+      fileReport.compressedSize = tapeFile.compressedSize;
+      fileReport.copyNb = tapeFile.copyNb;
+      fileReport.diskFileId = archiveFile.diskFileId;
+      fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
+      fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
+      fileReport.diskFilePath = archiveFile.diskFileInfo.path;
+      fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
+      fileReport.diskInstance = archiveFile.diskInstance;
+      fileReport.fSeq = tapeFile.fSeq;
+      fileReport.size = archiveFile.fileSize;
+      fileReport.storageClassName = archiveFile.storageClass;
+      fileReport.tapeDrive = "dummy";
+      fileReport.vid = tapeFile.vid;
+      return fileReport;
     }
-    
-    void failed(const cta::exception::Exception& ex, log::LogContext & lc) override {
+    virtual void failed(const cta::exception::Exception& ex) {
       failures++;
     }
+    virtual void retry() {}
   };
 }
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
index c5ad2e4609..0e004b22ee 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp
@@ -24,6 +24,7 @@
 #include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp"
 #include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp"
 #include "castor/tape/tapeserver/drive/DriveInterface.hpp"
+#include "catalogue/TapeFileWritten.hpp"
 
 #include <memory>
 #include <numeric>
@@ -205,59 +206,130 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
       reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing.");
       return;
     }
-    std::unique_ptr<cta::ArchiveJob> job;
-    try{
-      while(!reportPacker.m_successfulArchiveJobs.empty()) {
-        // Get the next job to report and make sure we will not attempt to process it twice.
-        job = std::move(reportPacker.m_successfulArchiveJobs.front());
-        reportPacker.m_successfulArchiveJobs.pop();
-        if (!job.get()) continue;
-        cta::log::ScopedParamContainer params(reportPacker.m_lc);
-        params.add("fileId", job->archiveFile.archiveFileID)
-              .add("diskInstance", job->archiveFile.diskInstance)
-              .add("diskFileId", job->archiveFile.diskFileId)
-              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
-        if (job->complete()) {
-          params.add("reportURL", job->reportURL());
-          reportPacker.m_lc.log(cta::log::INFO,"Reported to the client a full file archival");
-        } else {
-          reportPacker.m_lc.log(cta::log::INFO, "Recorded the partial migration of a file");
-        }
-        job.reset(nullptr);
-      }
-      reportPacker.m_lc.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");
-    } catch(const cta::exception::Exception& e){
-      cta::log::ScopedParamContainer params(reportPacker.m_lc);
-      params.add("exceptionMessageValue", e.getMessageValue());
-      if (job.get()) {
-        params.add("fileId", job->archiveFile.archiveFileID)
-              .add("diskInstance", job->archiveFile.diskInstance)
-              .add("diskFileId", job->archiveFile.diskFileId)
-              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
-              .add("reportURL", job->reportURL());
-      }
-      const std::string msg_error="An exception was caught trying to call reportMigrationResults";
-      reportPacker.m_lc.log(cta::log::ERR, msg_error);
-      throw failedMigrationRecallResult(msg_error);
-    } catch(const std::exception& e){
-      cta::log::ScopedParamContainer params(reportPacker.m_lc);
-      params.add("exceptionWhat", e.what());
-      if (job.get()) {
-        params.add("fileId", job->archiveFile.archiveFileID)
-              .add("diskInstance", job->archiveFile.diskInstance)
-              .add("diskFileId", job->archiveFile.diskFileId)
-              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
-      }
-      const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
-      reportPacker.m_lc.log(cta::log::ERR, msg_error);
-      throw failedMigrationRecallResult(msg_error);
-    }
+    proceedJobsBatch(reportPacker, std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc);
   } else {
     // This is an abnormal situation: we should never flush after an error!
     reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client");
   }
 }
 
+//------------------------------------------------------------------------------
+//ReportFlush::proceedJobsBatch
+//------------------------------------------------------------------------------
+void MigrationReportPacker::ReportFlush::proceedJobsBatch(const MigrationReportPacker& reportPacker,
+  std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext) {
+  std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten;
+  std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;  
+  std::unique_ptr<cta::ArchiveJob> job;
+  try{
+    while(!successfulArchiveJobs.empty()) {
+      // Get the next job to report and make sure we will not attempt to process it twice.
+      job = std::move(successfulArchiveJobs.front());
+      successfulArchiveJobs.pop();
+      if (!job.get()) continue;        
+      tapeFilesWritten.insert(job->validateAndGetTapeFileWritten());
+      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));      
+      job.reset(nullptr);
+    }
+    
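+    // Update the catalogue for the whole batch in one call, then launch the
+    // asynchronous backend updates, and finally collect the results and
+    // report the completed requests to the client.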
+    updateCatalogueWithTapeFilesWritten(reportPacker, tapeFilesWritten, logContext);
+    asyncUpdateBackendWithJobsSucceeded(validatedSuccessfulArchiveJobs);
+    checkAndAsyncReportCompletedJobs(validatedSuccessfulArchiveJobs, logContext);   
+           
+    logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape");    
+  } catch(const cta::exception::Exception& e){
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("exceptionMessageValue", e.getMessageValue());
+    if (job.get()) {
+      params.add("fileId", job->archiveFile.archiveFileID)
+            .add("diskInstance", job->archiveFile.diskInstance)
+            .add("diskFileId", job->archiveFile.diskFileId)
+            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
+            .add("reportURL", job->reportURL());
+    }
+    const std::string msg_error="An exception was caught trying to call reportMigrationResults";
+    logContext.log(cta::log::ERR, msg_error);
+    throw failedMigrationRecallResult(msg_error);
+  } catch(const std::exception& e){
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("exceptionWhat", e.what());
+    if (job.get()) {
+      params.add("fileId", job->archiveFile.archiveFileID)
+            .add("diskInstance", job->archiveFile.diskInstance)
+            .add("diskFileId", job->archiveFile.diskFileId)
+            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
+    }
+    const std::string msg_error="An std::exception was caught trying to call reportMigrationResults";
+    logContext.log(cta::log::ERR, msg_error);
+    throw failedMigrationRecallResult(msg_error);
+  }
+}
+
+//------------------------------------------------------------------------------
+//ReportFlush::asyncUpdateBackendWithJobsSucceeded
+//------------------------------------------------------------------------------
+void MigrationReportPacker::ReportFlush::asyncUpdateBackendWithJobsSucceeded(
+  const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs) {
+  for (const auto &job: validatedSuccessfulArchiveJobs){
+    job->asyncSetJobSucceed();
+  }
+}
+
+//------------------------------------------------------------------------------
+//ReportFlush::checkAndAsyncReportCompletedJobs
+//------------------------------------------------------------------------------
+void MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs(
+  std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
+  cta::log::LogContext &logContext) {
+  std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs;
+  
+  for (auto &job: validatedSuccessfulArchiveJobs){
+    cta::log::ScopedParamContainer params(logContext);
+    params.add("fileId", job->archiveFile.archiveFileID)
+          .add("diskInstance", job->archiveFile.diskInstance)
+          .add("diskFileId", job->archiveFile.diskFileId)
+          .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
+    logContext.log(cta::log::DEBUG,
+      "In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()"
+      " check for async backend update finished");
+    if(job->checkAndAsyncReportComplete()) { 
+      params.add("reportURL", job->reportURL());    
+      reportedArchiveJobs.emplace_back(std::move(job));
+      logContext.log(cta::log::INFO,"Sent to the client a full file archival");
+    } else {
+      logContext.log(cta::log::INFO, "Recorded the partial migration of a file");
+    }
+  }
+  
+  for (const auto &job: reportedArchiveJobs){
+    try {
+      job->waitForReporting(); // should not block forever as long as the xroot query has a timeout
+      cta::log::ScopedParamContainer params(logContext);
+      params.add("reportURL", job->reportURL());
+      logContext.log(cta::log::INFO,"Reported to the client a full file archival");
+    } catch(cta::exception::Exception &ex) {
+      cta::log::ScopedParamContainer params(logContext);
+      params.add("reportURL", job->reportURL());
+      params.add("errorMessage", ex.getMessage().str());
+      logContext.log(cta::log::ERR,"Failed to report a full file archival to the client");
+    } catch(...) {
+      throw;
+    }
+  }
+}
+
+//------------------------------------------------------------------------------
+//ReportFlush::updateCatalogueWithTapeFilesWritten
+//------------------------------------------------------------------------------
+void MigrationReportPacker::ReportFlush::updateCatalogueWithTapeFilesWritten(
+        const MigrationReportPacker &reportPacker,
+        const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
+        cta::log::LogContext &logContext) {
+  reportPacker.m_archiveMount->updateCatalogueWithTapeFilesWritten(tapeFilesWritten);
+  cta::log::ScopedParamContainer params(logContext);
+  params.add("tapeFilesWritten", tapeFilesWritten.size());
+  logContext.log(cta::log::INFO,"Catalog updated for batch of jobs");    
+}
+
 //------------------------------------------------------------------------------
 //reportTapeFull()::execute
 //------------------------------------------------------------------------------
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
index dd014536c7..1bd7a6f2e0 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp
@@ -165,6 +165,18 @@ private:
       ReportFlush(drive::compressionStats compressStats):m_compressStats(compressStats){}
       
       void execute(MigrationReportPacker& reportPacker) override;
+      void proceedJobsBatch(const MigrationReportPacker& reportPacker,
+        std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, 
+        cta::log::LogContext &log);
+      void asyncUpdateBackendWithJobsSucceeded(
+        const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs);
+      void checkAndAsyncReportCompletedJobs(
+        std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs,
+        cta::log::LogContext &logContext);
+      void updateCatalogueWithTapeFilesWritten(
+        const MigrationReportPacker &reportPacker,
+        const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten,
+        cta::log::LogContext &logContext);
   };
   class ReportTapeFull: public Report {
     public:
diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
index 687e28a600..908f5e8c7f 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp
@@ -69,16 +69,42 @@ namespace unitTests {
        int & completes, int &failures):
     MockArchiveJob(am, catalogue), completesRef(completes), failuresRef(failures) {}
     
-    bool complete() override {
+    virtual void asyncSetJobSucceed() override {
       completesRef++;
+    }
+    
+    virtual bool checkAndAsyncReportComplete() override {
       return false;
     }
     
+    virtual void validate() override {}
+    virtual void writeToCatalogue()  override {}
+    virtual cta::catalogue::TapeFileWritten validateAndGetTapeFileWritten() override {
+      cta::catalogue::TapeFileWritten fileReport;
+      fileReport.archiveFileId = archiveFile.archiveFileID;
+      fileReport.blockId = tapeFile.blockId;
+      fileReport.checksumType = tapeFile.checksumType;
+      fileReport.checksumValue = tapeFile.checksumValue;
+      fileReport.compressedSize = tapeFile.compressedSize;
+      fileReport.copyNb = tapeFile.copyNb;
+      fileReport.diskFileId = archiveFile.diskFileId;
+      fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
+      fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
+      fileReport.diskFilePath = archiveFile.diskFileInfo.path;
+      fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
+      fileReport.diskInstance = archiveFile.diskInstance;
+      fileReport.fSeq = tapeFile.fSeq;
+      fileReport.size = archiveFile.fileSize;
+      fileReport.storageClassName = archiveFile.storageClass;
+      fileReport.tapeDrive = std::string("testDrive");
+      fileReport.vid = tapeFile.vid;
+      return fileReport;
+    }
+   
     
     void failed(const cta::exception::Exception& ex, cta::log::LogContext & lc) override {
       failuresRef++;
     }
-    
   private:
     int & completesRef;
     int & failuresRef;
@@ -86,7 +112,29 @@ namespace unitTests {
   
   TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerNominal) {
     cta::MockArchiveMount tam(*m_catalogue);
+    
+    const std::string vid1 = "VTEST001";
+    const std::string vid2 = "VTEST002";
+    const std::string logicalLibraryName = "logical_library_name";
+    const std::string tapePoolName = "tape_pool_name";
+    const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
+    const bool disabledValue = true;
+    const bool fullValue = false;
+    const std::string createTapeComment = "Create tape";
+    cta::common::dataStructures::SecurityIdentity admin = cta::common::dataStructures::SecurityIdentity("admin","localhost");
 
+    m_catalogue->createLogicalLibrary(admin, logicalLibraryName, "Create logical library");
+    m_catalogue->createTapePool(admin, tapePoolName, 2, true, "Create tape pool");
+    m_catalogue->createTape(admin, vid1, logicalLibraryName, tapePoolName, capacityInBytes,
+      disabledValue, fullValue, createTapeComment);
+
+    cta::common::dataStructures::StorageClass storageClass;
+    storageClass.diskInstance = "disk_instance";
+    storageClass.name = "storage_class";
+    storageClass.nbCopies = 1;
+    storageClass.comment = "Create storage class";
+    m_catalogue->createStorageClass(admin, storageClass);
+    
     ::testing::InSequence dummy;
     std::unique_ptr<cta::ArchiveJob> job1;
     int job1completes(0), job1failures(0);
@@ -95,6 +143,25 @@ namespace unitTests {
         new MockArchiveJobExternalStats(tam, *m_catalogue, job1completes, job1failures));
       job1.reset(mockJob.release());
     }
+    job1->archiveFile.archiveFileID=1;
+    job1->archiveFile.diskInstance="disk_instance";
+    job1->archiveFile.diskFileId="diskFileId1";
+    job1->archiveFile.diskFileInfo.path="filePath1";
+    job1->archiveFile.diskFileInfo.owner="testUser1";
+    job1->archiveFile.diskFileInfo.group="testGroup1";
+    job1->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob1";
+    job1->archiveFile.fileSize=1024;        
+    job1->archiveFile.checksumType="md5";
+    job1->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    job1->archiveFile.storageClass="storage_class";
+    job1->tapeFile.vid="VTEST001";
+    job1->tapeFile.fSeq=1;
+    job1->tapeFile.blockId=256;
+    job1->tapeFile.compressedSize=768;
+    job1->tapeFile.copyNb=1;
+    job1->tapeFile.checksumType="md5";
+    job1->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    
     std::unique_ptr<cta::ArchiveJob> job2;
     int job2completes(0), job2failures(0);
     {
@@ -102,7 +169,25 @@ namespace unitTests {
         new MockArchiveJobExternalStats(tam, *m_catalogue, job2completes, job2failures));
       job2.reset(mockJob.release());
     }
-
+    job2->archiveFile.archiveFileID=2;
+    job2->archiveFile.diskInstance="disk_instance";
+    job2->archiveFile.diskFileId="diskFileId2";
+    job2->archiveFile.diskFileInfo.path="filePath2";
+    job2->archiveFile.diskFileInfo.owner="testUser2";
+    job2->archiveFile.diskFileInfo.group="testGroup2";
+    job2->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2";
+    job2->archiveFile.fileSize=1024;        
+    job2->archiveFile.checksumType="md5";
+    job2->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    job2->archiveFile.storageClass="storage_class";
+    job2->tapeFile.vid="VTEST001";
+    job2->tapeFile.fSeq=2;
+    job2->tapeFile.blockId=512;
+    job2->tapeFile.compressedSize=768;
+    job2->tapeFile.copyNb=1;
+    job2->tapeFile.checksumType="md5";
+    job2->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    
     cta::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerNominal",cta::log::DEBUG);
     cta::log::LogContext lc(log);
     tapeserver::daemon::MigrationReportPacker mrp(&tam,lc);
@@ -170,9 +255,31 @@ namespace unitTests {
     ASSERT_EQ(1, job3failures);
   }
 
-  TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerOneByteFile) {
+  TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerBadFile) {
     cta::MockArchiveMount tam(*m_catalogue);
+    
+    const std::string vid1 = "VTEST001";
+    const std::string vid2 = "VTEST002";
+    const std::string logicalLibraryName = "logical_library_name";
+    const std::string tapePoolName = "tape_pool_name";
+    const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
+    const bool disabledValue = true;
+    const bool fullValue = false;
+    const std::string createTapeComment = "Create tape";
+    cta::common::dataStructures::SecurityIdentity admin = cta::common::dataStructures::SecurityIdentity("admin","localhost");
 
+    m_catalogue->createLogicalLibrary(admin, logicalLibraryName, "Create logical library");
+    m_catalogue->createTapePool(admin, tapePoolName, 2, true, "Create tape pool");
+    m_catalogue->createTape(admin, vid1, logicalLibraryName, tapePoolName, capacityInBytes,
+      disabledValue, fullValue, createTapeComment);
+
+    cta::common::dataStructures::StorageClass storageClass;
+    storageClass.diskInstance = "disk_instance";
+    storageClass.name = "storage_class";
+    storageClass.nbCopies = 1;
+    storageClass.comment = "Create storage class";
+    m_catalogue->createStorageClass(admin, storageClass);
+    
     ::testing::InSequence dummy;
     std::unique_ptr<cta::ArchiveJob> migratedBigFile;
     int migratedBigFileCompletes(0), migratedBigFileFailures(0);
@@ -196,9 +303,62 @@ namespace unitTests {
       migratedNullFile.reset(mockJob.release());
     }
 
-    migratedBigFile->archiveFile.fileSize=100000;
-    migratedFileSmall->archiveFile.fileSize=1;
-    migratedNullFile->archiveFile.fileSize=0;
+    migratedBigFile->archiveFile.archiveFileID=4;
+    migratedBigFile->archiveFile.diskInstance="disk_instance";
+    migratedBigFile->archiveFile.diskFileId="diskFileId2";
+    migratedBigFile->archiveFile.diskFileInfo.path="filePath2";
+    migratedBigFile->archiveFile.diskFileInfo.owner="testUser2";
+    migratedBigFile->archiveFile.diskFileInfo.group="testGroup2";
+    migratedBigFile->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2";
+    migratedBigFile->archiveFile.fileSize=100000;        
+    migratedBigFile->archiveFile.checksumType="md5";
+    migratedBigFile->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    migratedBigFile->archiveFile.storageClass="storage_class";
+    migratedBigFile->tapeFile.vid="VTEST001";
+    migratedBigFile->tapeFile.fSeq=1;
+    migratedBigFile->tapeFile.blockId=256;
+    migratedBigFile->tapeFile.compressedSize=768;
+    migratedBigFile->tapeFile.copyNb=1;
+    migratedBigFile->tapeFile.checksumType="md5";
+    migratedBigFile->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    
+    migratedFileSmall->archiveFile.archiveFileID=5;
+    migratedFileSmall->archiveFile.diskInstance="disk_instance";
+    migratedFileSmall->archiveFile.diskFileId="diskFileId3";
+    migratedFileSmall->archiveFile.diskFileInfo.path="filePath3";
+    migratedFileSmall->archiveFile.diskFileInfo.owner="testUser2";
+    migratedFileSmall->archiveFile.diskFileInfo.group="testGroup2";
+    migratedFileSmall->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2";
+    migratedFileSmall->archiveFile.fileSize=1;        
+    migratedFileSmall->archiveFile.checksumType="md5";
+    migratedFileSmall->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    migratedFileSmall->archiveFile.storageClass="storage_class";
+    migratedFileSmall->tapeFile.vid="VTEST001";
+    migratedFileSmall->tapeFile.fSeq=2;
+    migratedFileSmall->tapeFile.blockId=512;
+    migratedFileSmall->tapeFile.compressedSize=1;
+    migratedFileSmall->tapeFile.copyNb=1;
+    migratedFileSmall->tapeFile.checksumType="md5";
+    migratedFileSmall->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    
+    migratedNullFile->archiveFile.archiveFileID=6;
+    migratedNullFile->archiveFile.diskInstance="disk_instance";
+    migratedNullFile->archiveFile.diskFileId="diskFileId4";
+    migratedNullFile->archiveFile.diskFileInfo.path="filePath4";
+    migratedNullFile->archiveFile.diskFileInfo.owner="testUser2";
+    migratedNullFile->archiveFile.diskFileInfo.group="testGroup2";
+    migratedNullFile->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2";
+    migratedNullFile->archiveFile.fileSize=0;        
+    migratedNullFile->archiveFile.checksumType="md5";
+    migratedNullFile->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
+    migratedNullFile->archiveFile.storageClass="storage_class";
+    migratedNullFile->tapeFile.vid="VTEST001";
+    migratedNullFile->tapeFile.fSeq=3;
+    migratedNullFile->tapeFile.blockId=768;
+    migratedNullFile->tapeFile.compressedSize=0;
+    migratedNullFile->tapeFile.copyNb=1;
+    migratedNullFile->tapeFile.checksumType="md5";
+    migratedNullFile->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6";
     
     cta::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerOneByteFile",cta::log::DEBUG);
     cta::log::LogContext lc(log);  
@@ -216,10 +376,12 @@ namespace unitTests {
     mrp.waitThread();
 
     std::string temp = log.getLog();
-    ASSERT_NE(std::string::npos, temp.find("Reported to the client that a batch of files was written on tape"));
-    ASSERT_EQ(1, tam.completes);
-    ASSERT_EQ(1, migratedBigFileCompletes);
-    ASSERT_EQ(1, migratedFileSmallCompletes);
-    ASSERT_EQ(1, migratedNullFileCompletes);
+    ASSERT_NE(std::string::npos, temp.find("TapeFileWrittenEvent is invalid"));
+    ASSERT_NE(std::string::npos, temp.find("Successfully closed client's session "
+                                           "after the failed report MigrationResult"));
+    ASSERT_EQ(0, tam.completes);
+    ASSERT_EQ(0, migratedBigFileCompletes);
+    ASSERT_EQ(0, migratedFileSmallCompletes);
+    ASSERT_EQ(0, migratedNullFileCompletes);
   } 
 }
-- 
GitLab