diff --git a/common/dataStructures/ArchiveFile.hpp b/common/dataStructures/ArchiveFile.hpp index 4f20ea68910091d0a26d2ebe646b0381b32beb40..9929eb50ed3570acba5f31fc7094ab7f05ec7a3d 100644 --- a/common/dataStructures/ArchiveFile.hpp +++ b/common/dataStructures/ArchiveFile.hpp @@ -64,7 +64,7 @@ struct ArchiveFile { std::string storageClass; DiskFileInfo diskFileInfo; /** - * This map represents the non-necessarily-exhaustive set of tape copies + * This list represents the non-necessarily-exhaustive set of tape copies * to be listed by the operator. For example, if the listing requested is * for a single tape, the map will contain only one element. */ diff --git a/objectstore/DriveState.cpp b/objectstore/DriveState.cpp index 634d9c71ae3d908023aeb6a86a7fec4619a649b9..7434180bad6c01642bc49f78577ee6b70296078e 100644 --- a/objectstore/DriveState.cpp +++ b/objectstore/DriveState.cpp @@ -174,6 +174,70 @@ void DriveState::setState(cta::common::dataStructures::DriveState& state) { } } +//------------------------------------------------------------------------------ +// DriveState::getDiskSpaceReservations()) +//------------------------------------------------------------------------------ +std::map<std::string, uint64_t> DriveState::getDiskSpaceReservations() { + checkHeaderReadable(); + std::map<std::string, uint64_t> ret; + for (auto &dsr: m_payload.disk_space_reservations()) { + ret[dsr.disk_system_name()] = dsr.reserved_bytes(); + } + return ret; +} + +//------------------------------------------------------------------------------ +// DriveState::addDiskSpaceReservation()) +//------------------------------------------------------------------------------ +void DriveState::addDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) { + checkPayloadWritable(); + for (auto dsr: *m_payload.mutable_disk_space_reservations()) { + if (dsr.disk_system_name() == diskSystemName) { + dsr.set_reserved_bytes(dsr.reserved_bytes() + bytes); + return; + } + } + auto * newDsr = m_payload.mutable_disk_space_reservations()->Add(); + newDsr->set_disk_system_name(diskSystemName); + newDsr->set_reserved_bytes(bytes); +} + +//------------------------------------------------------------------------------ +// DriveState::substractDiskSpaceReservation()) +//------------------------------------------------------------------------------ +void DriveState::substractDiskSpaceReservation(const std::string& diskSystemName, uint64_t bytes) { + checkPayloadWritable(); + size_t index=0; + for (auto dsr: *m_payload.mutable_disk_space_reservations()) { + if (dsr.disk_system_name() == diskSystemName) { + if (bytes > dsr.reserved_bytes()) + throw NegativeDiskSpaceReservationReached( + "In DriveState::substractDiskSpaceReservation(): we would reach a negative reservation size."); + dsr.set_reserved_bytes(dsr.reserved_bytes() - bytes); + if (!dsr.reserved_bytes()) { + // We can remove this entry from the list. + auto * mdsr = m_payload.mutable_disk_space_reservations(); + mdsr->SwapElements(index, mdsr->size()-1); + mdsr->RemoveLast(); + } + return; + } else { + ++index; + } + } + if (bytes) + throw NegativeDiskSpaceReservationReached( + "In DriveState::substractDiskSpaceReservation(): Trying to substract bytes without previous reservation."); +} + +//------------------------------------------------------------------------------ +// DriveState::resetDiskSpaceReservation()) +//------------------------------------------------------------------------------ +void DriveState::resetDiskSpaceReservation() { + checkPayloadWritable(); + m_payload.mutable_disk_space_reservations()->Clear(); +} + //------------------------------------------------------------------------------ // DriveState::dump()) //------------------------------------------------------------------------------ diff --git a/objectstore/DriveState.hpp b/objectstore/DriveState.hpp index 4c7d8da08936aebeb5b3324d72a2ff6e5afe2eed..afe22a049c90c1fb586b68c4fe410c59c44f023a 100644 --- a/objectstore/DriveState.hpp +++ b/objectstore/DriveState.hpp @@ -46,6 +46,12 @@ public: cta::common::dataStructures::DriveState getState(); void setState(cta::common::dataStructures::DriveState & state); + std::map<std::string, uint64_t> getDiskSpaceReservations(); + void addDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes); + CTA_GENERATE_EXCEPTION_CLASS(NegativeDiskSpaceReservationReached); + void substractDiskSpaceReservation(const std::string & diskSystemName, uint64_t bytes); + void resetDiskSpaceReservation(); + /** * JSON dump of the drive state * @return diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index 6536126d60bbd223149e0e458869e32dc631a5d5..1765aaabcaf3823f77de6f4a9be213718b39d3bf 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -614,7 +614,7 @@ auto RetrieveQueue::dumpJobs() -> std::list<JobDump> { return ret; } -auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) -> CandidateJobList { +auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) -> CandidateJobList { checkPayloadReadable(); CandidateJobList ret; for(auto & rqsp: m_payload.retrievequeueshards()) { @@ -623,7 +623,8 @@ auto RetrieveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std:: // Fetch the shard RetrieveQueueShard rqs(rqsp.address(), m_objectStore); rqs.fetchNoLock(); - auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, retrieveRequestsToSkip); + auto shardCandidates = rqs.getCandidateJobList(maxBytes - ret.candidateBytes, maxFiles - ret.candidateFiles, + retrieveRequestsToSkip, diskSystemsToSkip); ret.candidateBytes += shardCandidates.candidateBytes; ret.candidateFiles += shardCandidates.candidateFiles; // We overwrite the remaining values each time as the previous diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index f3337bc2d78e4293252fb60eeadd9bfd84f9010e..c7a42a889eb010363e4bbc35cbf87d0bec9edc9c 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -115,7 +115,8 @@ public: }; // The set of retrieve requests to skip are requests previously identified by the caller as bad, // which still should be removed from the queue. They will be disregarded from listing. - CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip); + CandidateJobList getCandidateList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, + const std::set<std::string> & diskSystemsToSkip); //! Return a summary of the number of jobs and number of bytes in the queue CandidateJobList getCandidateSummary(); diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 450108e59888b8ff103b024b33ae6ec55197a02e..fb932847b00597dafec95dcfa41d934afbc9c411 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -455,6 +455,7 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PopCriteri files -= pes.files; return *this; } + std::set<std::string> diskSystemsToSkip; }; template<> @@ -486,12 +487,18 @@ struct ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::PoppedElem template<typename C> auto ContainerTraits<RetrieveQueue,C>:: -getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip, +getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip, log::LogContext &lc) -> PoppedElementsBatch { PoppedElementsBatch ret; - auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip); + auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, + elementsToSkip, + // This parameter is needed only in the specialized version: + // auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>::getPoppingElementsCandidates + // We provide an empty set here. + std::set<std::string>() + ); for(auto &cjfq : candidateJobsFromQueue.candidates) { ret.elements.emplace_back(PoppedElement{ cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore), diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp index 9c00e31696d4346e93a973c4b14a59944281a3ce..763bf584e259d0a9fc701c6037172d5e8930452d 100644 --- a/objectstore/RetrieveQueueShard.cpp +++ b/objectstore/RetrieveQueueShard.cpp @@ -62,7 +62,7 @@ void RetrieveQueueShard::garbageCollect(const std::string& presumedOwner, AgentR throw exception::Exception("In RetrieveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object."); } -RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip) { +RetrieveQueue::CandidateJobList RetrieveQueueShard::getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip) { checkPayloadReadable(); RetrieveQueue::CandidateJobList ret; ret.remainingBytesAfterCandidates = m_payload.retrievejobstotalsize(); diff --git a/objectstore/RetrieveQueueShard.hpp b/objectstore/RetrieveQueueShard.hpp index 8eef6d0af623ed8038e1b97a21160a546d4d2920..fa798e4b439fca79104282dcb501b2ec0b53cdec 100644 --- a/objectstore/RetrieveQueueShard.hpp +++ b/objectstore/RetrieveQueueShard.hpp @@ -112,7 +112,8 @@ public: */ RemovalResult removeJobs(const std::list<std::string> & jobsToRemove); - RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, std::set<std::string> retrieveRequestsToSkip); + RetrieveQueue::CandidateJobList getCandidateJobList(uint64_t maxBytes, uint64_t maxFiles, + const std::set<std::string> & retrieveRequestsToSkip, const std::set<std::string> & diskSystemsToSkip); /** Re compute summaries in case they do not match the array content. */ void rebuild(); diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp index 098114e6289ee943527374f1714ef091042cd2de..aa7d386797eae835bccd7a8954eaf32a7c1a7916 100644 --- a/objectstore/RetrieveQueueTest.cpp +++ b/objectstore/RetrieveQueueTest.cpp @@ -124,7 +124,8 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) { ASSERT_EQ(minStartTime, rq.getJobsSummary().oldestJobStartTime); uint64_t nextExpectedFseq=0; while (rq.getJobsSummary().jobs) { - auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>()); + auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(), + std::set<std::string>()); std::set<std::string> jobsToSkip; std::list<std::string> jobsToDelete; for (auto &j: candidateJobs.candidates) { @@ -135,7 +136,7 @@ TEST(ObjectStore, RetrieveQueueShardingAndOrderingTest) { jobsToDelete.emplace_back(j.address); nextExpectedFseq++; } - auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip); + auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>()); if (candidateJobs2.candidateFiles) { std::stringstream address; address << "someRequest-" << nextExpectedFseq; @@ -245,7 +246,8 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) { ASSERT_EQ(0.2, jsB->weight); uint64_t nextExpectedFseq=0; while (rq.getJobsSummary().jobs) { - auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>()); + auto candidateJobs = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 50, std::set<std::string>(), + std::set<std::string>()); std::set<std::string> jobsToSkip; std::list<std::string> jobsToDelete; for (auto &j: candidateJobs.candidates) { @@ -256,7 +258,7 @@ TEST(ObjectStore, RetrieveQueueActivityCounts) { jobsToDelete.emplace_back(j.address); nextExpectedFseq++; } - auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip); + auto candidateJobs2 = rq.getCandidateList(std::numeric_limits<uint64_t>::max(), 1, jobsToSkip, std::set<std::string>()); if (candidateJobs2.candidateFiles) { std::stringstream address; address << "someRequest-" << nextExpectedFseq; diff --git a/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp index 90dcb21623eb00dfc456bde5dc00039d95cfa43d..6186f62eb8f4a8b3367ad5fc783a07b2f7daf890 100644 --- a/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp +++ b/objectstore/RetrieveQueueToTransferForUserAlgorithms.cpp @@ -41,12 +41,12 @@ addToLog(log::ScopedParamContainer ¶ms) const { template<> auto ContainerTraits<RetrieveQueue,RetrieveQueueToTransferForUser>:: -getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip, +getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elementsToSkip, log::LogContext &lc) -> PoppedElementsBatch { PoppedElementsBatch ret; - auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elemtsToSkip); + auto candidateJobsFromQueue = cont.getCandidateList(unfulfilledCriteria.bytes, unfulfilledCriteria.files, elementsToSkip, unfulfilledCriteria.diskSystemsToSkip); for(auto &cjfq : candidateJobsFromQueue.candidates) { ret.elements.emplace_back(PoppedElement{ cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore), diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 18828a34014cd997a8d6ac7112a7537346476f66..f4e54e823871034a6583a4b6c28ffe2d3e7e02c4 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -34,6 +34,7 @@ enum ObjectType { RepackRequest_t = 11; RepackIndex_t = 12; RepackQueue_t = 13; + DiskSystemSpaceRegister_t = 14; GenericObject_t = 1000; } @@ -206,6 +207,12 @@ message ArchiveFile { // ------------- Drives handling ---------------------------------------------- +message DiskSpaceReservation { + // Each drive keeps tabs of its intended + required string disk_system_name = 5100; + required uint64 reserved_bytes = 5110; +} + message DriveState { required string drivename = 5000; required string host = 5001; @@ -241,6 +248,7 @@ message DriveState { optional uint64 next_priority = 5031; optional string next_activity = 5032; optional double next_activity_weight = 5033; + repeated DiskSpaceReservation disk_space_reservations = 5034; // TODO: implement or remove required EntryLog creationlog = 5023; } @@ -592,20 +600,3 @@ message RepackRequestQueuePointer { message RepackQueue { repeated RepackRequestQueuePointer repackrequestpointers = 12200; } - -message DiskSystemSpaceReservation { - required string holder = 12500; - required uint64 size = 12501; -} - -message DiskSystemSpace { - required string name = 12300; - required uint64 free_space = 12301; - required uint64 last_measurement_time = 12302; - required uint64 targeted_free_space = 12303; - repeated DiskSystemSpaceReservation reservations =12304; -} - -message DiskSystemSpaceRegistry { - repeated DiskSystemSpace disk_systems = 12400; -} diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index b0b7925d4a6f97385b64e475d96cfa73a99b390e..aabbd918f1190c9041b5e0aa240e9b2dbf62d508 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2365,18 +2365,18 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> } } // We now have created the subrequests. Time to enqueue. + // TODO: the lock/fetch could be parallelized { objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue); - std::list<std::unique_ptr<objectstore::ScopedExclusiveLock>> locks; + std::list<objectstore::ScopedExclusiveLock> locks; for (auto &is: asyncInsertedSubrequestInfoList) { - locks.push_back(cta::make_unique<objectstore::ScopedExclusiveLock>(*is.request)); + locks.emplace_back(*is.request); is.request->fetch(); sorter.insertRetrieveRequest(is.request, *m_oStoreDB.m_agentReference, is.activeCopyNb, lc); } locks.clear(); sorter.flushAll(lc); } - } //------------------------------------------------------------------------------ @@ -2402,6 +2402,9 @@ void OStoreDB::RepackRequest::fail() { m_repackRequest.commit(); } +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::requeueInToExpandQueue() +//------------------------------------------------------------------------------ void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){ ScopedExclusiveLock rrl(m_repackRequest); m_repackRequest.fetch(); @@ -2418,6 +2421,9 @@ void OStoreDB::RepackRequest::requeueInToExpandQueue(log::LogContext& lc){ rqteAlgo.referenceAndSwitchOwnership(nullopt, previousOwner, insertedElements, lc); } +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::setExpandStartedAndChangeStatus() +//------------------------------------------------------------------------------ void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){ ScopedExclusiveLock rrl(m_repackRequest); m_repackRequest.fetch(); @@ -2426,6 +2432,9 @@ void OStoreDB::RepackRequest::setExpandStartedAndChangeStatus(){ m_repackRequest.commit(); } +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile() +//------------------------------------------------------------------------------ void OStoreDB::RepackRequest::fillLastExpandedFSeqAndTotalStatsFile(uint64_t& fSeq, TotalStatsFiles& totalStatsFiles) { ScopedExclusiveLock rrl(m_repackRequest); m_repackRequest.fetch(); @@ -3443,12 +3452,30 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( // OStoreDB::RetrieveMount::getNextJobBatch() //------------------------------------------------------------------------------ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) { + uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) { + // Pop a batch of files to retrieve and, for the ones having a documented disk system name, reserve the space + // that they will require. In case we cannot allocate the space for some of them, mark the destination filesystem as + // full and stop popping from it, after requeueing the jobs. typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransferForUser> RQAlgos; RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested); + popCriteria.diskSystemsToSkip = m_diskSystemsToSkip; auto jobs = rqAlgos.popNextBatch(mountInfo.vid, popCriteria, logContext); - // We can construct the return value + // Try and allocate data for the popped jobs. + // Compute the necessary space in each targeted disk system. + SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest; + std::map<std::string, uint64_t> spaceMap; + for (auto &j: jobs.elements) + if (j.diskSystemName) + diskSpaceReservationRequest.addRequest(j.diskSystemName.value(), j.archiveFile.fileSize); + // Get the existing reservation map from the other drives. + auto otherDrivesReservations = getExistingDrivesReservations(); + // Get the free space from disk systems involved. + + // If any file system does not have enough space, make it as full, requeue all (slight but rare inefficiency) + // and retry the pop. + + // Else, we can construct the return value (we did not hit any full disk system. std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret; for(auto &j : jobs.elements) { @@ -3466,6 +3493,71 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMou return ret; } +//------------------------------------------------------------------------------ +// OStoreDB::RetrieveMount::requeueJobBatch() +//------------------------------------------------------------------------------ +void OStoreDB::RetrieveMount::requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch, + log::LogContext& logContext) { + objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue); + std::list<std::shared_ptr<objectstore::RetrieveRequest>> rrlist; + std::list<objectstore::ScopedExclusiveLock> locks; + for (auto & j: jobBatch) { + cta::OStoreDB::RetrieveJob *rj = cta::OStoreDB::castFromSchedDBJob(j.get()); + auto rr = std::make_shared<objectstore::RetrieveRequest>(rj->m_retrieveRequest.getAddressIfSet(), m_oStoreDB.m_objectStore); + rrlist.push_back(rr); + locks.emplace_back(*rr); + rr->fetch(); + sorter.insertRetrieveRequest(rr, *m_oStoreDB.m_agentReference, rj->selectedCopyNb, logContext); + } + locks.clear(); + rrlist.clear(); + sorter.flushAll(logContext); +} + +//------------------------------------------------------------------------------ +// OStoreDB::RetrieveMount::getExistingDrivesReservations() +//------------------------------------------------------------------------------ +std::map<std::string, uint64_t> OStoreDB::RetrieveMount::getExistingDrivesReservations() { + objectstore::RootEntry re(m_oStoreDB.m_objectStore); + re.fetchNoLock(); + objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_oStoreDB.m_objectStore); + dr.fetchNoLock(); + auto driveAddresses = dr.getDriveAddresses(); + std::list <objectstore::DriveState> dsList; + std::list <std::unique_ptr<objectstore::DriveState::AsyncLockfreeFetcher>> dsFetchers; + for (auto &d: driveAddresses) { + dsList.emplace_back(d.driveStateAddress, m_oStoreDB.m_objectStore); + dsFetchers.emplace_back(dsList.back().asyncLockfreeFetch()); + } + auto dsf = dsFetchers.begin(); + std::map<std::string, uint64_t> ret; + for (auto &d: dsList) { + try { + (*dsf)->wait(); + dsf++; + for (auto &dsr: d.getDiskSpaceReservations()) { + try { + ret.at(dsr.first) += dsr.second; + } catch (std::out_of_range &) { + ret[dsr.first] = dsr.second; + } + } + } catch (objectstore::Backend::NoSuchObject) { + // If the drive status is not there, we just skip it. + dsf++; + } + } + return ret; +} + +//------------------------------------------------------------------------------ +// OStoreDB::RetrieveMount::reserveDiskSpace() +//------------------------------------------------------------------------------ +void OStoreDB::RetrieveMount::reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) { + // Try and add our reservation to the disk + throw exception::Exception("In OStoreDB::RetrieveMount::reserveDiskSpace(): not implemented."); +} + //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::complete() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index d11b6c5277f72aa257f3bd3563e76ca2d41e4cb2..f2a6d0135c340cbf7559e19b5a33ce2c90fd6204 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -218,10 +218,14 @@ public: public: const MountInfo & getMountInfo() override; std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, - const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) override; - void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override; - void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) override; - void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override; + cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) override; + private: + void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch, + log::LogContext& logContext); + std::map<std::string, uint64_t> getExistingDrivesReservations(); + void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation); + std::set<std::string> m_diskSystemsToSkip; + public: void complete(time_t completionTime) override; void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index d60545da2b8837ee88b1d6023584adefb3fad1ef..30fc80c2d3d92f91c36f88f9696311648141c525 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -141,35 +141,40 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc // Get the current file systems list from the catalogue disk::DiskSystemList diskSystemList; if (m_catalogue) diskSystemList = m_catalogue->getAllDiskSystems(); - // Try and get a new job from the DB - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch; - { - retryBatchAllocation: - dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, m_fullDiskSystems, logContext); - // Compute the necessary space in each targeted disk system. - SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest; - std::map<std::string, uint64_t> spaceMap; - for (auto &j: dbJobBatch) - if (j->diskSystemName) - diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize); - // Reserve the space. - // We will update this information on-demand during iterations if needed. - disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList); - retrySpaceAllocation: - try { - m_dbMount->reserveDiskSpace(diskSpaceReservationRequest); - } catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) { - // Update information for missing/outdated disk systems. - diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems()); - goto retrySpaceAllocation; - } catch (SchedulerDatabase::FullDiskSystem &fds) { - // Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt. - for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds); - m_dbMount->requeueJobBatch(dbJobBatch); - dbJobBatch.clear(); - goto retryBatchAllocation; - } - } + // TODO: the diskSystemFreeSpaceList could be made a member of the retrieve mount and cache the fetched values, limiting the re-querying + // of the disk systems free space. + disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList (diskSystemList); + // Try and get a new job from the DB. The DB mount (in memory object) is taking care of reserving the free space for the popped + // elements and query the disk systems, via the diskSystemFreeSpaceList object. + auto dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext); +// std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch; +// { +// retryBatchAllocation: +// dbJobBatch = m_dbMount->getNextJobBatch(filesRequested, bytesRequested, diskSystemFreeSpaceList, logContext); +// // Compute the necessary space in each targeted disk system. +// SchedulerDatabase::DiskSpaceReservationRequest diskSpaceReservationRequest; +// std::map<std::string, uint64_t> spaceMap; +// for (auto &j: dbJobBatch) +// if (j->diskSystemName) +// diskSpaceReservationRequest.addRequest(j->diskSystemName.value(), j->archiveFile.fileSize); +// // Reserve the space. +// // We will update this information on-demand during iterations if needed. +// disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList); +// retrySpaceAllocation: +// try { +// m_dbMount->reserveDiskSpace(diskSpaceReservationRequest); +// } catch (SchedulerDatabase::OutdatedDiskSystemInformation &odsi) { +// // Update information for missing/outdated disk systems. +// diskSystemFreeSpaceList.fetchFileSystemFreeSpace(odsi.getDiskSsytems()); +// goto retrySpaceAllocation; +// } catch (SchedulerDatabase::FullDiskSystem &fds) { +// // Mark the disk systems as full for the mount. Re-queue all requests, repeat the pop attempt. +// for (auto &ds: fds.getDiskSsytems()) m_fullDiskSystems.insert(ds); +// m_dbMount->requeueJobBatch(dbJobBatch, logContext); +// dbJobBatch.clear(); +// goto retryBatchAllocation; +// } +// } std::list<std::unique_ptr<RetrieveJob>> ret; // We prepare the response for (auto & sdrj: dbJobBatch) { diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 4eebfd616b6338bb57bf2967e437f180d9d8e6f1..ae09ddb2df68ed616cc93ae51cf9fcbf1334fcd3 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -349,27 +349,7 @@ public: std::map<std::string, uint64_t> m_spaceMap; }; -private: - class ProblemDiskSystemList: public exception::Exception { - using cta::exception::Exception::Exception; - public: - const std::set<std::string> &getDiskSsytems() { return m_outdatedDiskSystems; } - void addDiskSystem(const std::string &diskSystenName) { m_outdatedDiskSystems.insert(diskSystenName); } - private: - std::set<std::string> m_outdatedDiskSystems; - }; - public: - /** An exception allowing the reservation function to de called again with up to date free space information from the - * disk systems.*/ - class OutdatedDiskSystemInformation: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; }; - - /** An exception allowing the reservation system to report disk systems for which the free space could not be reserved */ - class FullDiskSystem: public ProblemDiskSystemList { using ProblemDiskSystemList::ProblemDiskSystemList; }; - - /** Clear all reservation for an agent. Used at agent cleanup and garbage collection time, so not in retrieve mount context. */ - void clearDiskReservation(const std::string); - class RetrieveMount { public: struct MountInfo { @@ -387,17 +367,8 @@ public: } mountInfo; virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, log::LogContext& logContext) = 0; - virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch) = 0; - - /* Mount-level disk reservation functions. */ - /** Attempt to reserve, can throw OutdatedDiskSystemInformation or FullDiskSystem. Does NOT proceed with any reservation - * in case of throw. */ - virtual void reserveDiskSpace(const DiskSpaceReservationRequest& diskSpaceReservation) = 0; - - /** Release some space for an agent and destination. */ - virtual void releaseDiskSpace(const std::string &reservingAgent, const std::string &diskSystemName, uint64_t size) = 0; - + uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, log::LogContext& logContext) = 0; +// virtual void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> & jobBatch, log::LogContext& logContext) = 0; virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp index 126dd5096f2d1813a10978646fbea49fe701ccf5..247334a97afd3000d219cbcb9b8e6a44fad8b55f 100644 --- a/scheduler/SchedulerDatabaseTest.cpp +++ b/scheduler/SchedulerDatabaseTest.cpp @@ -315,11 +315,11 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) { // Create the disk system list cta::disk::DiskSystemList diskSystemList; + cta::disk::DiskSystemFreeSpaceList diskSystemFreeSpaceList(diskSystemList); diskSystemList.push_back(cta::disk::DiskSystem{"ds-A", "$root://a.disk.system/", "query:todo", 60, 10UL*1000*1000*1000, common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"}); diskSystemList.push_back(cta::disk::DiskSystem{"ds-B", "$root://b.disk.system/", "query:todo", 60, 10UL*1000*1000*1000, common::dataStructures::EntryLog(), common::dataStructures::EntryLog{},"No comment"}); - std::set<std::string> fullDiskSystems; // Inject 10 retrieve jobs to the db. const size_t filesToDo = 10; @@ -369,7 +369,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) { auto moutInfo = db.getMountInfo(lc); ASSERT_EQ(1, moutInfo->potentialMounts.size()); auto rm=moutInfo->createRetrieveMount("vid", "tapePool", "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr), cta::nullopt); - auto rjb = rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc); + auto rjb = rm->getNextJobBatch(20,20*1000,diskSystemFreeSpaceList, lc); ASSERT_EQ(filesToDo, rjb.size()); std::list <cta::SchedulerDatabase::RetrieveJob*> jobBatch; for (auto &rj: rjb) { @@ -380,7 +380,7 @@ TEST_P(SchedulerDatabaseTest, popRetrieveRequestsWithDisksytem) { } rm->flushAsyncSuccessReports(jobBatch, lc); rjb.clear(); - ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,fullDiskSystems, lc).size()); + ASSERT_EQ(0, rm->getNextJobBatch(20,20*1000,diskSystemFreeSpaceList, lc).size()); rm->complete(time(nullptr)); rm.reset(nullptr); moutInfo.reset(nullptr); diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index cc75055c248dc9073dbccd05a148e0dccb96e1f6..6ef2ede0c1585cf018fc927f4a392bdf56f1005d 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -43,10 +43,7 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, - const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");} - void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");} - void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");} + cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index 78237ffaf083b20f755d0f23674a7584bfff42fa..b3984a5f6aa9db4e09e3f5ed3d11aa4217d50214 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -36,10 +36,7 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } - std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");} - void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");} - void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");} + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index fab688adffd4510ac88fdffff59037a15417b7af..0ee82c988bd61d0cec5a83a4203ba221721804ce 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -132,10 +132,7 @@ namespace unitTests class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, - const std::set<std::string> &fullDiskSystems, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - void requeueJobBatch(std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> >& jobBatch) override { throw std::runtime_error("Not implemented");} - void reserveDiskSpace(const cta::SchedulerDatabase::DiskSpaceReservationRequest& diskSpaceReservation) override { throw std::runtime_error("Not implemented");} - void releaseDiskSpace(const std::string& reservingAgent, const std::string& diskSystemName, uint64_t size) override { throw std::runtime_error("Not implemented");} + cta::disk::DiskSystemFreeSpaceList & diskSystemFreeSpace, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }