From 8df07add7e7deb8cff9f7a87c816fa176e9cc9a8 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Tue, 30 Jul 2019 11:01:25 +0200
Subject: [PATCH] #533  Added support for fetching free space from disk systems

The disk system is now vetoed for the duration of a mount if we fail to reserve free space.
---
 disk/DiskSystem.cpp             | 96 ++++++++++++++++++++++++++++++++-
 disk/DiskSystem.hpp             |  7 ++-
 scheduler/OStoreDB/OStoreDB.cpp | 84 ++++++++++++++++++++---------
 scheduler/OStoreDB/OStoreDB.hpp |  4 +-
 scheduler/SchedulerDatabase.cpp |  4 +-
 scheduler/SchedulerDatabase.hpp |  5 +-
 6 files changed, 164 insertions(+), 36 deletions(-)

diff --git a/disk/DiskSystem.cpp b/disk/DiskSystem.cpp
index 20b845952e..e52dbd6026 100644
--- a/disk/DiskSystem.cpp
+++ b/disk/DiskSystem.cpp
@@ -19,16 +19,26 @@
 #include "DiskSystem.hpp"
 
 #include <algorithm>
+#include "common/exception/Exception.hpp"
+#include "common/threading/SubProcess.hpp"
+#include "common/exception/Errnum.hpp"
+#include "common/utils/utils.hpp"
 
 namespace cta {
 namespace disk {
 
+//------------------------------------------------------------------------------
+// DiskSystemList::at()
+//------------------------------------------------------------------------------
 const DiskSystem& DiskSystemList::at(const std::string& name) {
   auto dsi = std::find_if(begin(), end(), [&](const DiskSystem& ds){ return ds.name == name;});
   if (dsi != end()) return *dsi;
   throw std::out_of_range("In DiskSystemList::at(): name not found.");
 }
 
+//------------------------------------------------------------------------------
+// DiskSystemList::getFSNAme()
+//------------------------------------------------------------------------------
 std::string DiskSystemList::getFSNAme(const std::string& fileURL) {
   // First if the regexes have not been created yet, do so.
   if (m_pointersAndRegexes.empty() && size()) {
@@ -45,8 +55,92 @@ std::string DiskSystemList::getFSNAme(const std::string& fileURL) {
       m_pointersAndRegexes.splice(m_pointersAndRegexes.begin(), m_pointersAndRegexes, pri);
     return pri->ds.name;
   }
-  throw std::out_of_range("In DiskSystemList::getFSNAme(): not match for fileURL");
+  throw std::out_of_range("In DiskSystemList::getFSNAme(): not match for fileURL");  
+}
+
+//------------------------------------------------------------------------------
+// DiskSystemFreeSpaceList::fetchDiskSystemFreeSpace()
+//------------------------------------------------------------------------------
+// Queries the free space of each named disk system and records the value, its
+// fetch time and the system's targeted free space in this map. Throws if a name
+// is unknown to the disk system list or if its query URL matches no known scheme.
+void DiskSystemFreeSpaceList::fetchDiskSystemFreeSpace(const std::set<std::string>& diskSystems) {
+  // The real deal: go fetch the file system's free space.
+  cta::utils::Regex eosDiskSystem("^eos://(.*)$");
+  // For testing purposes
+  cta::utils::Regex constantFreeSpaceDiskSystem("^contantFreeSpace:(.*)");
+  for (auto const & ds: diskSystems) {
+    // Look the disk system up once: at() is a linear search and throws std::out_of_range for unknown names.
+    const DiskSystem & diskSystem = m_systemList.at(ds);
+    uint64_t freeSpace;
+    std::vector<std::string> regexResult = eosDiskSystem.exec(diskSystem.freeSpaceQueryURL);
+    if (regexResult.size()) {
+      freeSpace = fetchEosFreeSpace(regexResult.at(1));
+    } else if ((regexResult = constantFreeSpaceDiskSystem.exec(diskSystem.freeSpaceQueryURL)).size()) {
+      freeSpace = fetchConstantFreeSpace(regexResult.at(1));
+    } else {
+      throw exception::Exception("In DiskSystemFreeSpaceList::fetchDiskSystemFreeSpace(): could not interpret free space query URL.");
+    }
+    DiskSystemFreeSpace & entry = operator [](ds);
+    entry.freeSpace = freeSpace;
+    entry.fetchTime = ::time(nullptr);
+    entry.targetedFreeSpace = diskSystem.targetedFreeSpace;
+  }
+}
+
+//------------------------------------------------------------------------------
+// DiskSystemFreeSpaceList::fetchEosFreeSpace()
+//------------------------------------------------------------------------------
+// Runs "eos root://<instanceAddress> space ls -m" and returns the free space of the "default" space
+// (read-write capacity minus used bytes, both read from that space's output line). Throws if the
+// command exits with a non-zero value, is killed by a signal, or if its output cannot be parsed.
+uint64_t DiskSystemFreeSpaceList::fetchEosFreeSpace(const std::string& instanceAddress) {
+  threading::SubProcess sp("/usr/bin/eos", {std::string("root://")+instanceAddress, "space", "ls", "-m"});
+  sp.wait();
+  try {
+    exception::Errnum::throwOnNonZero(sp.exitValue(), "In DiskSystemFreeSpaceList::fetchEosFreeSpace(), failed to call eos space ls -m");
+  } catch (exception::Exception & ex) {
+    ex.getMessage() << "stderr: " << sp.stderr();
+    throw;
+  }
+  if (sp.wasKilled()) {
+    exception::Exception ex("In DiskSystemFreeSpaceList::fetchEosFreeSpace(): eos space ls -m killed by signal: ");
+    ex.getMessage() << utils::toString(sp.killSignal());
+    throw ex;
+  }
+  // Look for the result line for default space: stop at the first line that matches.
+  std::istringstream spStdoutIss(sp.stdout());
+  std::string defaultSpaceLine;
+  utils::Regex defaultSpaceRe("^.*name=default .*$");
+  for (std::string spStdoutLine; defaultSpaceLine.empty() && std::getline(spStdoutIss, spStdoutLine);) {
+    auto res = defaultSpaceRe.exec(spStdoutLine);
+    if (res.size()) defaultSpaceLine = res.at(0);
+  }
+  if (defaultSpaceLine.empty())
+    throw exception::Exception("In DiskSystemFreeSpaceList::fetchEosFreeSpace(): could not find line for default space.");
  
+  // Look for the parameters in the result line only. [0-9] replaces the \d class, which POSIX regex dialects do not define.
+  utils::Regex rwSpaceRegex("sum.stat.statfs.capacity\\?configstatus@rw=([0-9]+) ");
+  auto rwSpaceRes = rwSpaceRegex.exec(defaultSpaceLine);
+  if (rwSpaceRes.empty())
+    throw exception::Exception("In DiskSystemFreeSpaceList::fetchEosFreeSpace(): failed to parse parameter sum.stat.statfs.capacity?configstatus@rw.");
+  utils::Regex usedSpaceRegex("sum.stat.statfs.usedbytes=([0-9]+) ");
+  auto usedSpaceRes = usedSpaceRegex.exec(defaultSpaceLine);
+  if (usedSpaceRes.empty())
+    throw exception::Exception("In DiskSystemFreeSpaceList::fetchEosFreeSpace(): failed to parse parameter sum.stat.statfs.usedbytes.");
+  const uint64_t rwCapacity = utils::toUint64(rwSpaceRes.at(1));
+  const uint64_t usedBytes = utils::toUint64(usedSpaceRes.at(1));
+  // Report no free space rather than wrapping around when more bytes are used than the read-write capacity.
+  return usedBytes < rwCapacity ? rwCapacity - usedBytes : 0;
 }
 
+//------------------------------------------------------------------------------
+// DiskSystemFreeSpaceList::fetchConstantFreeSpace()
+//------------------------------------------------------------------------------
+uint64_t DiskSystemFreeSpaceList::fetchConstantFreeSpace(const std::string& instanceAddress) {
+  // For this (testing) scheme the "address" is the free space itself: convert it with utils::toUint64().
+  const uint64_t constantFreeSpace = utils::toUint64(instanceAddress);
+  return constantFreeSpace;
+}
+
 }} // namespace cta::disk
\ No newline at end of file
diff --git a/disk/DiskSystem.hpp b/disk/DiskSystem.hpp
index be4911ca76..4f0a5257a7 100644
--- a/disk/DiskSystem.hpp
+++ b/disk/DiskSystem.hpp
@@ -68,16 +68,19 @@ private:
 };
 
 struct DiskSystemFreeSpace {
-  uint64_t freeSize;
+  uint64_t freeSpace; // Free space reported by the disk system when it was last fetched (at fetchTime).
+  uint64_t targetedFreeSpace; // Copied from the matching DiskSystem entry each time the free space is fetched.
   time_t fetchTime;
 };
 
 class DiskSystemFreeSpaceList: public std::map<std::string, DiskSystemFreeSpace> {
 public:
   DiskSystemFreeSpaceList(DiskSystemList &diskSystemList): m_systemList(diskSystemList) {}
-  void fetchFileSystemFreeSpace(const std::set<std::string> &fileSystems);
+  void fetchDiskSystemFreeSpace(const std::set<std::string> &diskSystems); // Fetches and stores the free space of each named disk system.
 private:
   DiskSystemList &m_systemList;
+  uint64_t fetchEosFreeSpace(const std::string & instanceAddress); // Free space computed from the output of "eos space ls -m".
+  uint64_t fetchConstantFreeSpace(const std::string & instanceAddress); // For testing: the argument itself is converted to the free space.
 };
 
 }} // namespace cta::common
diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index aabbd918f1..8034baf8dd 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -3456,26 +3456,57 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
   // Pop a batch of files to retrieve and, for the ones having a documented disk system name, reserve the space
   // that they will require. In case we cannot allocate the space for some of them, mark the destination filesystem as
   // full and stop popping from it, after requeueing the jobs.
+  bool failedAllocation = false;
+  SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
   typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos;
   RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
-  RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
-  popCriteria.diskSystemsToSkip = m_diskSystemsToSkip;
-  auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext);
-  // Try and allocate data for the popped jobs.
-  // Compute the necessary space in each targeted disk system.
-  SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
-  std::map<std::string, uint64_t> spaceMap;
-  for (auto &j: jobs.elements)
-      if (j.diskSystemName)
-        diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
-  // Get the existing reservation map from the other drives.
-  auto otherDrivesReservations = getExistingDrivesReservations();
-  // Get the free space from disk systems involved.
-  
-  // If any file system does not have enough space, make it as full, requeue all (slight but rare inefficiency) 
-  // and retry the pop.
-  
-  // Else, we can construct the return value (we did not hit any full disk system.
+  RQAlgos::PoppedElementsBatch jobs;
+  retryPop:
+  {
+    RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
+    popCriteria.diskSystemsToSkip = m_diskSystemsToSkip;
+    jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext);
+    // Try and allocate data for the popped jobs.
+    // Compute the necessary space in each targeted disk system.
+    std::map<std::string, uint64_t> spaceMap;
+    for (auto &j: jobs.elements)
+        if (j.diskSystemName)
+          diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
+    // Get the existing reservation map from drives (including this drive's previous pending reservations).
+    auto otherDrivesReservations = getExistingDrivesReservations();
+    // Get the free space from disk systems involved.
+    std::set<std::string> diskSystemNames;
+    for (auto const & dsrr: diskSpaceReservationRequest) diskSystemNames.insert(dsrr.first);
+    diskSystemFreeSpace.fetchDiskSystemFreeSpace(diskSystemNames);
+    // If any file system does not have enough space, mark it as full for this mount, requeue all (slight but rare inefficiency) 
+    // and retry the pop.
+    for (auto const & ds: diskSystemNames) {
+      if (diskSystemFreeSpace.at(ds).freeSpace < diskSpaceReservationRequest.at(ds) + diskSystemFreeSpace.at(ds).targetedFreeSpace) {
+        m_diskSystemsToSkip.insert(ds);
+        failedAllocation = true;
+        log::ScopedParamContainer params(logContext);
+        params.add("diskSystemName", ds)
+              .add("freeSpace", diskSystemFreeSpace.at(ds).freeSpace)
+              .add("spaceToReserve", diskSpaceReservationRequest.at(ds))
+              .add("targetedFreeSpace", diskSystemFreeSpace.at(ds).targetedFreeSpace);
+        logContext.log(log::WARNING, "In OStoreDB::RetrieveMount::getNextJobBatch(): could not allocated disk space for job batch.");
+      }  
+    }
+  }
+  if (failedAllocation) {
+    std::list<std::unique_ptr<OStoreDB::RetrieveJob>> rjl;
+    for (auto & jle: jobs.elements) rjl.emplace_back(new OStoreDB::RetrieveJob(jle.retrieveRequest->getAddressIfSet(), m_oStoreDB, this));
+    requeueJobBatch(rjl, logContext);
+    rjl.clear();
+    // Clean up for the next round of popping
+    jobs.summary.files=0;
+    jobs.elements.clear();
+    failedAllocation = false;
+    diskSpaceReservationRequest.clear();
+    goto retryPop;
+  }
+  this->reserveDiskSpace(diskSpaceReservationRequest, logContext);
+  // Allocation went fine, we can construct the return value (we did not hit any full disk system).
   std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret;
   for(auto &j : jobs.elements)
   {
@@ -3496,18 +3527,17 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
 //------------------------------------------------------------------------------
 // OStoreDB::RetrieveMount::requeueJobBatch()
 //------------------------------------------------------------------------------
-void OStoreDB::RetrieveMount::requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
+void OStoreDB::RetrieveMount::requeueJobBatch(std::list<std::unique_ptr<OStoreDB::RetrieveJob> >& jobBatch,
     log::LogContext& logContext) {
   objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
   std::list<std::shared_ptr<objectstore::RetrieveRequest>> rrlist;
   std::list<objectstore::ScopedExclusiveLock> locks;
   for (auto & j: jobBatch) {
-    cta::OStoreDB::RetrieveJob *rj = cta::OStoreDB::castFromSchedDBJob(j.get());
-    auto rr = std::make_shared<objectstore::RetrieveRequest>(rj->m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
+    auto rr = std::make_shared<objectstore::RetrieveRequest>(j->m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
     rrlist.push_back(rr);
     locks.emplace_back(*rr);
     rr->fetch();
-    sorter.insertRetrieveRequest(rr, *m_oStoreDB.m_agentReference, rj->selectedCopyNb, logContext);
+    sorter.insertRetrieveRequest(rr, *m_oStoreDB.m_agentReference, j->selectedCopyNb, logContext);
   }
   locks.clear();
   rrlist.clear();
@@ -3553,9 +3583,13 @@ std::map<std::string, uint64_t> OStoreDB::RetrieveMount::getExistingDrivesReserv
 //------------------------------------------------------------------------------
 // OStoreDB::RetrieveMount::reserveDiskSpace()
 //------------------------------------------------------------------------------
-void OStoreDB::RetrieveMount::reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) {
-  // Try and add our reservation to the disk
-  throw exception::Exception("In OStoreDB::RetrieveMount::reserveDiskSpace(): not implemented.");
+void OStoreDB::RetrieveMount::reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation, log::LogContext & lc) {
+  // Lock and fetch this mount's drive state (never created here), add every reservation to it, then commit.
+  objectstore::DriveState driveState(m_oStoreDB.m_objectStore);
+  objectstore::ScopedExclusiveLock driveStateLock;
+  Helpers::getLockedAndFetchedDriveState(driveState, driveStateLock, *m_oStoreDB.m_agentReference, mountInfo.drive, lc, Helpers::CreateIfNeeded::doNotCreate);
+  for (auto const & reservation: diskSpaceReservation) driveState.addDiskSpaceReservation(reservation.first, reservation.second);
+  driveState.commit();
 }
 
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index f2a6d0135c..8f01d437e1 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -220,10 +220,10 @@ public:
     std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, 
       cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) override;
   private:
-    void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
+    void requeueJobBatch(std::list<std::unique_ptr<OStoreDB::RetrieveJob> >& jobBatch,
       log::LogContext& logContext);
     std::map<std::string, uint64_t> getExistingDrivesReservations(); 
-    void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation);
+    void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation, log::LogContext & lc);
     std::set<std::string> m_diskSystemsToSkip;
   public:
     void complete(time_t completionTime) override;
diff --git a/scheduler/SchedulerDatabase.cpp b/scheduler/SchedulerDatabase.cpp
index fe5f00a8b8..4c1415c4ee 100644
--- a/scheduler/SchedulerDatabase.cpp
+++ b/scheduler/SchedulerDatabase.cpp
@@ -34,9 +34,7 @@ SchedulerDatabase::RepackRequestStatistics::RepackRequestStatistics() {
 
+// Adds size to the total space requested from diskSystemName; the first request for a disk system creates its entry.
 void SchedulerDatabase::DiskSpaceReservationRequest::addRequest(const std::string& diskSystemName, uint64_t size) {
-  try {
-    m_spaceMap.at(diskSystemName) += size;
-  } catch (std::out_of_range &) {
-    m_spaceMap[diskSystemName] = size;
-  }
+  // std::map::operator[] value-initialises a missing entry to 0, so a single += replaces the at()/catch fallback.
+  operator[](diskSystemName) += size;
 }
 
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index ae09ddb2df..e80e05d88f 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -342,11 +342,8 @@ public:
 
   class RetrieveJob;
   
-  class DiskSpaceReservationRequest {
-  public:
+  struct DiskSpaceReservationRequest: public std::map<std::string, uint64_t> { // Maps a disk system name to the total size requested from it.
     void addRequest(const std::string &diskSystemName, uint64_t size);
-  private:
-    std::map<std::string, uint64_t> m_spaceMap;
   };
   
 public:
-- 
GitLab