From ad203f963d8f801d6a7ba954db4d2eb3e954e2d9 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Fri, 26 Jul 2019 17:55:06 +0200
Subject: [PATCH] #533 Changed strategy for implementation.

Moved the disk space reservation information to the DriveStatus object store object instead of a new central registry.
The central registry would have been a single point of contention, as the DriveRegistry was before being split into
DriveStates. As the problem is so close to that of the drive status, we can reuse the drive status for this purpose.

The algorithm also changes: we move the responsibility of querying the disk systems' free space from the Scheduler into
the OStoreDb object. This leads to a slightly worse layering of responsibilities, making the OStoreDb::RetrieveMount
object a client of the disk::DiskSystemFreeSpaceList object.

The current implementation also queries the free space from the disk systems on each pop, instead of doing so in a
globally cached fashion. With the new model, we could cache the free space per drive (if needed), but not globally.
This is not expected to be a real issue: free space is a global counter in the disk system and is expected to be
readily available.
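
As an illustration of the accounting model (a minimal sketch with hypothetical
names, not the actual CTA interfaces): each drive records its own per-disk-system
reservations, and the global picture is derived by summing over all drives, so no
central object is ever locked for writing:

    #include <cstdint>
    #include <map>
    #include <string>
    #include <vector>

    struct DriveReservations {
      // Disk system name -> bytes reserved by this drive.
      std::map<std::string, uint64_t> reservations;
    };

    // Global usage is derived, never stored centrally: each drive only ever
    // writes to its own state, and readers sum over all drives.
    std::map<std::string, uint64_t> totalReservations(
        const std::vector<DriveReservations> &drives) {
      std::map<std::string, uint64_t> total;
      for (const auto &d : drives)
        for (const auto &r : d.reservations)
          total[r.first] += r.second;
      return total;
    }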
---
 common/dataStructures/ArchiveFile.hpp         |   2 +-
 objectstore/DriveState.cpp                    |  64 +++++++++++
 objectstore/DriveState.hpp                    |   6 ++
 objectstore/RetrieveQueue.cpp                 |   5 +-
 objectstore/RetrieveQueue.hpp                 |   3 +-
 objectstore/RetrieveQueueAlgorithms.hpp       |  11 +-
 objectstore/RetrieveQueueShard.cpp            |   2 +-
 objectstore/RetrieveQueueShard.hpp            |   3 +-
 objectstore/RetrieveQueueTest.cpp             |  10 +-
 ...trieveQueueToTransferForUserAlgorithms.cpp |   4 +-
 objectstore/cta.proto                         |  25 ++---
 scheduler/OStoreDB/OStoreDB.cpp               | 102 +++++++++++++++++-
 scheduler/OStoreDB/OStoreDB.hpp               |  12 ++-
 scheduler/RetrieveMount.cpp                   |  63 ++++++-----
 scheduler/SchedulerDatabase.hpp               |  33 +-----
 scheduler/SchedulerDatabaseTest.cpp           |   6 +-
 .../tapeserver/daemon/DiskWriteTaskTest.cpp   |   5 +-
 .../daemon/DiskWriteThreadPoolTest.cpp        |   5 +-
 .../daemon/RecallTaskInjectorTest.cpp         |   5 +-
 19 files changed, 251 insertions(+), 115 deletions(-)

diff --git a/common/dataStructures/ArchiveFile.hpp b/common/dataStructures/ArchiveFile.hpp
index 4f20ea6891..9929eb50ed 100644
--- a/common/dataStructures/ArchiveFile.hpp
+++ b/common/dataStructures/ArchiveFile.hpp
@@ -64,7 +64,7 @@ struct ArchiveFile {
   std::string storageClass;
   DiskFileInfo diskFileInfo;
   /**
-   * This map represents the non-necessarily-exhaustive set of tape copies 
+   * This list represents the non-necessarily-exhaustive set of tape copies 
    * to be listed by the operator. For example, if the listing requested is 
    * for a single tape, the map will contain only one element. 
    */
diff --git a/objectstore/DriveState.cpp b/objectstore/DriveState.cpp
index 634d9c71ae..7434180bad 100644
--- a/objectstore/DriveState.cpp
+++ b/objectstore/DriveState.cpp
@@ -174,6 +174,70 @@ void DriveState::setState(cta::common::dataStructures::DriveState& state) {
   }
 }
 
+//------------------------------------------------------------------------------
+// DriveState::getDiskSpaceReservations()
+//------------------------------------------------------------------------------
+std::map<std::string, uint64_t> DriveState::getDiskSpaceReservations() {
+  checkPayloadReadable();
+  std::map<std::string, uint64_t> ret;
+  for (auto &dsr: m_payload.disk_space_reservations()) {
+    ret[dsr.disk_system_name()] = dsr.reserved_bytes();
+  }
+  return ret;
+}
+
+//------------------------------------------------------------------------------
+// DriveState::addDiskSpaceReservation()
+//------------------------------------------------------------------------------
+void DriveState::addDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) {
+  checkPayloadWritable();
+  for (auto &dsr: *m_payload.mutable_disk_space_reservations()) {
+    if (dsr.disk_system_name() == diskSystemName) {
+      dsr.set_reserved_bytes(dsr.reserved_bytes() + bytes);
+      return;
+    }
+  }
+  auto * newDsr = m_payload.mutable_disk_space_reservations()->Add();
+  newDsr->set_disk_system_name(diskSystemName);
+  newDsr->set_reserved_bytes(bytes);
+}
+
+//------------------------------------------------------------------------------
+// DriveState::substractDiskSpaceReservation()
+//------------------------------------------------------------------------------
+void DriveState::substractDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) {
+  checkPayloadWritable();
+  size_t index=0;
+  for (auto &dsr: *m_payload.mutable_disk_space_reservations()) {
+    if (dsr.disk_system_name() == diskSystemName) {
+      if (bytes > dsr.reserved_bytes())
+        throw NegativeDiskSpaceReservationReached(
+          "In DriveState::substractDiskSpaceReservation(): we would reach a negative reservation size.");
+      dsr.set_reserved_bytes(dsr.reserved_bytes() - bytes);
+      if (!dsr.reserved_bytes()) {
+        // We can remove this entry from the list.
+        auto * mdsr = m_payload.mutable_disk_space_reservations();
+        mdsr->SwapElements(index, mdsr->size()-1);
+        mdsr->RemoveLast();
+      }
+      return;
+    } else {
+      ++index;
+    }
+  }
+  if (bytes)
+    throw NegativeDiskSpaceReservationReached(
+      "In DriveState::substractDiskSpaceReservation(): Trying to substract bytes without previous reservation.");
+}
+
+//------------------------------------------------------------------------------
+// DriveState::resetDiskSpaceReservation()
+//------------------------------------------------------------------------------
+void DriveState::resetDiskSpaceReservation() {
+  checkPayloadWritable();
+  m_payload.mutable_disk_space_reservations()->Clear();
+}
+
 //------------------------------------------------------------------------------
 // DriveState::dump())
 //------------------------------------------------------------------------------
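
The SwapElements()/RemoveLast() pair used in substractDiskSpaceReservation() above
is the protobuf equivalent of the swap-with-last-then-pop idiom; the ordering of
the reservation list is not significant, so this is safe. A minimal, self-contained
sketch of the same idiom on a std::vector:

    #include <cstddef>
    #include <utility>
    #include <vector>

    // Remove the element at `index` without preserving order: O(1) instead of
    // the O(n) element shuffle performed by vector::erase().
    template <typename T>
    void unorderedErase(std::vector<T> &v, std::size_t index) {
      std::swap(v[index], v.back());
      v.pop_back();
    }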
diff --git a/objectstore/DriveState.hpp b/objectstore/DriveState.hpp
index 4c7d8da089..afe22a049c 100644
--- a/objectstore/DriveState.hpp
+++ b/objectstore/DriveState.hpp
@@ -46,6 +46,12 @@ public:
   cta::common::dataStructures::DriveState getState();
   void setState(cta::common::dataStructures::DriveState & state);
   
+  std::map<std::string, uint64_t> getDiskSpaceReservations();
+  void addDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes);
+  CTA_GENERATE_EXCEPTION_CLASS(NegativeDiskSpaceReservationReached);
+  void substractDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes);
+  void resetDiskSpaceReservation();
+  
   /**
    * JSON dump of the drive state
    * @return 
diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp
index 6536126d60..1765aaabca 100644
--- a/objectstore/RetrieveQueue.cpp
+++ b/objectstore/RetrieveQueue.cpp
@@ -614,7 +614,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> {
   return ret;
 }
 
-auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) -> CandidateJobList {
+auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) -> CandidateJobList {
   checkPayloadReadable();
   CandidateJobList ret;
   for(auto & rqsp: m_payload.retrievequeueshards()) {
@@ -623,7 +623,8 @@ auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::
       // Fetch the shard
       RetrieveQueueShard rqs(rqsp.address(), m_objectStore);
       rqs.fetchNoLock();
-      auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, retrieveRequestsToSkip);
+      auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles,
+          retrieveRequestsToSkip, diskSystemsToSkip);
       ret.candidateBytes += shardCandidates.candidateBytes;
       ret.candidateFiles += shardCandidates.candidateFiles;
       // We overwrite the remaining values each time as the previous
diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp
index f3337bc2d7..c7a42a889e 100644
--- a/objectstore/RetrieveQueue.hpp
+++ b/objectstore/RetrieveQueue.hpp
@@ -115,7 +115,8 @@ public:
   };
   // The set of retrieve requests to skip are requests previously identified by the caller as bad,
   // which still should be removed from the queue. They will be disregarded from  listing.
-  CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip);
+  CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip,
+    const std::set<std::string> & diskSystemsToSkip);
 
   //! Return a summary of the number of jobs and number of bytes in the queue
   CandidateJobList getCandidateSummary();
diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp
index 450108e598..fb932847b0 100644
--- a/objectstore/RetrieveQueueAlgorithms.hpp
+++ b/objectstore/RetrieveQueueAlgorithms.hpp
@@ -455,6 +455,7 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PopCriteri
     files -= pes.files;
     return *this;
   }
+  std::set<std::string> diskSystemsToSkip;
 };
 
 template<>
@@ -486,12 +487,18 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PoppedElem
 
 template<typename C>
 auto ContainerTraits<RetrieveQueue,C>::
-getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
+getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip,
   log::LogContext &lc) -> PoppedElementsBatch
 {
   PoppedElementsBatch ret;
 
-  auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip);
+  auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files,
+    elementsToSkip, 
+    // This parameter is needed only in the specialized version: 
+    // auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::getPoppingElementsCandidates
+    // We provide an empty set here.
+    std::set<std::string>()
+  );
   for(auto &cjfq : candidateJobsFromQueue.candidates) {
     ret.elements.emplace_back(PoppedElement{
       cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore),
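
The generic getPoppingElementsCandidates() above passes a neutral empty set, while
the RetrieveQueueToTransferForUser specialization (next file) forwards the real
diskSystemsToSkip criterion. A minimal sketch of this pattern, with hypothetical
names:

    #include <iostream>
    #include <set>
    #include <string>

    struct ToTransferForUser {};

    // Generic case: the disk-system filter does not apply, so a neutral
    // (empty) skip set is used.
    template <typename Flavor>
    std::set<std::string> diskSystemsToSkip() { return {}; }

    // Specialization: only the user-transfer flavor carries a real skip set.
    template <>
    std::set<std::string> diskSystemsToSkip<ToTransferForUser>() {
      return {"full-disk-system"};
    }

    int main() {
      std::cout << diskSystemsToSkip<int>().size() << "\n";                // 0
      std::cout << diskSystemsToSkip<ToTransferForUser>().size() << "\n";  // 1
      return 0;
    }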
diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp
index 9c00e31696..763bf584e2 100644
--- a/objectstore/RetrieveQueueShard.cpp
+++ b/objectstore/RetrieveQueueShard.cpp
@@ -62,7 +62,7 @@ void RetrieveQueueShard::garbageCollect(const std::string& presumedOwner, AgentR
   throw exception::Exception("In RetrieveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object.");
 }
 
-RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) {
+RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) {
   checkPayloadReadable();
   RetrieveQueue::CandidateJobList ret;
   ret.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize();
diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp
index 8eef6d0af6..fa798e4b43 100644
--- a/objectstore/RetrieveQueueShard.hpp
+++ b/objectstore/RetrieveQueueShard.hpp
@@ -112,7 +112,8 @@ public:
    */
   RemovalResult removeJobs(const std::list<std::string> & jobsToRemove);
   
-  RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip);
+  RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles,
+    const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip);
   
   /** Re compute summaries in case they do not match the array content. */
   void rebuild();
diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp
index 098114e628..aa7d386797 100644
--- a/objectstore/RetrieveQueueTest.cpp
+++ b/objectstore/RetrieveQueueTest.cpp
@@ -124,7 +124,8 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
     ASSERT_EQ(minStartTime, rq.getJobsSummary().oldestJobStartTime);
     uint64_t nextExpectedFseq=0;
     while (rq.getJobsSummary().jobs) {
-      auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
+      auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(), 
+          std::set<std::string>());
       std::set<std::string> jobsToSkip;
       std::list<std::string> jobsToDelete;
       for (auto &j: candidateJobs.candidates) {
@@ -135,7 +136,7 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
         jobsToDelete.emplace_back(j.address);
         nextExpectedFseq++;
       }
-      auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
+      auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>());
       if (candidateJobs2.candidateFiles) {
         std::stringstream address;
         address << "someRequest-" << nextExpectedFseq;
@@ -245,7 +246,8 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) {
     ASSERT_EQ(0.2, jsB->weight);
     uint64_t nextExpectedFseq=0;
     while (rq.getJobsSummary().jobs) {
-      auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
+      auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(),
+          std::set<std::string>());
       std::set<std::string> jobsToSkip;
       std::list<std::string> jobsToDelete;
       for (auto &j: candidateJobs.candidates) {
@@ -256,7 +258,7 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) {
         jobsToDelete.emplace_back(j.address);
         nextExpectedFseq++;
       }
-      auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
+      auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>());
       if (candidateJobs2.candidateFiles) {
         std::stringstream address;
         address << "someRequest-" << nextExpectedFseq;
diff --git a/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp
index 90dcb21623..6186f62eb8 100644
--- a/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp
+++ b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp
@@ -41,12 +41,12 @@ addToLog(log::ScopedParamContainer &params) const {
 
 template<>
 auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::
-getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
+getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip,
   log::LogContext &lc) -> PoppedElementsBatch
 {
   PoppedElementsBatch ret;
 
-  auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
+  auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elementsToSkip, unfulfilledCriteria.diskSystemsToSkip);
   for(auto &cjfq : candidateJobsFromQueue.candidates) {
     ret.elements.emplace_back(PoppedElement{
       cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore),
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index 18828a3401..f4e54e8238 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -34,6 +34,7 @@ enum ObjectType {
   RepackRequest_t = 11;
   RepackIndex_t = 12;
   RepackQueue_t = 13;
+  DiskSystemSpaceRegister_t = 14;
   GenericObject_t = 1000;
 }
 
@@ -206,6 +207,12 @@ message ArchiveFile {
 
 // ------------- Drives handling  ----------------------------------------------
 
+message DiskSpaceReservation {
+  // Each drive keeps track of the space it intends to use on each disk system.
+  required string disk_system_name = 5100;
+  required uint64 reserved_bytes = 5110;
+}
+
 message DriveState {
   required string drivename = 5000;
   required string host = 5001;
@@ -241,6 +248,7 @@ message DriveState {
   optional uint64 next_priority = 5031;
   optional string next_activity = 5032;
   optional double next_activity_weight = 5033;
+  repeated DiskSpaceReservation disk_space_reservations = 5034;
 // TODO: implement or remove  required EntryLog creationlog = 5023;
 }
 
@@ -592,20 +600,3 @@ message RepackRequestQueuePointer {
 message RepackQueue {
   repeated RepackRequestQueuePointer repackrequestpointers = 12200;
 }
-
-message DiskSystemSpaceReservation {
-  required string holder = 12500;
-  required uint64 size = 12501;
-}
-
-message DiskSystemSpace {
-  required string name = 12300;
-  required uint64 free_space = 12301;
-  required uint64 last_measurement_time = 12302;
-  required uint64 targeted_free_space = 12303;
-  repeated DiskSystemSpaceReservation reservations =12304;
-}
-
-message DiskSystemSpaceRegistry {
-  repeated DiskSystemSpace disk_systems = 12400;
-}
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index b0b7925d4a..aabbd918f1 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -2365,18 +2365,18 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
     }
   }
   // We now have created the subrequests. Time to enqueue.
+  // TODO: the lock/fetch could be parallelized
   {
     objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
-    std::list<std::unique_ptr<objectstore::ScopedExclusiveLock>> locks;
+    std::list<objectstore::ScopedExclusiveLock> locks;
     for (auto &is: asyncInsertedSubrequestInfoList) {
-      locks.push_back(cta::make_unique<objectstore::ScopedExclusiveLock>(*is.request));
+      locks.emplace_back(*is.request);
       is.request->fetch();
       sorter.insertRetrieveRequest(is.request, *m_oStoreDB.m_agentReference, is.activeCopyNb, lc);
     }
     locks.clear();
     sorter.flushAll(lc);
   }
-  
 }
 
 //------------------------------------------------------------------------------
@@ -2402,6 +2402,9 @@ void OStoreDB::RepackRequest::fail() {
   m_repackRequest.commit();
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::RepackRequest::requeueInToExpandQueue()
+//------------------------------------------------------------------------------
 void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){
   ScopedExclusiveLock rrl(m_repackRequest);
   m_repackRequest.fetch();
@@ -2418,6 +2421,9 @@ void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){
   rqteAlgo.referenceAndSwitchOwnership(nullopt, previousOwner, insertedElements, lc);
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::RepackRequest::setExpandStartedAndChangeStatus()
+//------------------------------------------------------------------------------
 void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){
   ScopedExclusiveLock rrl(m_repackRequest);
   m_repackRequest.fetch();
@@ -2426,6 +2432,9 @@ void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){
   m_repackRequest.commit();
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile()
+//------------------------------------------------------------------------------
 void OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile(uint64_t& fSeq, TotalStatsFiles& totalStatsFiles) {
   ScopedExclusiveLock rrl(m_repackRequest);
   m_repackRequest.fetch();
@@ -3443,12 +3452,30 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
 // OStoreDB::RetrieveMount::getNextJobBatch()
 //------------------------------------------------------------------------------
 std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested, 
-    uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) {
+    uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) {
+  // Pop a batch of files to retrieve and, for the ones with a documented disk system name, reserve the space
+  // they will require. In case we cannot allocate the space for some of them, mark the destination disk system as
+  // full, requeue those jobs and stop popping from that disk system.
   typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos;
   RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
   RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
+  popCriteria.diskSystemsToSkip = m_diskSystemsToSkip;
   auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext);
-  // We can construct the return value
+  // Try and allocate disk space for the popped jobs.
+  // Compute the necessary space in each targeted disk system.
+  SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
+  std::map<std::string, uint64_t> spaceMap;
+  for (auto &j: jobs.elements)
+      if (j.diskSystemName)
+        diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
+  // Get the existing reservation map from the other drives.
+  auto otherDrivesReservations = getExistingDrivesReservations();
+  // Get the free space from the disk systems involved.
+
+  // If any disk system does not have enough space, mark it as full, requeue all the jobs (a slight but rare
+  // inefficiency) and retry the pop.
+
+  // Else, we can construct the return value (we did not hit any full disk system).
   std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret;
   for(auto &j : jobs.elements)
   {
@@ -3466,6 +3493,71 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
   return ret;
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::RetrieveMount::requeueJobBatch()
+//------------------------------------------------------------------------------
+void OStoreDB::RetrieveMount::requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
+    log::LogContext& logContext) {
+  objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
+  std::list<std::shared_ptr<objectstore::RetrieveRequest>> rrlist;
+  std::list<objectstore::ScopedExclusiveLock> locks;
+  for (auto & j: jobBatch) {
+    cta::OStoreDB::RetrieveJob *rj = cta::OStoreDB::castFromSchedDBJob(j.get());
+    auto rr = std::make_shared<objectstore::RetrieveRequest>(rj->m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
+    rrlist.push_back(rr);
+    locks.emplace_back(*rr);
+    rr->fetch();
+    sorter.insertRetrieveRequest(rr, *m_oStoreDB.m_agentReference, rj->selectedCopyNb, logContext);
+  }
+  locks.clear();
+  rrlist.clear();
+  sorter.flushAll(logContext);
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RetrieveMount::getExistingDrivesReservations()
+//------------------------------------------------------------------------------
+std::map<std::string, uint64_t> OStoreDB::RetrieveMount::getExistingDrivesReservations() {
+  objectstore::RootEntry re(m_oStoreDB.m_objectStore);
+  re.fetchNoLock();
+  objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_oStoreDB.m_objectStore);
+  dr.fetchNoLock();
+  auto driveAddresses = dr.getDriveAddresses();
+  std::list <objectstore::DriveState> dsList;
+  std::list <std::unique_ptr<objectstore::DriveState::AsyncLockfreeFetcher>> dsFetchers;
+  for (auto &d: driveAddresses) {
+    dsList.emplace_back(d.driveStateAddress, m_oStoreDB.m_objectStore);
+    dsFetchers.emplace_back(dsList.back().asyncLockfreeFetch());
+  }
+  auto dsf = dsFetchers.begin();
+  std::map<std::string, uint64_t> ret;
+  for (auto &d: dsList) {
+    try {
+      (*dsf)->wait();
+      dsf++;
+      for (auto &dsr: d.getDiskSpaceReservations()) {
+        try {
+          ret.at(dsr.first) += dsr.second;
+        } catch (std::out_of_range &) {
+          ret[dsr.first] = dsr.second;
+        }
+      }
+    } catch (objectstore::Backend::NoSuchObject &) {
+      // If the drive status is not there, we just skip it.
+      dsf++;
+    }
+  }
+  return ret;
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RetrieveMount::reserveDiskSpace()
+//------------------------------------------------------------------------------
+void OStoreDB::RetrieveMount::reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) {
+  // Try and add our reservation to this drive's DriveState in the object store (not implemented yet).
+  throw exception::Exception("In OStoreDB::RetrieveMount::reserveDiskSpace(): not implemented.");
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::RetrieveMount::complete()
 //------------------------------------------------------------------------------
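
getExistingDrivesReservations() above uses a fan-out/fan-in pattern: all drive
states are fetched asynchronously first, then the results are awaited and summed,
so the total latency is roughly one object store round trip instead of one per
drive. A minimal sketch of the pattern, with std::async standing in for
asyncLockfreeFetch() (hypothetical, not the actual object store API):

    #include <cstdint>
    #include <future>
    #include <iostream>
    #include <list>
    #include <map>
    #include <string>

    std::map<std::string, uint64_t> fetchReservations(const std::string &drive) {
      // Stand-in for fetching and parsing one DriveState object.
      return {{"ds-A", 100}};
    }

    int main() {
      std::list<std::string> drives{"drive0", "drive1", "drive2"};
      // Fan out: launch all fetches before waiting on any of them.
      std::list<std::future<std::map<std::string, uint64_t>>> fetchers;
      for (const auto &d : drives)
        fetchers.emplace_back(std::async(std::launch::async, fetchReservations, d));
      // Fan in: sum the per-drive reservations into a global map.
      std::map<std::string, uint64_t> total;
      for (auto &f : fetchers)
        for (const auto &r : f.get())
          total[r.first] += r.second;
      std::cout << total["ds-A"] << "\n";  // 300
      return 0;
    }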
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index d11b6c5277..f2a6d0135c 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -218,10 +218,14 @@ public:
   public:
     const MountInfo & getMountInfo() override;
     std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, 
-      const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) override;
-    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override;
-    void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) override;
-    void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override;
+      cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) override;
+  private:
+    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
+      log::LogContext& logContext);
+    std::map<std::string, uint64_t> getExistingDrivesReservations(); 
+    void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation);
+    std::set<std::string> m_diskSystemsToSkip;
+  public:
     void complete(time_t completionTime) override;
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
     void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp
index d60545da2b..30fc80c2d3 100644
--- a/scheduler/RetrieveMount.cpp
+++ b/scheduler/RetrieveMount.cpp
@@ -141,35 +141,40 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
   // Get the current file systems list from the catalogue
   disk::DiskSystemList diskSystemList;
   if (m_catalogue) diskSystemList = m_catalogue->getAllDiskSystems();
-  // Try and get a new job from the DB
-  std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch;
-  { 
-    retryBatchAllocation:
-    dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, m_fullDiskSystems, logContext);
-    // Compute the necessary space in each targeted disk system.
-    SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
-    std::map<std::string, uint64_t> spaceMap;
-    for (auto &j: dbJobBatch)
-      if (j->diskSystemName)
-        diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize);
-    // Reserve the space.
-    // We will update this information on-demand during iterations if needed. 
-    disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
-    retrySpaceAllocation:
-    try {
-      m_dbMount->reserveDiskSpace(diskSpaceReservationRequest);
-    } catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) {
-      // Update information for missing/outdated disk systems.
-      diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems());
-      goto retrySpaceAllocation;
-    } catch (SchedulerDatabase::FullDiskSystem &fds) {
-      // Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt.
-      for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds);
-      m_dbMount->requeueJobBatch(dbJobBatch);
-      dbJobBatch.clear();
-      goto retryBatchAllocation;
-    }
-  }
+  // TODO: the diskSystemFreeSpaceList could be made a member of the retrieve mount and cache the fetched values,
+  // limiting the re-querying of the disk systems' free space.
+  disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
+  // Try and get a new job batch from the DB. The DB mount (an in-memory object) takes care of reserving the free
+  // space for the popped elements and of querying the disk systems, via the diskSystemFreeSpaceList object.
+  auto dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext);
+//  std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch;
+//  { 
+//    retryBatchAllocation:
+//    dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext);
+//    // Compute the necessary space in each targeted disk system.
+//    SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
+//    std::map<std::string, uint64_t> spaceMap;
+//    for (auto &j: dbJobBatch)
+//      if (j->diskSystemName)
+//        diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize);
+//    // Reserve the space.
+//    // We will update this information on-demand during iterations if needed. 
+//    disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
+//    retrySpaceAllocation:
+//    try {
+//      m_dbMount->reserveDiskSpace(diskSpaceReservationRequest);
+//    } catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) {
+//      // Update information for missing/outdated disk systems.
+//      diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems());
+//      goto retrySpaceAllocation;
+//    } catch (SchedulerDatabase::FullDiskSystem &fds) {
+//      // Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt.
+//      for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds);
+//      m_dbMount->requeueJobBatch(dbJobBatch, logContext);
+//      dbJobBatch.clear();
+//      goto retryBatchAllocation;
+//    }
+//  }
   std::list<std::unique_ptr<RetrieveJob>> ret;
   // We prepare the response
   for (auto & sdrj: dbJobBatch) {
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 4eebfd616b..ae09ddb2df 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -349,27 +349,7 @@ public:
     std::map<std::string, uint64_t> m_spaceMap;
   };
   
-private:
-  class ProblemDiskSystemList: public exception::Exception {
-    using cta::exception::Exception::Exception;
-  public:
-    const std::set<std::string> &getDiskSsytems() { return m_outdatedDiskSystems; }
-    void addDiskSystem(const std::string &diskSystenName) { m_outdatedDiskSystems.insert(diskSystenName); }
-  private:
-    std::set<std::string> m_outdatedDiskSystems;
-  };
-  
 public:
-  /** An exception allowing the reservation function to de called again with up to date free space information from the 
-   * disk systems.*/
-  class OutdatedDiskSystemInformation: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
-  
-  /** An exception allowing the reservation system to report disk systems for which the free space could not be reserved */
-  class FullDiskSystem: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
-  
-  /** Clear all reservation for an agent. Used at agent cleanup and garbage collection time, so not in retrieve mount context. */
-  void clearDiskReservation(const std::string);
-  
   class RetrieveMount {
   public:
     struct MountInfo {
@@ -387,17 +367,8 @@ public:
     } mountInfo;
     virtual const MountInfo & getMountInfo() = 0;
     virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
-      uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) = 0;
-    virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch) = 0;
-  
-    /* Mount-level disk reservation functions. */
-    /** Attempt to reserve, can throw OutdatedDiskSystemInformation or FullDiskSystem. Does NOT proceed with any reservation
-     * in case of throw. */
-    virtual void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) = 0;
-
-    /** Release some space for an agent and destination. */
-    virtual void releaseDiskSpace(const std::string &reservingAgent, const std::string &diskSystemName, uint64_t size) = 0;
-    
+      uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) = 0;
+//    virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch, log::LogContext& logContext) = 0; 
     virtual void complete(time_t completionTime) = 0;
     virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
     virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
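
DiskSpaceReservationRequest (whose m_spaceMap member is kept above) is essentially
an accumulating map. A minimal sketch of the assumed addRequest() semantics (the
actual implementation is not part of this patch):

    #include <cstdint>
    #include <map>
    #include <string>

    class DiskSpaceReservationRequest {
    public:
      // Repeated requests against the same disk system accumulate;
      // operator[] default-initializes a missing entry to 0.
      void addRequest(const std::string &diskSystemName, uint64_t size) {
        m_spaceMap[diskSystemName] += size;
      }
      const std::map<std::string, uint64_t> &spaceMap() const { return m_spaceMap; }
    private:
      std::map<std::string, uint64_t> m_spaceMap;
    };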
diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp
index 126dd5096f..247334a97a 100644
--- a/scheduler/SchedulerDatabaseTest.cpp
+++ b/scheduler/SchedulerDatabaseTest.cpp
@@ -315,11 +315,11 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
 
   // Create the disk system list
   cta::disk::DiskSystemList diskSystemList;
+  cta::disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
   diskSystemList.push_back(cta::disk::DiskSystem{"ds-A", "$root://a.disk.system/", "query:todo", 60, 10UL*1000*1000*1000,
       common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
   diskSystemList.push_back(cta::disk::DiskSystem{"ds-B", "$root://b.disk.system/", "query:todo", 60, 10UL*1000*1000*1000,
       common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
-  std::set<std::string> fullDiskSystems;
   
   // Inject 10 retrieve jobs to the db.
   const size_t filesToDo = 10;
@@ -369,7 +369,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
   auto moutInfo = db.getMountInfo(lc);
   ASSERT_EQ(1, moutInfo->potentialMounts.size());
   auto rm=moutInfo->createRetrieveMount("vid", "tapePool", "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr), cta::nullopt);
-  auto rjb = rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc);
+  auto rjb = rm->getNextJobBatch(20,20*1000,diskSystemFreeSpaceList, lc);
   ASSERT_EQ(filesToDo, rjb.size());
   std::list <cta::SchedulerDatabase::RetrieveJob*> jobBatch;
   for (auto &rj: rjb) {
@@ -380,7 +380,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
   }
   rm->flushAsyncSuccessReports(jobBatch, lc);
   rjb.clear();
-  ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc).size());
+  ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,diskSystemFreeSpaceList, lc).size());
   rm->complete(time(nullptr));
   rm.reset(nullptr);
   moutInfo.reset(nullptr);
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
index cc75055c24..6ef2ede0c1 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp
@@ -43,10 +43,7 @@ namespace unitTests{
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
     const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
     std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
-      const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
-    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
-    void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
-    void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
+      cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
 
     void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
index 78237ffaf0..b3984a5f6a 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp
@@ -36,10 +36,7 @@ namespace unitTests{
   
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
     const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
-    std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
-    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
-    void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
-    void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
+    std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
 
     void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
index fab688adff..0ee82c988b 100644
--- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
+++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp
@@ -132,10 +132,7 @@ namespace unitTests
   class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
     const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
     std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
-      const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
-    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
-    void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
-    void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
+      cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
     void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
     void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
     void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
-- 
GitLab