Commit db09ce4c authored by Eric Cano's avatar Eric Cano
Browse files

#533: Partial implementation of disk system space reservation.

parent 34610d2b
......@@ -22,6 +22,7 @@
#include "common/dataStructures/EntryLog.hpp"
#include <string>
#include <list>
#include <set>
namespace cta { namespace disk {
......@@ -66,5 +67,18 @@ private:
};
struct DiskSystemFreeSpace {
uint64_t freeSize;
time_t fetchTime;
};
class DiskSystemFreeSpaceList: public std::map<std::string, DiskSystemFreeSpace> {
public:
DiskSystemFreeSpaceList(DiskSystemList &diskSystemList): m_systemList(diskSystemList) {}
void fetchFileSystemFreeSpace(const std::set<std::string> &fileSystems);
private:
DiskSystemList &m_systemList;
};
}} // namespace cta::common
......@@ -592,3 +592,20 @@ message RepackRequestQueuePointer {
message RepackQueue {
repeated RepackRequestQueuePointer repackrequestpointers = 12200;
}
message DiskSystemSpaceReservation {
required string holder = 12500;
required uint64 size = 12501;
}
message DiskSystemSpace {
required string name = 12300;
required uint64 free_space = 12301;
required uint64 last_measurement_time = 12302;
required uint64 targeted_free_space = 12303;
repeated DiskSystemSpaceReservation reservations =12304;
}
message DiskSystemSpaceRegistry {
repeated DiskSystemSpace disk_systems = 12400;
}
......@@ -3443,7 +3443,7 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
// OStoreDB::RetrieveMount::getNextJobBatch()
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, disk::DiskSystemList& diskSystemList, log::LogContext& logContext) {
uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) {
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos;
RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
......
......@@ -218,7 +218,10 @@ public:
public:
const MountInfo & getMountInfo() override;
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
disk::DiskSystemList& diskSystemList, log::LogContext& logContext) override;
const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) override;
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override;
void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) override;
void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
......
......@@ -19,6 +19,7 @@
#include "scheduler/RetrieveMount.hpp"
#include "common/Timer.hpp"
#include "common/log/TimingList.hpp"
#include "disk/DiskSystem.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -141,8 +142,34 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
disk::DiskSystemList diskSystemList;
if (m_catalogue) diskSystemList = m_catalogue->getAllDiskSystems();
// Try and get a new job from the DB
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested,
bytesRequested, diskSystemList, logContext));
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch;
{
retryBatchAllocation:
dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, m_fullDiskSystems, logContext);
// Compute the necessary space in each targeted disk system.
SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
std::map<std::string, uint64_t> spaceMap;
for (auto &j: dbJobBatch)
if (j->diskSystemName)
diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize);
// Reserve the space.
// We will update this information on-demand during iterations if needed.
disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
retrySpaceAllocation:
try {
m_dbMount->reserveDiskSpace(diskSpaceReservationRequest);
} catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) {
// Update information for missing/outdated disk systems.
diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems());
goto retrySpaceAllocation;
} catch (SchedulerDatabase::FullDiskSystem &fds) {
// Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt.
for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds);
m_dbMount->requeueJobBatch(dbJobBatch);
dbJobBatch.clear();
goto retryBatchAllocation;
}
}
std::list<std::unique_ptr<RetrieveJob>> ret;
// We prepare the response
for (auto & sdrj: dbJobBatch) {
......
......@@ -233,6 +233,10 @@ namespace cta {
/** An initialized-once factory for archive reports (indirectly used by ArchiveJobs) */
disk::DiskReporterFactory m_reporterFactory;
/**
* Internal tracking of the full disk systems. It is one strike out (for the mount duration).
*/
std::set<std::string> m_fullDiskSystems;
}; // class RetrieveMount
......
......@@ -32,4 +32,13 @@ SchedulerDatabase::RepackRequestStatistics::RepackRequestStatistics() {
operator [](s) = 0;
}
void SchedulerDatabase::DiskSpaceReservationRequest::addRequest(const std::string& diskSystemName, uint64_t size) {
try {
m_spaceMap.at(diskSystemName) += size;
} catch (std::out_of_range &) {
m_spaceMap[diskSystemName] = size;
}
}
} //namespace cta
......@@ -341,6 +341,35 @@ public:
/*============ Retrieve management: tape server side ======================*/
class RetrieveJob;
class DiskSpaceReservationRequest {
public:
void addRequest(const std::string &diskSystemName, uint64_t size);
private:
std::map<std::string, uint64_t> m_spaceMap;
};
private:
class ProblemDiskSystemList: public exception::Exception {
using cta::exception::Exception::Exception;
public:
const std::set<std::string> &getDiskSsytems() { return m_outdatedDiskSystems; }
void addDiskSystem(const std::string &diskSystenName) { m_outdatedDiskSystems.insert(diskSystenName); }
private:
std::set<std::string> m_outdatedDiskSystems;
};
public:
/** An exception allowing the reservation function to de called again with up to date free space information from the
* disk systems.*/
class OutdatedDiskSystemInformation: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
/** An exception allowing the reservation system to report disk systems for which the free space could not be reserved */
class FullDiskSystem: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
/** Clear all reservation for an agent. Used at agent cleanup and garbage collection time, so not in retrieve mount context. */
void clearDiskReservation(const std::string);
class RetrieveMount {
public:
struct MountInfo {
......@@ -358,7 +387,17 @@ public:
} mountInfo;
virtual const MountInfo & getMountInfo() = 0;
virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, disk::DiskSystemList & diskSystemList, log::LogContext& logContext) = 0;
uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) = 0;
virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch) = 0;
/* Mount-level disk reservation functions. */
/** Attempt to reserve, can throw OutdatedDiskSystemInformation or FullDiskSystem. Does NOT proceed with any reservation
* in case of throw. */
virtual void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) = 0;
/** Release some space for an agent and destination. */
virtual void releaseDiskSpace(const std::string &reservingAgent, const std::string &diskSystemName, uint64_t size) = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
......
......@@ -319,6 +319,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
diskSystemList.push_back(cta::disk::DiskSystem{"ds-B", "$root://b.disk.system/", "query:todo", 60, 10UL*1000*1000*1000,
common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
std::set<std::string> fullDiskSystems;
// Inject 10 retrieve jobs to the db.
const size_t filesToDo = 10;
......@@ -368,7 +369,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
auto moutInfo = db.getMountInfo(lc);
ASSERT_EQ(1, moutInfo->potentialMounts.size());
auto rm=moutInfo->createRetrieveMount("vid", "tapePool", "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr), cta::nullopt);
auto rjb = rm->getNextJobBatch(20,20*1000,diskSystemList, lc);
auto rjb = rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc);
ASSERT_EQ(filesToDo, rjb.size());
std::list <cta::SchedulerDatabase::RetrieveJob*> jobBatch;
for (auto &rj: rjb) {
......@@ -379,7 +380,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
}
rm->flushAsyncSuccessReports(jobBatch, lc);
rjb.clear();
ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,diskSystemList, lc).size());
ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc).size());
rm->complete(time(nullptr));
rm.reset(nullptr);
moutInfo.reset(nullptr);
......
......@@ -43,7 +43,11 @@ namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......
......@@ -36,7 +36,11 @@ namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......
......@@ -132,7 +132,10 @@ namespace unitTests
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
cta::disk::DiskSystemList& diskSystemList, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");}
void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");}
void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment