From c4a7fe4cd8b9373c707ec23e52d4f377aa7cc8bb Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Fri, 28 Jun 2019 14:28:07 +0200 Subject: [PATCH] Added reporting logs for reporting threads Modified the size of the batch retrieved for reporting --- scheduler/OStoreDB/OStoreDB.hpp | 2 +- scheduler/RepackReportThread.cpp | 23 ++++++++++++++++++++--- scheduler/RepackReportThread.hpp | 9 ++++++--- scheduler/RepackRequestManager.cpp | 5 ----- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index b428686560..b5468b38ad 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -512,7 +512,7 @@ public: std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextFailedArchiveRepackReportBatch(log::LogContext &lc); CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound); private: - const size_t c_repackReportBatchSize = 4000; + const size_t c_repackReportBatchSize = 500; public: /* === Drive state handling ============================================== */ diff --git a/scheduler/RepackReportThread.cpp b/scheduler/RepackReportThread.cpp index c128992eca..3e694bc884 100644 --- a/scheduler/RepackReportThread.cpp +++ b/scheduler/RepackReportThread.cpp @@ -23,16 +23,33 @@ namespace cta { } void RepackReportThread::run() { - m_runTimer.reset(); + utils::Timer totalTime; bool moreBatch = true; - while(m_runTimer.secs() < 30.0 && moreBatch){ + log::ScopedParamContainer params(m_lc); + params.add("reportingType",getReportingType()); + uint64_t numberOfBatchReported = 0; + while(totalTime.secs() < c_maxTimeToReport && moreBatch){ + utils::Timer t; + log::TimingList tl; cta::Scheduler::RepackReportBatch reportBatch = getNextRepackReportBatch(m_lc); - if(!reportBatch.empty()){ + tl.insertAndReset("getNextRepackReportBatchTime",t); + if(!reportBatch.empty()) { reportBatch.report(m_lc); + numberOfBatchReported++; + tl.insertAndReset("reportingTime",t); + log::ScopedParamContainer paramsReport(m_lc); + tl.addToLog(paramsReport); + m_lc.log(log::INFO,"In RepackReportThread::run(), reported a batch of reports."); } else { moreBatch = false; } } + if(numberOfBatchReported > 0){ + params.add("numberOfBatchReported",numberOfBatchReported); + params.add("totalRunTime",totalTime.secs()); + params.add("moreBatchToDo",moreBatch); + m_lc.log(log::INFO,"In RepackReportThread::run(), exiting."); + } } cta::Scheduler::RepackReportBatch RetrieveSuccessesRepackReportThread::getNextRepackReportBatch(log::LogContext &lc){ diff --git a/scheduler/RepackReportThread.hpp b/scheduler/RepackReportThread.hpp index 3b10270b37..670e2f76a6 100644 --- a/scheduler/RepackReportThread.hpp +++ b/scheduler/RepackReportThread.hpp @@ -27,13 +27,12 @@ public: RepackReportThread(Scheduler& scheduler, log::LogContext &lc):m_scheduler(scheduler),m_lc(lc){} virtual ~RepackReportThread(); void run(); - cta::utils::Timer m_runTimer; - double m_timeToReport; - bool m_batchProcessed; protected: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc) = 0; + virtual std::string getReportingType() = 0; Scheduler& m_scheduler; log::LogContext& m_lc; + const double c_maxTimeToReport = 30.0; }; class RetrieveSuccessesRepackReportThread: public RepackReportThread{ @@ -41,6 +40,7 @@ public: RetrieveSuccessesRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "RetrieveSuccesses"; } }; class ArchiveSuccessesRepackReportThread: public RepackReportThread{ @@ -48,6 +48,7 @@ public: ArchiveSuccessesRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "ArchiveSuccesses"; } }; class RetrieveFailedRepackReportThread: public RepackReportThread{ @@ -55,6 +56,7 @@ public: RetrieveFailedRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "RetrieveFailed"; } }; class ArchiveFailedRepackReportThread: public RepackReportThread{ @@ -62,6 +64,7 @@ public: ArchiveFailedRepackReportThread(Scheduler& scheduler,log::LogContext& lc):RepackReportThread(scheduler,lc) {} private: virtual cta::Scheduler::RepackReportBatch getNextRepackReportBatch(log::LogContext &lc); + virtual std::string getReportingType(){ return "ArchiveFailed"; } }; } \ No newline at end of file diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp index 1baa69a13a..62a615b5f2 100644 --- a/scheduler/RepackRequestManager.cpp +++ b/scheduler/RepackRequestManager.cpp @@ -29,7 +29,6 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { utils::Timer t; log::TimingList timingList; // First expand any request to expand - // TODO: implement expansion // Next promote requests to ToExpand if needed //Putting pending repack request into the RepackQueueToExpand queue @@ -59,10 +58,6 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { rfrrt.run(); ArchiveFailedRepackReportThread afrrt(m_scheduler,lc); afrrt.run(); - // Do all round of repack subrequest reporting (heavy lifting is done internally). - /*for(auto& reportBatch: m_scheduler.getRepackReportBatches(lc)){ - reportBatch.report(lc); - }*/ } } -- GitLab