diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index b428686560dec489e699ce268a24d3ec287802a9..b5468b38ad2c0b06737be946718a2d13667085d3 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -512,7 +512,7 @@ public: std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextFailedArchiveRepackReportBatch(log::LogContext &lc); CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound); private: - const size_t c_repackReportBatchSize = 4000; + const size_t c_repackReportBatchSize = 500; public: /* === Drive state handling ============================================== */ diff --git a/scheduler/RepackReportThread.cpp b/scheduler/RepackReportThread.cpp index c128992eca09cd072c7a08a410deed263271e4f1..3e694bc8842283b2d22f4908cc776699748343cf 100644 --- a/scheduler/RepackReportThread.cpp +++ b/scheduler/RepackReportThread.cpp @@ -23,16 +23,33 @@ namespace cta { } void RepackReportThread::run() { - m_runTimer.reset(); + utils::Timer totalTime; bool moreBatch = true; - while(m_runTimer.secs() < 30.0 && moreBatch){ + log::ScopedParamContainer params(m_lc); + params.add("reportingType",getReportingType()); + uint64_t numberOfBatchReported = 0; + while(totalTime.secs() < c_maxTimeToReport && moreBatch){ + utils::Timer t; + log::TimingList tl; cta::Scheduler::RepackReportBatch reportBatch = getNextRepackReportBatch(m_lc); - if(!reportBatch.empty()){ + tl.insertAndReset("getNextRepackReportBatchTime",t); + if(!reportBatch.empty()) { reportBatch.report(m_lc); + numberOfBatchReported++; + tl.insertAndReset("reportingTime",t); + log::ScopedParamContainer paramsReport(m_lc); + tl.addToLog(paramsReport); + m_lc.log(log::INFO,"In RepackReportThread::run(), reported a batch of reports."); } else { moreBatch = false; } } + if(numberOfBatchReported > 0){ + params.add("numberOfBatchReported",numberOfBatchReported); + params.add("totalRunTime",totalTime.secs()); + params.add("moreBatchToDo",moreBatch); + m_lc.log(log::INFO,"In RepackReportThread::run(), exiting."); + } } cta::Scheduler::RepackReportBatch RetrieveSuccessesRepackReportThread::getNextRepackReportBatch(log::LogContext &lc){ diff --git a/scheduler/RepackReportThread.hpp b/scheduler/RepackReportThread.hpp index 3b10270b37b27761b8e19df2b3732187d59ea858..670e2f76a639698a3b7b43bf5be7fdc517258aeb 100644 --- a/scheduler/RepackReportThread.hpp +++ b/scheduler/RepackReportThread.hpp @@ -27,13 +27,12 @@ public: RepackReportThread(Scheduler& scheduler, log::LogContext &lc):m_scheduler(scheduler),m_lc(lc){} virtual ~RepackReportThread(); void run(); - cta::utils::Timer m_runTimer; - double m_timeToReport; - bool m_batchProcessed; protected: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc) = 0; + virtual std::string getReportingType() = 0; Scheduler& m_scheduler; log::LogContext& m_lc; + const double c_maxTimeToReport = 30.0; }; class RetrieveSuccessesRepackReportThread: public RepackReportThread{ @@ -41,6 +40,7 @@ public: RetrieveSuccessesRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "RetrieveSuccesses"; } }; class ArchiveSuccessesRepackReportThread: public RepackReportThread{ @@ -48,6 +48,7 @@ public: ArchiveSuccessesRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "ArchiveSuccesses"; } }; class RetrieveFailedRepackReportThread: public RepackReportThread{ @@ -55,6 +56,7 @@ public: RetrieveFailedRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "RetrieveFailed"; } }; class ArchiveFailedRepackReportThread: public RepackReportThread{ @@ -62,6 +64,7 @@ public: ArchiveFailedRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "ArchiveFailed"; } }; } \ No newline at end of file diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp index 1baa69a13a4eb66f93d52ccfac6c509346798f4e..62a615b5f2c1189cda4f81845d0f18879da19a25 100644 --- a/scheduler/RepackRequestManager.cpp +++ b/scheduler/RepackRequestManager.cpp @@ -29,7 +29,6 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { utils::Timer t; log::TimingList timingList; // First expand any request to expand - // TODO: implement expansion // Next promote requests to ToExpand if needed //Putting pending repack request into the RepackQueueToExpand queue @@ -59,10 +58,6 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { rfrrt.run(); ArchiveFailedRepackReportThread afrrt(m_scheduler,lc); afrrt.run(); - // Do all round of repack subrequest reporting (heavy lifting is done internally). - /*for(auto& reportBatch: m_scheduler.getRepackReportBatches(lc)){ - reportBatch.report(lc); - }*/ } }