From b9f69318f4c23b7497f79e53ee7158866f37ecd1 Mon Sep 17 00:00:00 2001
From: Victor Kotlyar <Victor.Kotlyar@cern.ch>
Date: Wed, 2 Aug 2017 09:01:11 +0200
Subject: [PATCH] Implement batch reporting to the backend for successful
 retrieve jobs.

Process all successful reports asynchronously and periodically check
and clear their statuses once they have finished.
At the end of the session, check and flush all reports remaining in
the successful reports queue.
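
The batching works roughly as sketched below (names are illustrative;
the real logic lives in RecallReportPacker::WorkerThread::run() and
flushCheckAndFinishAsyncExecute()): each successful report is queued
once its asynchronous execution has started, and when the queue reaches
a threshold every queued report is waited on and cleared.

    // C++ sketch of the flush logic, assuming a Report interface like
    // the one in RecallReportPacker; FLUSH_SIZE stands in for
    // RECALL_REPORT_PACKER_FLUSH_SIZE (32 in this patch).
    #include <list>
    #include <memory>
    #include <utility>

    struct Report {
      virtual void execute() = 0;                     // starts the async work
      virtual void waitForAsyncExecuteFinished() = 0; // joins it, may throw
      virtual ~Report() = default;
    };

    const unsigned int FLUSH_SIZE = 32;

    // Returns the number of reports checked (0 while below the threshold).
    unsigned int flushIfNeeded(std::list<std::unique_ptr<Report>> &queue) {
      unsigned int checked = 0;
      if (queue.size() >= FLUSH_SIZE) {
        while (!queue.empty()) {
          std::unique_ptr<Report> report = std::move(queue.back());
          queue.pop_back();
          if (!report) continue;
          report->waitForAsyncExecuteFinished();
          ++checked;
        }
      }
      return checked;
    }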

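Also add a Backend::AsyncDeleter interface with Rados and VFS
implementations. The Rados deleter takes the exclusive lock in an
std::async task, retrying on EBUSY with a randomized backoff derived
from the measured round-trip time of the lock call. The formula,
roughly (a sketch only; the parameter names stand in for the
implementation's c_backoffFraction and c_maxWait constants):

    // Backoff sketch: the wait grows with the attempt number, is
    // capped, and is randomized to 50-150% so concurrent lockers
    // spread out instead of retrying in lock-step.
    #include <algorithm>
    #include <chrono>
    #include <cstddef>
    #include <random>

    size_t backoffWaitUsecs(size_t lockRttUsecs, size_t attempt,
                            size_t backoffFraction, size_t maxWaitUsecs) {
      size_t wait = std::min(lockRttUsecs * attempt / backoffFraction,
                             maxWaitUsecs);
      std::default_random_engine dre(
          std::chrono::system_clock::now().time_since_epoch().count());
      std::uniform_int_distribution<size_t> distribution(50, 150);
      return wait * distribution(dre) / 100;
    }
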
Switch from the synchronous rados remove to the asynchronous
aio_remove for zero-size objects in BackendRados::AsyncUpdater, so that
the rados callback thread is no longer blocked while the empty object
is removed.
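
The asynchronous removal follows the usual librados completion
pattern. A minimal sketch, using only the librados calls that already
appear in this patch (function names are illustrative, error handling
trimmed):

    // Launch an async remove; the completion callback reports the result.
    #include <rados/librados.hpp>
    #include <string>

    static void removeDone(librados::completion_t completion, void *) {
      // A negative return value means the remove failed; the real code
      // turns it into a Backend::CouldNotDelete exception.
      int rc = rados_aio_get_return_value(completion);
      (void)rc;
    }

    void asyncRemove(librados::IoCtx &ctx, const std::string &name) {
      librados::AioCompletion *aioc =
          librados::Rados::aio_create_completion(nullptr, removeDone, nullptr);
      int rc = ctx.aio_remove(name, aioc); // returns immediately
      aioc->release(); // librados keeps it alive until the callback has run
      if (rc) { /* failed to launch: report the error synchronously */ }
    }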
---
 objectstore/Backend.hpp                       |  29 +++
 objectstore/BackendRados.cpp                  | 179 +++++++++++++++++-
 objectstore/BackendRados.hpp                  |  41 ++++
 objectstore/BackendVFS.cpp                    |  40 ++++
 objectstore/BackendVFS.hpp                    |  18 ++
 objectstore/ObjectOps.hpp                     |   4 +-
 objectstore/RetrieveRequest.cpp               |  10 +
 objectstore/RetrieveRequest.hpp               |   9 +
 scheduler/OStoreDB/OStoreDB.cpp               |  28 +--
 scheduler/OStoreDB/OStoreDB.hpp               |   4 +-
 scheduler/RetrieveJob.cpp                     |  15 +-
 scheduler/RetrieveJob.hpp                     |  18 +-
 scheduler/SchedulerDatabase.hpp               |   3 +-
 scheduler/SchedulerTest.cpp                   |   3 +-
 scheduler/testingMocks/MockRetrieveJob.hpp    |   4 +-
 .../tapeserver/daemon/RecallReportPacker.cpp  |  90 ++++++++-
 .../tapeserver/daemon/RecallReportPacker.hpp  |  23 +++
 .../daemon/RecallReportPackerTest.cpp         |   5 +-
 tests/helgrind.suppr                          |  49 +++++
 19 files changed, 534 insertions(+), 38 deletions(-)

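The scheduler-facing API changes from the single succeed()/complete()
call to a two-phase asyncSucceed()/checkSucceed() pair
(asyncComplete()/checkComplete() on cta::RetrieveJob). Caller-side this
looks roughly as follows (a sketch; `job` stands for a cta::RetrieveJob
obtained from the retrieve mount):

    // Phase 1: fire the asynchronous deletion of the retrieve request.
    job->asyncComplete();
    // ... queue further reports, do other work ...
    // Phase 2: join the async operation; throws if the deletion failed.
    job->checkComplete();
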
diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp
index 7b9a5ca0db..0de0bbe4e7 100644
--- a/objectstore/Backend.hpp
+++ b/objectstore/Backend.hpp
@@ -108,6 +108,7 @@ public:
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotFetch);
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit);
+  CTA_GENERATE_EXCEPTION_CLASS(CouldNotDelete);
   CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
   CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
   
@@ -138,6 +139,34 @@ public:
    */
   virtual AsyncUpdater * asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) = 0;
   
+  
+  /**
+   * A base class handling the asynchronous sequence: exclusive lock, then delete.
+   * Each operation will be asynchronous, and the result
+   * (success or exception) will be returned via the wait() function call. 
+   */
+  class AsyncDeleter {
+  public:
+    /**
+     * Waits for completion (success) or throws an exception (failure).
+     */
+    virtual void wait() = 0;
+    
+    /**
+     * Destructor
+     */
+    virtual ~AsyncDeleter() {}
+  };
+  
+  /**
+   * Triggers the asynchronous object delete sequence, as described in 
+   * AsyncDeleter class description.
+   * 
+   * @param name The name of the object to be deleted.
+   * @return pointer to a newly created AsyncDeleter
+   */
+  virtual AsyncDeleter * asyncDelete(const std::string & name) = 0;
+  
   /**
    * Base class for the representation of the parameters of the BackendStore.
    */
diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp
index 18f7a3187d..4631049d8c 100644
--- a/objectstore/BackendRados.cpp
+++ b/objectstore/BackendRados.cpp
@@ -338,12 +338,17 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
           std::string("In BackendRados::AsyncUpdater::statCallback(): could not stat object: ") + au.m_name);
       throw Backend::NoSuchObject(errnum.getMessageValue());
     }
-    // Check the size. If zero, we locked an empty object: delete and throw an exception.
+    // Check the size. If zero, we locked an empty object: delete it asynchronously and report the error from deleteEmptyCallback.
     if (!au.m_size) {
-      // TODO. This is going to lock the callback thread of the rados context for a while.
-      // As this is not supposde to happen often, this is acceptable.
-      au.m_backend.remove(au.m_name);
-      throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::statCallback(): no such object: ") + au.m_name);
+      // launch the delete operation (async).
+      librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
+      auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
+      aioc->release();
+      if (rc) {
+        cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncUpdater::statCallback(): failed to launch aio_remove(): ")+au.m_name);
+        throw Backend::CouldNotDelete(errnum.getMessageValue());
+      }
+      return;
     }
     // Stat is done, we can launch the read operation (async).
     librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, fetchCallback, nullptr);
@@ -359,6 +364,23 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
   }
 }
 
+void BackendRados::AsyncUpdater::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
+  AsyncUpdater & au = *((AsyncUpdater *) pThis);
+  try {
+    // Check that the object could be deleted.
+    if (rados_aio_get_return_value(completion)) {
+      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
+          std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): could not delete object: ") + au.m_name);
+      throw Backend::CouldNotDelete(errnum.getMessageValue());
+    }
+    // The empty object has been deleted; report NoSuchObject to the caller via the promise.
+    throw Backend::NoSuchObject(std::string("In BackendRados::AsyncUpdater::deleteEmptyCallback(): no such object: ") + au.m_name);
+  } catch (...) {
+    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+    au.m_job.set_exception(std::current_exception());
+  }
+}
+
 void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion, void* pThis) {
   AsyncUpdater & au = *((AsyncUpdater *) pThis);
   try {
@@ -484,6 +506,153 @@ void BackendRados::AsyncUpdater::wait() {
   ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
 }
 
+
+Backend::AsyncDeleter* BackendRados::asyncDelete(const std::string & name)
+{
+  return new AsyncDeleter(*this, name);
+}
+
+BackendRados::AsyncDeleter::AsyncDeleter(BackendRados& be, const std::string& name):
+  m_backend(be), m_name(name), m_job(), m_jobFuture(m_job.get_future()) {
+  // At construction time, we just fire a lock.
+  try {
+    // Rados does not have aio_lock, so we do it in an async.
+    // Operation is lock (synchronous), then an async stat, then an async remove.
+    // The async function never fails, exceptions go to the promise (as everywhere).
+    m_lockAsync.reset(new std::future<void>(std::async(std::launch::async,
+        [this](){
+          try {
+            m_lockClient = BackendRados::createUniqueClientId();
+            struct timeval tv;
+            tv.tv_usec = 0;
+            tv.tv_sec = 60;
+            int rc;
+            // TODO: could be improved (but need aio_lock in rados, not available at the time
+            // of writing).
+            // Crude backoff: we measure the RTT of the call and back off by a fraction of this amount multiplied
+            // by the number of tries (capped by a maximum). The value is then randomized
+            // (between 50% and 150%).
+            size_t backoff=1;
+            utils::Timer t;
+            while (true) {
+              rc = m_backend.m_radosCtx.lock_exclusive(m_name, "lock", m_lockClient, "", &tv, 0);
+              if (-EBUSY != rc) break;
+              timespec ts;
+              auto wait=t.usecs(utils::Timer::resetCounter)*backoff++/c_backoffFraction;
+              wait = std::min(wait, c_maxWait);
+              if (backoff>c_maxBackoff) backoff=1;
+              // We need to get a random number [50, 150]
+              std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
+              std::uniform_int_distribution<size_t> distribution(50, 150);
+              decltype(wait) randFactor=distribution(dre);
+              wait=(wait * randFactor)/100;
+              ts.tv_sec = wait/(1000*1000);
+              ts.tv_nsec = (wait % (1000*1000)) * 1000;
+              nanosleep(&ts, nullptr);
+            }
+            if (rc) {
+              cta::exception::Errnum errnum(-rc,
+                std::string("In BackendRados::AsyncDeleter::statCallback::lock_lambda(): failed to librados::IoCtx::lock_exclusive: ") +
+                m_name + "/" + "lock" + "/" + m_lockClient + "//");
+              throw CouldNotLock(errnum.getMessageValue());
+            }
+            // Locking is done, we can launch the stat operation (async).
+            librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr);
+            rc=m_backend.m_radosCtx.aio_stat(m_name, aioc, &m_size, &m_date);
+            aioc->release();
+            if (rc) {
+              cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::AsyncDeleter::lock_lambda(): failed to launch aio_stat(): ")+m_name);
+              throw Backend::NoSuchObject(errnum.getMessageValue());
+            }
+          } catch (...) {
+            ANNOTATE_HAPPENS_BEFORE(&m_job);
+            m_job.set_exception(std::current_exception());
+          }
+        }
+        )));
+  } catch (...) {
+    ANNOTATE_HAPPENS_BEFORE(&m_job);
+    m_job.set_exception(std::current_exception());
+  }
+}
+
+void BackendRados::AsyncDeleter::statCallback(librados::completion_t completion, void* pThis) {
+  AsyncDeleter & au = *((AsyncDeleter *) pThis);
+  try {
+    // Check that the stat succeeded (the object is already locked).
+    if (rados_aio_get_return_value(completion)) {
+      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
+          std::string("In BackendRados::AsyncDeleter::statCallback(): could not stat object: ") + au.m_name);
+      throw Backend::NoSuchObject(errnum.getMessageValue());
+    }
+    // Check the size. If zero, we locked an empty object: delete it and report the error from deleteEmptyCallback.
+    if (!au.m_size) {
+      // launch the delete operation (async).
+      librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteEmptyCallback, nullptr);
+      auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
+      aioc->release();
+      if (rc) {
+        cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::statCallback():"
+          " failed to launch aio_remove() for zero size object: ")+au.m_name);
+        throw Backend::CouldNotDelete(errnum.getMessageValue());
+      }
+      return;
+    }
+    // Stat is done, we can launch the delete operation (async).
+    librados::AioCompletion * aioc = librados::Rados::aio_create_completion(&au, deleteCallback, nullptr);
+    auto rc=au.m_backend.m_radosCtx.aio_remove(au.m_name, aioc);
+    aioc->release();
+    if (rc) {
+      cta::exception::Errnum errnum (-rc, std::string("In BackendRados::AsyncDeleter::statCallback(): failed to launch aio_remove(): ")+au.m_name);
+      throw Backend::CouldNotDelete(errnum.getMessageValue());
+    }
+  } catch (...) {
+    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+    au.m_job.set_exception(std::current_exception());
+  }
+}
+
+void BackendRados::AsyncDeleter::deleteCallback(librados::completion_t completion, void* pThis) {
+  AsyncDeleter & au = *((AsyncDeleter *) pThis);
+  try {
+    // Check that the object could be deleted.
+    if (rados_aio_get_return_value(completion)) {
+      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
+          std::string("In BackendRados::AsyncDeleter::deleteCallback(): could not delete object: ") + au.m_name);
+      throw Backend::CouldNotDelete(errnum.getMessageValue());
+    }
+    // Done!
+    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+    au.m_job.set_value();
+  } catch (...) {
+    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+    au.m_job.set_exception(std::current_exception());
+  }
+}
+
+void BackendRados::AsyncDeleter::deleteEmptyCallback(librados::completion_t completion, void* pThis) {
+  AsyncDeleter & au = *((AsyncDeleter *) pThis);
+  try {
+    // Check that the object could be deleted.
+    if (rados_aio_get_return_value(completion)) {
+      cta::exception::Errnum errnum(-rados_aio_get_return_value(completion),
+          std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): could not delete object: ") + au.m_name);
+      throw Backend::CouldNotDelete(errnum.getMessageValue());
+    }
+    // The empty object has been deleted; report NoSuchObject to the caller via the promise.
+    throw Backend::NoSuchObject(std::string("In BackendRados::AsyncDeleter::deleteEmptyCallback(): no such object: ") + au.m_name);
+  } catch (...) {
+    ANNOTATE_HAPPENS_BEFORE(&au.m_job);
+    au.m_job.set_exception(std::current_exception());
+  }
+}
+
+void BackendRados::AsyncDeleter::wait() {
+  m_jobFuture.get();
+  ANNOTATE_HAPPENS_AFTER(&m_job);
+  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
+} 
+
 std::string BackendRados::Parameters::toStr() {
   std::stringstream ret;
   ret << "userId=" << m_userId << " pool=" << m_pool;
diff --git a/objectstore/BackendRados.hpp b/objectstore/BackendRados.hpp
index 81421b82fd..b4895ac574 100644
--- a/objectstore/BackendRados.hpp
+++ b/objectstore/BackendRados.hpp
@@ -120,6 +120,8 @@ public:
     std::unique_ptr<std::future<void>> m_updateAsync;
     /** The first callback operation (after checking existence) */
     static void statCallback(librados::completion_t completion, void *pThis);
+    /** Async delete in case of zero sized object */
+    static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
     /** The second callback operation (after reading) */
     static void fetchCallback(librados::completion_t completion, void *pThis);
     /** The third callback operation (after writing) */
@@ -130,6 +132,45 @@ public:
   
   Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
 
+  /**
+   * A class following up the lock, stat (existence check), delete sequence.
+   * The constructor implicitly starts the lock step.
+   */
+  class AsyncDeleter: public Backend::AsyncDeleter {
+  public:
+    AsyncDeleter(BackendRados & be, const std::string & name);
+    void wait() override;
+  private:
+    /** A reference to the backend */
+    BackendRados &m_backend;
+    /** The object name */
+    const std::string m_name;
+    /** Storage for stat operation (size) */
+    uint64_t m_size;
+    /** Storage for stat operation (date) */
+    time_t m_date;
+    /** The promise that will both do the job and allow synchronization with the caller. */
+    std::promise<void> m_job;
+    /** The future from m_job, which will be extracted before any thread gets a chance to play with it. */
+    std::future<void> m_jobFuture;
+    /** A future used to hold the structure of the lock operation. It will be either empty or complete at
+     destruction time */
+    std::unique_ptr<std::future<void>> m_lockAsync;
+    /** A string used to identify the locker */
+    std::string m_lockClient;
+    /** A future holding the structure of the update operation (unused in the deleter). It will be
+     either empty or complete at destruction time */
+    std::unique_ptr<std::future<void>> m_updateAsync;
+    /** The first callback operation (after checking existence) */
+    static void statCallback(librados::completion_t completion, void *pThis);
+    /** The second callback operation (after deleting) */
+    static void deleteCallback(librados::completion_t completion, void *pThis);
+    /** Async delete in case of zero sized object */
+    static void deleteEmptyCallback(librados::completion_t completion, void *pThis);
+  };
+  
+  Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
+  
   class Parameters: public Backend::Parameters {
     friend class BackendRados;
   public:
diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp
index a8680b6565..dbef4b2681 100644
--- a/objectstore/BackendVFS.cpp
+++ b/objectstore/BackendVFS.cpp
@@ -377,6 +377,46 @@ void BackendVFS::AsyncUpdater::wait() {
   ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
 }
 
+BackendVFS::AsyncDeleter::AsyncDeleter(BackendVFS & be, const std::string& name):
+  m_backend(be), m_name(name),
+  m_job(std::async(std::launch::async, 
+    [&](){
+      std::unique_ptr<ScopedLock> sl;
+      try { // locking already throws proper exceptions for no such file.
+        sl.reset(m_backend.lockExclusive(m_name));
+      } catch (Backend::NoSuchObject &) {
+        ANNOTATE_HAPPENS_BEFORE(&m_job);
+        throw;
+      } catch (cta::exception::Exception & ex) {
+        ANNOTATE_HAPPENS_BEFORE(&m_job);
+        throw Backend::CouldNotLock(ex.getMessageValue());
+      }
+      try {
+        m_backend.remove(m_name);
+      } catch (cta::exception::Exception & ex) {
+        ANNOTATE_HAPPENS_BEFORE(&m_job);
+        throw Backend::CouldNotDelete(ex.getMessageValue());
+      }
+      try {
+        sl->release();
+      } catch (cta::exception::Exception & ex) {
+        ANNOTATE_HAPPENS_BEFORE(&m_job);
+        throw Backend::CouldNotUnlock(ex.getMessageValue());
+      }
+      ANNOTATE_HAPPENS_BEFORE(&m_job);
+    })) 
+{}
+
+Backend::AsyncDeleter* BackendVFS::asyncDelete(const std::string & name) {
+  // Create the object. Done.
+  return new AsyncDeleter(*this, name);
+}
+
+void BackendVFS::AsyncDeleter::wait() {
+  m_job.get();
+  ANNOTATE_HAPPENS_AFTER(&m_job);
+  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
+}
 
 std::string BackendVFS::Parameters::toStr() {
   std::stringstream ret;
diff --git a/objectstore/BackendVFS.hpp b/objectstore/BackendVFS.hpp
index d55523f294..51398cede0 100644
--- a/objectstore/BackendVFS.hpp
+++ b/objectstore/BackendVFS.hpp
@@ -107,9 +107,27 @@ public:
      /** The future that will both do the job and allow synchronization with the caller. */
     std::future<void> m_job;
   };
+
+  /**
+   * A class mimicking AIO using C++ async tasks
+   */
+  class AsyncDeleter: public Backend::AsyncDeleter {
+  public:
+    AsyncDeleter(BackendVFS & be, const std::string & name);
+    void wait() override;
+  private:
+    /** A reference to the backend */
+    BackendVFS &m_backend;
+    /** The object name */
+    const std::string m_name;
+     /** The future that will both do the job and allow synchronization with the caller. */
+    std::future<void> m_job;
+  };
   
   Backend::AsyncUpdater* asyncUpdate(const std::string & name, std::function <std::string(const std::string &)> & update) override;
   
+  Backend::AsyncDeleter* asyncDelete(const std::string & name) override;
+  
   class Parameters: public Backend::Parameters {
     friend class BackendVFS;
   public:
diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp
index 3dbd6b3aef..661c61024e 100644
--- a/objectstore/ObjectOps.hpp
+++ b/objectstore/ObjectOps.hpp
@@ -124,13 +124,13 @@ public:
     m_headerInterpreted = false;
     m_payloadInterpreted = false;
   }
-  
+   
   void resetValues () {
     m_existingObject = false;
     m_headerInterpreted = false;
     m_payloadInterpreted = false;
   }
-  
+   
   void setOwner(const std::string & owner) {
     checkHeaderWritable();
     m_header.set_owner(owner);
diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp
index f8623f3777..1c71cc2dbc 100644
--- a/objectstore/RetrieveRequest.cpp
+++ b/objectstore/RetrieveRequest.cpp
@@ -476,5 +476,15 @@ std::string RetrieveRequest::dump() {
   return headerDump;
 }
 
+RetrieveRequest::AsyncJobDeleter * RetrieveRequest::asyncDeleteJob() {
+  std::unique_ptr<AsyncJobDeleter> ret(new AsyncJobDeleter);
+  ret->m_backendDeleter.reset(m_objectStore.asyncDelete(getAddressIfSet()));
+  return ret.release();
+}
+
+void RetrieveRequest::AsyncJobDeleter::wait() {
+  m_backendDeleter->wait();
+}
+
 }} // namespace cta::objectstore
 
diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp
index 7a1f3f22c6..2e8efbed04 100644
--- a/objectstore/RetrieveRequest.hpp
+++ b/objectstore/RetrieveRequest.hpp
@@ -57,6 +57,15 @@ public:
     uint32_t totalRetries;
     // TODO: status
   };
+  // An asynchronous job deletion class.
+  class AsyncJobDeleter {
+    friend class RetrieveRequest;
+  public:
+    void wait();
+  private:
+    std::unique_ptr<Backend::AsyncDeleter> m_backendDeleter;
+  };
+  AsyncJobDeleter * asyncDeleteJob();
   JobDump getJob(uint16_t copyNb);
   std::list<JobDump> getJobs();
   bool addJobFailure(uint16_t copyNumber, uint64_t mountId); /**< Returns true is the request is completely failed 
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 4437c1427f..abc1acf316 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2594,26 +2594,28 @@ OStoreDB::RetrieveJob::~RetrieveJob() {
 }
 
 //------------------------------------------------------------------------------
-// OStoreDB::RetrieveJob::succeed()
+// OStoreDB::RetrieveJob::asyncSucceed()
 //------------------------------------------------------------------------------
-void OStoreDB::RetrieveJob::succeed() {
-  // Lock the request and set the request as successful (delete it).
-  utils::Timer t;
-  objectstore::ScopedExclusiveLock rtfrl(m_retrieveRequest);
-  m_retrieveRequest.fetch();
-  std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
-  m_retrieveRequest.remove();
+void OStoreDB::RetrieveJob::asyncSucceed() {
+  // Set the request as successful (delete it).
+  m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob());
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RetrieveJob::checkSucceed()
+//------------------------------------------------------------------------------
+void OStoreDB::RetrieveJob::checkSucceed() {
+  m_jobDelete->wait();
+  m_retrieveRequest.resetValues();
+  // We no longer own the job (which could be gone by now)
   m_jobOwned = false;
   // Remove ownership form the agent
+  const std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
   m_agentReference.removeFromOwnership(rtfrAddress, m_objectStore);
-  log::LogContext lc(m_logger);
-  log::ScopedParamContainer params(lc);
-  params.add("requestObject", rtfrAddress)
-        .add("schedulerDbTime", t.secs());
-  lc.log(log::INFO, "In RetrieveJob::succeed(): deleted completed retrieve request.");
 }
 
 
+
 } // namespace cta
 
 
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index df6da7fc8b..9d22e05fe1 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -188,7 +188,8 @@ public:
   public:
     CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
     CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
-    virtual void succeed() override;
+    virtual void asyncSucceed() override;
+    virtual void checkSucceed() override;
     virtual void fail(log::LogContext &) override;
     virtual ~RetrieveJob() override;
   private:
@@ -202,6 +203,7 @@ public:
     objectstore::AgentReference & m_agentReference;
     objectstore::RetrieveRequest m_retrieveRequest;
     OStoreDB::RetrieveMount & m_retrieveMount;
+    std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete;
   };
   
   /* === Archive requests handling  ========================================= */
diff --git a/scheduler/RetrieveJob.cpp b/scheduler/RetrieveJob.cpp
index d27da1c188..f9a31574ee 100644
--- a/scheduler/RetrieveJob.cpp
+++ b/scheduler/RetrieveJob.cpp
@@ -40,12 +40,19 @@ cta::RetrieveJob::RetrieveJob(RetrieveMount &mount,
   transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
 
 //------------------------------------------------------------------------------
-// complete
+// asyncComplete
 //------------------------------------------------------------------------------
-void cta::RetrieveJob::complete() {
-  m_dbJob->succeed();
+void cta::RetrieveJob::asyncComplete() {
+  m_dbJob->asyncSucceed();
 }
-  
+
+//------------------------------------------------------------------------------
+// checkComplete
+//------------------------------------------------------------------------------
+void cta::RetrieveJob::checkComplete() {
+  m_dbJob->checkSucceed();
+}
+
 //------------------------------------------------------------------------------
 // failed
 //------------------------------------------------------------------------------
diff --git a/scheduler/RetrieveJob.hpp b/scheduler/RetrieveJob.hpp
index d17ccde95f..ef61d045b6 100644
--- a/scheduler/RetrieveJob.hpp
+++ b/scheduler/RetrieveJob.hpp
@@ -72,14 +72,20 @@ public:
    * Destructor.
    */
   virtual ~RetrieveJob() throw();
-
+  
+  /**
+   * Asynchronously indicates to the backend that the job was successful. 
+   * The checksum and the size of the transfer should already be stored in the
+   * object beforehand. Result setting and calling asyncComplete() are done in two
+   * different threads (disk write and reporter thread, respectively).
+   */
+  virtual void asyncComplete();
+  
   /**
-   * Indicates that the job was successful. The checksum and the size of the 
-   * transfer should already stored in the object beforehand. Result setting
-   * and calling complete are done in 2 different threads (disk write and 
-   * reporter thread, respectively).
+   * Checks that the asynchronous completion has finished and cleans up
+   * the job structures.
    */
-  virtual void complete();
+  virtual void checkComplete();
   
   /**
    * Indicates that the job failed. Like for complete(), reason for failure
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 89426ea1ed..d4321688e6 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -336,7 +336,8 @@ public:
     cta::common::dataStructures::RetrieveRequest retrieveRequest;
     cta::common::dataStructures::ArchiveFile archiveFile;
     uint64_t selectedCopyNb;
-    virtual void succeed() = 0;
+    virtual void asyncSucceed() = 0;
+    virtual void checkSucceed() = 0;
     virtual void fail(log::LogContext &) = 0;
     virtual ~RetrieveJob() {}
   };
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 35bcc403a4..ba9b9a845b 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -521,7 +521,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     ASSERT_EQ(1, jobBatch.size());
     retrieveJob.reset(jobBatch.front().release());
     ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
-    retrieveJob->complete();
+    retrieveJob->asyncComplete();
+    retrieveJob->checkComplete();
     jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
     ASSERT_EQ(0, jobBatch.size());
   }
diff --git a/scheduler/testingMocks/MockRetrieveJob.hpp b/scheduler/testingMocks/MockRetrieveJob.hpp
index 3693a0a8f8..dbb9d2d3a1 100644
--- a/scheduler/testingMocks/MockRetrieveJob.hpp
+++ b/scheduler/testingMocks/MockRetrieveJob.hpp
@@ -33,8 +33,8 @@ namespace cta {
     cta::PositioningMethod::ByBlock), completes(0), failures(0) {
       archiveFile.tapeFiles[1];
     } 
-    
-    virtual void complete() override { completes++;  }
+    virtual void asyncComplete() override { completes++;  }
+    virtual void checkComplete() override {}
     virtual void failed(cta::log::LogContext &) override { failures++; };
     
     ~MockRetrieveJob() throw() {} 
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
index 7b7576c502..2466fe5483 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp
@@ -112,7 +112,14 @@ void RecallReportPacker::reportTestGoingToEnd(){
 //ReportSuccessful::execute
 //------------------------------------------------------------------------------
 void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
-  m_successfulRetrieveJob->complete();
+  m_successfulRetrieveJob->asyncComplete();
+}
+
+//------------------------------------------------------------------------------
+//ReportSuccessful::waitForAsyncExecuteFinished
+//------------------------------------------------------------------------------
+void RecallReportPacker::ReportSuccessful::waitForAsyncExecuteFinished(){
+  m_successfulRetrieveJob->checkComplete();
 }
 
 //------------------------------------------------------------------------------
@@ -217,6 +224,8 @@ void RecallReportPacker::WorkerThread::run(){
   m_parent.m_lc.pushOrReplace(Param("thread", "RecallReportPacker"));
   m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallReportPacker thread");
   bool endFound = false;
+  
+  std::list <std::unique_ptr<Report>> reportedSuccessfully;
   while(1) {
     std::string debugType;
     std::unique_ptr<Report> rep(m_parent.m_fifo.pop());
@@ -242,6 +251,17 @@ void RecallReportPacker::WorkerThread::run(){
     // as opposed to migrations where one failure fails the session.
     try {
       rep->execute(m_parent);
+      if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful)) {
+        reportedSuccessfully.emplace_back(std::move(rep)); 
+        cta::utils::Timer timing;
+        const unsigned int checkedReports = m_parent.flushCheckAndFinishAsyncExecute(reportedSuccessfully);    
+        if(checkedReports) {
+          cta::log::ScopedParamContainer params(m_parent.m_lc);
+          params.add("checkedReports", checkedReports)
+                .add("reportingTime", timing.secs());
+          m_parent.m_lc.log(cta::log::DEBUG, "After flushCheckAndFinishAsyncExecute()");
+        }
+      }
     } catch(const cta::exception::Exception& e){
       //we get there because to tried to close the connection and it failed
       //either from the catch a few lines above or directly from rep->execute
@@ -275,6 +295,42 @@ void RecallReportPacker::WorkerThread::run(){
     }
     if (endFound) break;
   }
+  
+  try {
+    cta::utils::Timer timing;
+    const unsigned int checkedReports = m_parent.fullCheckAndFinishAsyncExecute(reportedSuccessfully); 
+    if (checkedReports) {
+      cta::log::ScopedParamContainer params(m_parent.m_lc);
+      params.add("checkedReports", checkedReports)
+            .add("reportingTime", timing.secs());
+      m_parent.m_lc.log(cta::log::DEBUG, "After fullCheckAndFinishAsyncExecute()");    
+    } 
+  } catch(const cta::exception::Exception& e){
+      cta::log::ScopedParamContainer params(m_parent.m_lc);
+      params.add("exceptionWhat", e.getMessageValue())
+            .add("exceptionType", typeid(e).name());
+      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a CTA exception.");
+      if (m_parent.m_watchdog) {
+        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
+        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
+      } 
+  } catch(const std::exception& e){
+      cta::log::ScopedParamContainer params(m_parent.m_lc);
+      params.add("exceptionWhat", e.what())
+            .add("exceptionType", typeid(e).name());
+      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got a standard exception.");
+      if (m_parent.m_watchdog) {
+        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
+        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
+      }
+  } catch(...){
+      m_parent.m_lc.log(cta::log::ERR, "Tried to report and got an unknown exception.");
+      if (m_parent.m_watchdog) {
+        m_parent.m_watchdog->addToErrorCount("Error_clientCommunication");
+        m_parent.m_watchdog->addParameter(cta::log::Param("status","failure"));
+      }
+  } 
+  
   // Drain the fifo in case we got an exception
   if (!endFound) {
     while (1) {
@@ -305,6 +361,38 @@ bool RecallReportPacker::errorHappened() {
   return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
 }
 
+//------------------------------------------------------------------------------
+//flushCheckAndFinishAsyncExecute()
+//------------------------------------------------------------------------------
+unsigned int RecallReportPacker::flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) {
+  unsigned int checkedReports = 0;
+  if (reportedSuccessfully.size() >= RECALL_REPORT_PACKER_FLUSH_SIZE) {
+    while (!reportedSuccessfully.empty()) {
+      std::unique_ptr<Report> report=std::move(reportedSuccessfully.back());
+      reportedSuccessfully.pop_back();
+      if (!report.get()) continue;
+      report->waitForAsyncExecuteFinished();
+      checkedReports ++;
+    }
+  }  
+  return checkedReports;
+}
+
+//------------------------------------------------------------------------------
+//fullCheckAndFinishAsyncExecute()
+//------------------------------------------------------------------------------
+unsigned int RecallReportPacker::fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) {
+  unsigned int checkedReports = 0;
+  while (!reportedSuccessfully.empty()) {
+    std::unique_ptr<Report> report=std::move(reportedSuccessfully.back());
+    reportedSuccessfully.pop_back();
+    if (!report.get()) continue;
+    report->waitForAsyncExecuteFinished();
+    checkedReports++;
+  }
+  return checkedReports;
+}
+
 //------------------------------------------------------------------------------
 //reportTapeDone()
 //------------------------------------------------------------------------------
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp
index d917307c2b..3d8defd399 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp
@@ -125,6 +125,7 @@ private:
   public:
     virtual ~Report(){}
     virtual void execute(RecallReportPacker& packer)=0;
+    virtual void waitForAsyncExecuteFinished() {};
     virtual bool goingToEnd() {return false;}
   };
   class ReportTestGoingToEnd :  public Report {
@@ -144,6 +145,7 @@ private:
     ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob): 
     m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
     void execute(RecallReportPacker& reportPacker) override;
+    void waitForAsyncExecuteFinished() override;
   };
   class ReportError : public Report {
     /**
@@ -220,6 +222,27 @@ private:
    * Tracking of the disk thread end
    */
   bool m_diskThreadComplete;
+public:
+  /**
+   * Checks whether the flush limit has been reached and, if so, runs the finish procedure for the async-executed reports.
+   *
+   * @param reportedSuccessfully The successful reports to check
+   * @return The number of reports processed
+   */
+  unsigned int flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully);
+  
+  /**
+   * Runs the finish procedure for all remaining async-executed reports.
+   *
+   * @param reportedSuccessfully The successful reports to check
+   * @return The number of reports processed
+   */
+  unsigned int fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully);
+  
+  /**
+   * The number of successful reports that triggers a flush.
+   */
+  const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 32;
 };
 
 }}}}
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp
index 9f254e454d..ff950c5d0e 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp
@@ -51,11 +51,12 @@ protected:
   public:
     MockRetrieveJobExternalStats(cta::RetrieveMount & rm, int & completes, int &failures):
     MockRetrieveJob(rm), completesRef(completes), failuresRef(failures) {}
-    
-    virtual void complete() override {
+
+    virtual void asyncComplete() override {
       completesRef++;
     }
     
+    virtual void checkComplete() override {}
     
     virtual void failed(cta::log::LogContext &) override {
       failuresRef++;
diff --git a/tests/helgrind.suppr b/tests/helgrind.suppr
index 0d762ca6a9..f21bee3b7e 100644
--- a/tests/helgrind.suppr
+++ b/tests/helgrind.suppr
@@ -469,6 +469,55 @@
    fun:_ZN9__gnu_cxx13new_allocatorINSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncUpdaterC1ERS6_RKSsRSt8functionIFSsSA_EEEUlvE_vEEvEEE9constructISI_ISH_EEEvPT_DpOT0_
 }
 
+{
+   SharePtrRace3
+   Helgrind:Race
+   fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEED1Ev
+   fun:_ZN9__gnu_cxx13new_allocatorINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS3_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS8_RKSsEUlvE_vEEvEC1EOSF_EUlvE_vEEEEE7destroyISL_EEvPT_
+   fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE10_S_destroyISK_EENSt9enable_ifIXsrNSM_16__destroy_helperIT_EE5valueEvE4typeERSL_PSQ_
+   fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE7destroyISK_EEvRSL_PT_
+   fun:_ZNSt23_Sp_counted_ptr_inplaceINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEESaISK_ELN9__gnu_cxx12_Lock_policyE2EE10_M_disposeEv
+   fun:_ZNSt16_Sp_counted_baseILN9__gnu_cxx12_Lock_policyE2EE10_M_releaseEv
+   fun:_ZNSt14__shared_countILN9__gnu_cxx12_Lock_policyE2EED1Ev
+   fun:_ZNSt12__shared_ptrINSt6thread10_Impl_baseELN9__gnu_cxx12_Lock_policyE2EED1Ev
+   fun:_ZNSt10shared_ptrINSt6thread10_Impl_baseEED1Ev
+   fun:_ZNSt6threadC1IZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_IEEEOT_DpOT0_
+   fun:_ZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_
+   fun:_ZN9__gnu_cxx13new_allocatorINSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEEE9constructISE_ISD_EEEvPT_DpOT0_
+}
+
+{
+   SharePtrRace4
+   Helgrind:Race
+   fun:_ZNSt6thread10_Impl_baseD1Ev
+   fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEED1Ev
+   fun:_ZN9__gnu_cxx13new_allocatorINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS3_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS8_RKSsEUlvE_vEEvEC1EOSF_EUlvE_vEEEEE7destroyISL_EEvPT_
+   fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE10_S_destroyISK_EENSt9enable_ifIXsrNSM_16__destroy_helperIT_EE5valueEvE4typeERSL_PSQ_
+   fun:_ZNSt16allocator_traitsISaINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEEEE7destroyISK_EEvRSL_PT_
+   fun:_ZNSt23_Sp_counted_ptr_inplaceINSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS2_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS7_RKSsEUlvE_vEEvEC1EOSE_EUlvE_vEEEESaISK_ELN9__gnu_cxx12_Lock_policyE2EE10_M_disposeEv
+   fun:_ZNSt16_Sp_counted_baseILN9__gnu_cxx12_Lock_policyE2EE10_M_releaseEv
+   fun:_ZNSt14__shared_countILN9__gnu_cxx12_Lock_policyE2EED1Ev
+   fun:_ZNSt12__shared_ptrINSt6thread10_Impl_baseELN9__gnu_cxx12_Lock_policyE2EED1Ev
+   fun:_ZNSt10shared_ptrINSt6thread10_Impl_baseEED1Ev
+   fun:_ZNSt6threadC1IZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_IEEEOT_DpOT0_
+   fun:_ZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_
+}
+
+{
+   SharePtrRace5
+   Helgrind:Race
+   fun:_ZSt9call_onceIMNSt13__future_base11_State_baseEFvRSt8functionIFSt10unique_ptrINS0_12_Result_baseENS4_8_DeleterEEvEERbEIKPS1_St17reference_wrapperIS8_ESF_IbEEEvRSt9once_flagOT_DpOT0_
+   fun:_ZNSt13__future_base11_State_base13_M_set_resultESt8functionIFSt10unique_ptrINS_12_Result_baseENS3_8_DeleterEEvEEb
+   fun:_ZZNSt13__future_base17_Async_state_implISt12_Bind_simpleIFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_ENKUlvE_clEv
+   fun:_ZNSt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_EUlvE_vEE9_M_invokeIIEEEvSt12_Index_tupleIIXspT_EEE
+   fun:_ZNSt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS4_RKSsEUlvE_vEEvEC1EOSB_EUlvE_vEEclEv
+   fun:_ZNSt6thread5_ImplISt12_Bind_simpleIFZNSt13__future_base17_Async_state_implIS1_IFZN3cta11objectstore10BackendVFS12AsyncDeleterC1ERS6_RKSsEUlvE_vEEvEC1EOSD_EUlvE_vEEE6_M_runEv
+   obj:/usr/lib64/libstdc++.so.6.0.19
+   fun:mythread_wrapper
+   fun:start_thread
+   fun:clone
+}
+
 {
    AsyncGccRace
    Helgrind:Race
-- 
GitLab