Commit 6385126d authored by Eric Cano's avatar Eric Cano
Browse files

#533 Changed strategy for implementation.

Moved the space reservation information to the DriveStatus object store object instead of a new central registry.
The central registry would have been a single point of contention as was the DriveRegistry before being split into
DriveStates. As the problem is so close to the one of the drive status, we can actually reuse the drive status for this purpose.

The algorithm will also change as we move the responsibility of querying the free space from the disk systems into the OStoreDb
object instead of the Scheduler. This leads to a slightly worse layering of responsibilities, making the OStoreDb::RetrieveMount
object a client of the disk::DiskSystemFreeSpaceList object.

The current implementation will also query the free space from the disk systems on each pop, instead of doing so in a globally
cached fashion. With the new model, we could cache the free space per drive (if needed), but not globally. This is not expected
to be a real issue and free space is a global counter in the disk system, expected to be readily available.
parent e152a3fb
......@@ -64,7 +64,7 @@ struct ArchiveFile {
std::string storageClass;
DiskFileInfo diskFileInfo;
/**
* This map represents the non-necessarily-exhaustive set of tape copies
* This list represents the non-necessarily-exhaustive set of tape copies
* to be listed by the operator. For example, if the listing requested is
* for a single tape, the list will contain only one element.
*/
......
......@@ -174,6 +174,70 @@ void DriveState::setState(cta::common::dataStructures::DriveState& state) {
}
}
//------------------------------------------------------------------------------
// DriveState::getDiskSpaceReservations())
//------------------------------------------------------------------------------
/**
 * Collects the disk space reservations stored in this drive state.
 * @return the reserved byte count keyed by disk system name. If the payload
 * holds several entries with the same name, the last one wins.
 */
std::map<std::string, uint64_t> DriveState::getDiskSpaceReservations() {
  // NOTE(review): only header readability is checked here although the payload
  // is read below — confirm this is intended.
  checkHeaderReadable();
  std::map<std::string, uint64_t> reservations;
  const auto & storedReservations = m_payload.disk_space_reservations();
  for (auto entry = storedReservations.begin(); entry != storedReservations.end(); ++entry) {
    reservations[entry->disk_system_name()] = entry->reserved_bytes();
  }
  return reservations;
}
//------------------------------------------------------------------------------
// DriveState::addDiskSpaceReservation())
//------------------------------------------------------------------------------
/**
 * Adds bytes to the reservation recorded for a disk system, creating the
 * entry when none exists yet.
 * @param diskSystemName name of the disk system the reservation applies to.
 * @param bytes number of bytes to add to the reservation.
 */
void DriveState::addDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) {
  checkPayloadWritable();
  // Iterate by reference: a by-value loop variable would be a copy of the
  // stored element, and the update below would be silently discarded.
  for (auto & dsr: *m_payload.mutable_disk_space_reservations()) {
    if (dsr.disk_system_name() == diskSystemName) {
      dsr.set_reserved_bytes(dsr.reserved_bytes() + bytes);
      return;
    }
  }
  // No entry for this disk system yet: append a new one.
  auto * newDsr = m_payload.mutable_disk_space_reservations()->Add();
  newDsr->set_disk_system_name(diskSystemName);
  newDsr->set_reserved_bytes(bytes);
}
//------------------------------------------------------------------------------
// DriveState::substractDiskSpaceReservation())
//------------------------------------------------------------------------------
/**
 * Removes bytes from the reservation recorded for a disk system. The entry is
 * dropped from the list when its reservation reaches zero.
 * @param diskSystemName name of the disk system the reservation applies to.
 * @param bytes number of bytes to remove from the reservation.
 * @throws NegativeDiskSpaceReservationReached if bytes exceeds the current
 * reservation, or if bytes is non-zero and no reservation exists for the disk
 * system. Nothing is modified when throwing.
 */
void DriveState::substractDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) {
  checkPayloadWritable();
  auto * reservations = m_payload.mutable_disk_space_reservations();
  // Work on the stored elements themselves (not copies) so that a partial
  // subtraction is actually recorded in the payload.
  for (int index = 0; index < reservations->size(); ++index) {
    auto * dsr = reservations->Mutable(index);
    if (dsr->disk_system_name() != diskSystemName) continue;
    if (bytes > dsr->reserved_bytes())
      throw NegativeDiskSpaceReservationReached(
        "In DriveState::substractDiskSpaceReservation(): we would reach a negative reservation size.");
    dsr->set_reserved_bytes(dsr->reserved_bytes() - bytes);
    if (!dsr->reserved_bytes()) {
      // We can remove this entry from the list: move it to the end and drop it.
      // dsr must not be used past this point.
      reservations->SwapElements(index, reservations->size() - 1);
      reservations->RemoveLast();
    }
    return;
  }
  // No matching entry: subtracting zero is a no-op, anything else is an error.
  if (bytes)
    throw NegativeDiskSpaceReservationReached(
      "In DriveState::substractDiskSpaceReservation(): Trying to substract bytes without previous reservation.");
}
//------------------------------------------------------------------------------
// DriveState::resetDiskSpaceReservation())
//------------------------------------------------------------------------------
/**
 * Drops every disk space reservation recorded in this drive state.
 */
