diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index c246e1d4d3557c8f2f7a10c9fc6f8ae93871440e..bfd518e9b01ab243189fb55259a0f7e9174b2d47 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -3155,4 +3155,10 @@ void OStoreDB::RetrieveJob::checkReportSucceedForRepack(){ m_jobSucceedForRepackReporter->wait(); } +std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> OStoreDB::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) +{ + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> ret; + //TODO : Use Algorithms to retrieve the jobs from the RetrieveQueueToReportToRepackForSuccess + return ret; +} } // namespace cta diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index e1cf471b35d18123c98e55276853a165655b48f9..5a6b2160e158bfd64238ffbf1572ecae0506185e 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -309,6 +309,8 @@ public: std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) override; + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override; + /* === Repack requests handling =========================================== */ void queueRepack(const std::string& vid, const std::string& bufferURL, common::dataStructures::RepackInfo::Type repackType, log::LogContext &logContext) override; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index 2bad25ac4473d6f81518c9d908e67e88608d13c1..b4434486c5076c7638f85c6f79ff8c48a5040746 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -146,6 +146,10 @@ public: return m_OStoreDB.getNextRetrieveJobsFailedBatch(filesRequested, lc); } + std::list<std::unique_ptr<RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) override { + return m_OStoreDB.getNextSucceededRetrieveRequestForRepackBatch(filesRequested,lc); + } + void setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext& lc) override { m_OStoreDB.setJobBatchReported(jobsBatch, timingList, t, lc); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index a0fedf915f9112f695b634bbd8bdfa1bff365635..f05b67d218faf96a6fdce507fde4139a8185f4bc 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -237,11 +237,6 @@ void Scheduler::queueRetrieve( lc.log(log::INFO, "Queued retrieve request"); } -void Scheduler::queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request, - std::list<uint64_t> copyNbs, log::LogContext &lc) -{ - -} //------------------------------------------------------------------------------ // deleteArchive @@ -325,6 +320,11 @@ void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliI lc.log(log::INFO, "In Scheduler::queueRepack(): success."); } +std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> Scheduler::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) +{ + return m_db.getNextSucceededRetrieveRequestForRepackBatch(filesRequested,lc); +} + //------------------------------------------------------------------------------ // cancelRepack //------------------------------------------------------------------------------ diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 0d99c5638ce7e7f8850b1f034bed564a87c09761..e4f1bfc5887cc8e84c07e22bb1de713888d2815b 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -48,6 +48,7 @@ #include "scheduler/TapeMount.hpp" #include "scheduler/SchedulerDatabase.hpp" #include "scheduler/RepackRequest.hpp" +#include "objectstore/RetrieveRequest.hpp" #include "eos/DiskReporter.hpp" #include "eos/DiskReporterFactory.hpp" @@ -142,17 +143,6 @@ public: void queueRetrieve(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request, log::LogContext &lc); - - /** - * Queue a retrieve request specific for Repack - * @param instanceName - * @param request - * @param copyNbs - * @param lc - */ - void queueRetrieveRequestForRepack(const std::string &instanceName, const cta::common::dataStructures::RetrieveRequest &request, - std::list<uint64_t> copyNbs, log::LogContext &lc); - /** * Delete an archived file or a file which is in the process of being archived. * Throws a UserError exception in case of wrong request parameters (ex. unknown file id) @@ -208,6 +198,14 @@ public: std::list<cta::common::dataStructures::RepackInfo> getRepacks(); cta::common::dataStructures::RepackInfo getRepack(const std::string &vid); + /** + * Return the list of all RetrieveRequests that are in the RetrieveQueueToReportToRepackForSuccess + * @param nbRequests : The number of request we would like to return + * @param lc + * @return The list of all RetrieveRequests that are queued in the RetrieveQueueToReportToRepackForSuccess + */ + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t nbRequests, log::LogContext& lc); + void shrink(const cta::common::dataStructures::SecurityIdentity &cliIdentity, const std::string &tapepool); // removes extra tape copies from a specific pool(usually an "_2" pool) diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 027d3fa7368885423c2e2e12756c56fda36edb9d..d2f907750f5a75df40ca3ca374063fd656e8d2a4 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -433,6 +433,7 @@ public: */ virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext) = 0; virtual std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) = 0; + virtual std::list<std::unique_ptr<RetrieveJob>> getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& logContext) = 0; /*============ Label management: user side =================================*/ // TODO diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 2364e9327331c0294ebc0f455dfc38eaf11cb3f1..fc7d80d3b60327ae7ba077b2c630dafd481bda96 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1331,6 +1331,9 @@ TEST_P(SchedulerTest, expandRepackRequest) { } } } + scheduler.waitSchedulerDbSubthreadsComplete(); + scheduler.getNextSucceededRetrieveRequestForRepackBatch(5,lc); + //scheduler.transformRetrieveRequestsToArchiveForRepack(lc); } #undef TEST_MOCK_DB