void DriveState::resetDiskSpaceReservation() {
  checkPayloadWritable();
  auto * reservations = m_payload.mutable_disk_space_reservations();
  reservations->Clear();
}
//------------------------------------------------------------------------------
// DriveState::dump())
//------------------------------------------------------------------------------
......
......@@ -46,6 +46,12 @@ public:
cta::common::dataStructures::DriveState getState();
void setState(cta::common::dataStructures::DriveState & state);
std::map<std::string, uint64_t> getDiskSpaceReservations();
void addDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes);
CTA_GENERATE_EXCEPTION_CLASS(NegativeDiskSpaceReservationReached);
void substractDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes);
void resetDiskSpaceReservation();
/**
* JSON dump of the drive state
* @return
......
......@@ -614,7 +614,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> {
return ret;
}
auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) -> CandidateJobList {
auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) -> CandidateJobList {
checkPayloadReadable();
CandidateJobList ret;
for(auto & rqsp: m_payload.retrievequeueshards()) {
......@@ -623,7 +623,8 @@ auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::
// Fetch the shard
RetrieveQueueShard rqs(rqsp.address(), m_objectStore);
rqs.fetchNoLock();
auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, retrieveRequestsToSkip);
auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles,
retrieveRequestsToSkip, diskSystemsToSkip);
ret.candidateBytes += shardCandidates.candidateBytes;
ret.candidateFiles += shardCandidates.candidateFiles;
// We overwrite the remaining values each time as the previous
......
......@@ -115,7 +115,8 @@ public:
};
// The set of retrieve requests to skip are requests previously identified by the caller as bad,
// which still should be removed from the queue. They will be disregarded from listing.
CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip);
CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip,
const std::set<std::string> & diskSystemsToSkip);
//! Return a summary of the number of jobs and number of bytes in the queue
CandidateJobList getCandidateSummary();
......
......@@ -455,6 +455,7 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PopCriteri
files -= pes.files;
return *this;
}
std::set<std::string> diskSystemsToSkip;
};
template<>
......@@ -486,12 +487,18 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PoppedElem
template<typename C>
auto ContainerTraits<RetrieveQueue,C>::
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip,
log::LogContext &lc) -> PoppedElementsBatch
{
PoppedElementsBatch ret;
auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip);
auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files,
elementsToSkip,
// This parameter is needed only in the specialized version:
// auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::getPoppingElementsCandidates
// We provide an empty set here.
std::set<std::string>()
);
for(auto &cjfq : candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{
cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore),
......
......@@ -62,7 +62,7 @@ void RetrieveQueueShard::garbageCollect(const std::string& presumedOwner, AgentR
throw exception::Exception("In RetrieveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object.");
}
RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) {
RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) {
checkPayloadReadable();
RetrieveQueue::CandidateJobList ret;
ret.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize();
......
......@@ -112,7 +112,8 @@ public:
*/
RemovalResult removeJobs(const std::list<std::string> & jobsToRemove);
RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip);
RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles,
const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip);
/** Re compute summaries in case they do not match the array content. */
void rebuild();
......
......@@ -124,7 +124,8 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
ASSERT_EQ(minStartTime, rq.getJobsSummary().oldestJobStartTime);
uint64_t nextExpectedFseq=0;
while (rq.getJobsSummary().jobs) {
auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(),
std::set<std::string>());
std::set<std::string> jobsToSkip;
std::list<std::string> jobsToDelete;
for (auto &j: candidateJobs.candidates) {
......@@ -135,7 +136,7 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) {
jobsToDelete.emplace_back(j.address);
nextExpectedFseq++;
}
auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>());
if (candidateJobs2.candidateFiles) {
std::stringstream address;
address << "someRequest-" << nextExpectedFseq;
......@@ -245,7 +246,8 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) {
ASSERT_EQ(0.2, jsB->weight);
uint64_t nextExpectedFseq=0;
while (rq.getJobsSummary().jobs) {
auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>());
auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(),
std::set<std::string>());
std::set<std::string> jobsToSkip;
std::list<std::string> jobsToDelete;
for (auto &j: candidateJobs.candidates) {
......@@ -256,7 +258,7 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) {
jobsToDelete.emplace_back(j.address);
nextExpectedFseq++;
}
auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip);
auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>());
if (candidateJobs2.candidateFiles) {
std::stringstream address;
address << "someRequest-" << nextExpectedFseq;
......
......@@ -41,12 +41,12 @@ addToLog(log::ScopedParamContainer &params) const {
template<>
auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip,
getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip,
log::LogContext &lc) -> PoppedElementsBatch
{
PoppedElementsBatch ret;
auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip);
auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elementsToSkip, unfulfilledCriteria.diskSystemsToSkip);
for(auto &cjfq : candidateJobsFromQueue.candidates) {
ret.elements.emplace_back(PoppedElement{
cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore),
......
......@@ -34,6 +34,7 @@ enum ObjectType {
RepackRequest_t = 11;
RepackIndex_t = 12;
RepackQueue_t = 13;
DiskSystemSpaceRegister_t = 14;
GenericObject_t = 1000;
}
......@@ -206,6 +207,12 @@ message ArchiveFile {
// ------------- Drives handling ----------------------------------------------
message DiskSpaceReservation {
// One disk space reservation entry, as stored in the repeated
// disk_space_reservations field of DriveState: the name of the targeted disk
// system and the number of bytes reserved on it.
required string disk_system_name = 5100;
required uint64 reserved_bytes = 5110;
}
message DriveState {
required string drivename = 5000;
required string host = 5001;
......@@ -241,6 +248,7 @@ message DriveState {
optional uint64 next_priority = 5031;
optional string next_activity = 5032;
optional double next_activity_weight = 5033;
repeated DiskSpaceReservation disk_space_reservations = 5034;
// TODO: implement or remove required EntryLog creationlog = 5023;
}
......@@ -592,20 +600,3 @@ message RepackRequestQueuePointer {
message RepackQueue {
repeated RepackRequestQueuePointer repackrequestpointers = 12200;
}
message DiskSystemSpaceReservation {
required string holder = 12500;
required uint64 size = 12501;
}
message DiskSystemSpace {
required string name = 12300;
required uint64 free_space = 12301;
required uint64 last_measurement_time = 12302;
required uint64 targeted_free_space = 12303;
repeated DiskSystemSpaceReservation reservations =12304;
}
message DiskSystemSpaceRegistry {
repeated DiskSystemSpace disk_systems = 12400;
}
......@@ -2365,18 +2365,18 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
}
}
// We now have created the subrequests. Time to enqueue.
// TODO: the lock/fetch could be parallelized
{
objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
std::list<std::unique_ptr<objectstore::ScopedExclusiveLock>> locks;
std::list<objectstore::ScopedExclusiveLock> locks;
for (auto &is: asyncInsertedSubrequestInfoList) {
locks.push_back(cta::make_unique<objectstore::ScopedExclusiveLock>(*is.request));
locks.emplace_back(*is.request);
is.request->fetch();
sorter.insertRetrieveRequest(is.request, *m_oStoreDB.m_agentReference, is.activeCopyNb, lc);
}
locks.clear();
sorter.flushAll(lc);
}
}
//------------------------------------------------------------------------------
......@@ -2402,6 +2402,9 @@ void OStoreDB::RepackRequest::fail() {
m_repackRequest.commit();
}
//------------------------------------------------------------------------------
// OStoreDB::RepackRequest::requeueInToExpandQueue()
//------------------------------------------------------------------------------
void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){
ScopedExclusiveLock rrl(m_repackRequest);
m_repackRequest.fetch();
......@@ -2418,6 +2421,9 @@ void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){
rqteAlgo.referenceAndSwitchOwnership(nullopt, previousOwner, insertedElements, lc);
}
//------------------------------------------------------------------------------
// OStoreDB::RepackRequest::setExpandStartedAndChangeStatus()
//------------------------------------------------------------------------------
void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){
ScopedExclusiveLock rrl(m_repackRequest);
m_repackRequest.fetch();
......@@ -2426,6 +2432,9 @@ void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){
m_repackRequest.commit();
}
//------------------------------------------------------------------------------
// OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile()
//------------------------------------------------------------------------------
void OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile(uint64_t& fSeq, TotalStatsFiles& totalStatsFiles) {
ScopedExclusiveLock rrl(m_repackRequest);
m_repackRequest.fetch();
......@@ -3443,12 +3452,30 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
// OStoreDB::RetrieveMount::getNextJobBatch()
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) {
uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) {
// Pop a batch of files to retrieve and, for the ones having a documented disk system name, reserve the space
// that they will require. In case we cannot allocate the space for some of them, mark the destination filesystem as
// full and stop popping from it, after requeueing the jobs.
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos;
RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
popCriteria.diskSystemsToSkip = m_diskSystemsToSkip;
auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext);
// We can construct the return value
// Try and allocate data for the popped jobs.
// Compute the necessary space in each targeted disk system.
SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
std::map<std::string, uint64_t> spaceMap;
for (auto &j: jobs.elements)
if (j.diskSystemName)
diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize);
// Get the existing reservation map from the other drives.
auto otherDrivesReservations = getExistingDrivesReservations();
// Get the free space from disk systems involved.
// If any file system does not have enough space, mark it as full, requeue all (slight but rare inefficiency)
// and retry the pop.
// Else, we can construct the return value (we did not hit any full disk system).
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret;
for(auto &j : jobs.elements)
{
......@@ -3466,6 +3493,71 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou
return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::requeueJobBatch()
//------------------------------------------------------------------------------
void OStoreDB::RetrieveMount::requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
log::LogContext& logContext) {
objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
std::list<std::shared_ptr<objectstore::RetrieveRequest>> rrlist;
std::list<objectstore::ScopedExclusiveLock> locks;
for (auto & j: jobBatch) {
cta::OStoreDB::RetrieveJob *rj = cta::OStoreDB::castFromSchedDBJob(j.get());
auto rr = std::make_shared<objectstore::RetrieveRequest>(rj->m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore);
rrlist.push_back(rr);
locks.emplace_back(*rr);
rr->fetch();
sorter.insertRetrieveRequest(rr, *m_oStoreDB.m_agentReference, rj->selectedCopyNb, logContext);
}
locks.clear();
rrlist.clear();
sorter.flushAll(logContext);
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getExistingDrivesReservations()
//------------------------------------------------------------------------------
std::map<std::string, uint64_t> OStoreDB::RetrieveMount::getExistingDrivesReservations() {
objectstore::RootEntry re(m_oStoreDB.m_objectStore);
re.fetchNoLock();
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_oStoreDB.m_objectStore);
dr.fetchNoLock();
auto driveAddresses = dr.getDriveAddresses();
std::list <objectstore::DriveState> dsList;
std::list <std::unique_ptr<objectstore::DriveState::AsyncLockfreeFetcher>> dsFetchers;
for (auto &d: driveAddresses) {
dsList.emplace_back(d.driveStateAddress, m_oStoreDB.m_objectStore);
dsFetchers.emplace_back(dsList.back().asyncLockfreeFetch());
}
auto dsf = dsFetchers.begin();
std::map<std::string, uint64_t> ret;
for (auto &d: dsList) {
try {
(*dsf)->wait();
dsf++;
for (auto &dsr: d.getDiskSpaceReservations()) {
try {
ret.at(dsr.first) += dsr.second;
} catch (std::out_of_range &) {
ret[dsr.first] = dsr.second;
}
}
} catch (objectstore::Backend::NoSuchObject) {
// If the drive status is not there, we just skip it.
dsf++;
}
}
return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::reserveDiskSpace()
//------------------------------------------------------------------------------
/**
 * Placeholder for the mount-level disk space reservation: always throws.
 * @param diskSpaceReservation the requested reservation (currently unused).
 * @throws exception::Exception unconditionally, as this is not implemented.
 */
void OStoreDB::RetrieveMount::reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) {
  // Not implemented yet: no reservation is attempted.
  const char * const notImplementedMessage = "In OStoreDB::RetrieveMount::reserveDiskSpace(): not implemented.";
  throw exception::Exception(notImplementedMessage);
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::complete()
//------------------------------------------------------------------------------
......
......@@ -218,10 +218,14 @@ public:
public:
const MountInfo & getMountInfo() override;
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) override;
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override;
void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) override;
void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override;
cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) override;
private:
void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch,
log::LogContext& logContext);
std::map<std::string, uint64_t> getExistingDrivesReservations();
void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation);
std::set<std::string> m_diskSystemsToSkip;
public:
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
......
......@@ -141,35 +141,40 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
// Get the current file systems list from the catalogue
disk::DiskSystemList diskSystemList;
if (m_catalogue) diskSystemList = m_catalogue->getAllDiskSystems();
// Try and get a new job from the DB
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch;
{
retryBatchAllocation:
dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, m_fullDiskSystems, logContext);
// Compute the necessary space in each targeted disk system.
SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
std::map<std::string, uint64_t> spaceMap;
for (auto &j: dbJobBatch)
if (j->diskSystemName)
diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize);
// Reserve the space.
// We will update this information on-demand during iterations if needed.
disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
retrySpaceAllocation:
try {
m_dbMount->reserveDiskSpace(diskSpaceReservationRequest);
} catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) {
// Update information for missing/outdated disk systems.
diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems());
goto retrySpaceAllocation;
} catch (SchedulerDatabase::FullDiskSystem &fds) {
// Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt.
for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds);
m_dbMount->requeueJobBatch(dbJobBatch);
dbJobBatch.clear();
goto retryBatchAllocation;
}
}
// TODO: the diskSystemFreeSpaceList could be made a member of the retrieve mount and cache the fetched values, limiting the re-querying
// of the disk systems free space.
disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList (diskSystemList);
// Try and get a new job from the DB. The DB mount (in memory object) is taking care of reserving the free space for the popped
// elements and query the disk systems, via the diskSystemFreeSpaceList object.
auto dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext);
// std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch;
// {
// retryBatchAllocation:
// dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext);
// // Compute the necessary space in each targeted disk system.
// SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest;
// std::map<std::string, uint64_t> spaceMap;
// for (auto &j: dbJobBatch)
// if (j->diskSystemName)
// diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize);
// // Reserve the space.
// // We will update this information on-demand during iterations if needed.
// disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
// retrySpaceAllocation:
// try {
// m_dbMount->reserveDiskSpace(diskSpaceReservationRequest);
// } catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) {
// // Update information for missing/outdated disk systems.
// diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems());
// goto retrySpaceAllocation;
// } catch (SchedulerDatabase::FullDiskSystem &fds) {
// // Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt.
// for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds);
// m_dbMount->requeueJobBatch(dbJobBatch, logContext);
// dbJobBatch.clear();
// goto retryBatchAllocation;
// }
// }
std::list<std::unique_ptr<RetrieveJob>> ret;
// We prepare the response
for (auto & sdrj: dbJobBatch) {
......
......@@ -349,27 +349,7 @@ public:
std::map<std::string, uint64_t> m_spaceMap;
};
private:
class ProblemDiskSystemList: public exception::Exception {
using cta::exception::Exception::Exception;
public:
const std::set<std::string> &getDiskSsytems() { return m_outdatedDiskSystems; }
void addDiskSystem(const std::string &diskSystenName) { m_outdatedDiskSystems.insert(diskSystenName); }
private:
std::set<std::string> m_outdatedDiskSystems;
};
public:
/** An exception allowing the reservation function to de called again with up to date free space information from the
* disk systems.*/
class OutdatedDiskSystemInformation: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
/** An exception allowing the reservation system to report disk systems for which the free space could not be reserved */
class FullDiskSystem: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; };
/** Clear all reservation for an agent. Used at agent cleanup and garbage collection time, so not in retrieve mount context. */
void clearDiskReservation(const std::string);
class RetrieveMount {
public:
struct MountInfo {
......@@ -387,17 +367,8 @@ public:
} mountInfo;
virtual const MountInfo & getMountInfo() = 0;
virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) = 0;
virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch) = 0;
/* Mount-level disk reservation functions. */
/** Attempt to reserve, can throw OutdatedDiskSystemInformation or FullDiskSystem. Does NOT proceed with any reservation
* in case of throw. */
virtual void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) = 0;
/** Release some space for an agent and destination. */
virtual void releaseDiskSpace(const std::string &reservingAgent, const std::string &diskSystemName, uint64_t size) = 0;
uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) = 0;
// virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch, log::LogContext& logContext) = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
......
......@@ -315,11 +315,11 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) {
// Create the disk system list
cta::disk::DiskSystemList diskSystemList;
cta::disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList);
diskSystemList.push_back(cta::disk::DiskSystem{"ds-A", "$root://a.disk.system/", "query:todo", 60, 10UL*1000*1000*1000,
common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
diskSystemList.push_back(cta::disk::DiskSystem{"ds-B", "$root://b.disk.system/", "query:todo", 60, 10UL*1000*1000*1000,
common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"});
std::set<std::string> fullDiskSystems;