From 8641312c43c008c40c03557c1bdcd85b84bcdb1f Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Fri, 11 Jan 2019 17:31:00 +0100
Subject: [PATCH] [os-failedrequests] Implements failedrequests ls --summary

---
 scheduler/OStoreDB/OStoreDB.cpp           | 36 +++++++++++++++
 scheduler/OStoreDB/OStoreDB.hpp           |  6 ++-
 scheduler/OStoreDB/OStoreDBFactory.hpp    |  8 ++++
 scheduler/Scheduler.cpp                   | 14 ++++++
 scheduler/Scheduler.hpp                   | 18 +++++---
 scheduler/SchedulerDatabase.hpp           | 13 +++++-
 xroot_plugins/XrdCtaFailedRequestLs.hpp   | 54 +++++++++++------------
 xroot_plugins/XrdSsiCtaRequestMessage.cpp |  2 +-
 8 files changed, 113 insertions(+), 38 deletions(-)

diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index 0361133acb..b2fb41a718 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -693,6 +693,24 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch
   }
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::getArchiveJobsFailedSummary
+//------------------------------------------------------------------------------
+SchedulerDatabase::JobsFailedSummary OStoreDB::getArchiveJobsFailedSummary(log::LogContext &logContext) {
+  RootEntry re(m_objectStore);
+  re.fetchNoLock();
+
+  SchedulerDatabase::JobsFailedSummary ret;
+  auto queueList = re.dumpArchiveQueues(QueueType::FailedJobs);
+  for(auto &aj : queueList) {
+    ArchiveQueue aq(aj.address, m_objectStore);
+    auto summary = aq.getCandidateSummary();
+    ret.totalFiles += summary.candidateFiles;
+    ret.totalBytes += summary.candidateBytes;
+  }
+  return ret;
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::setArchiveJobBatchReported()
 //------------------------------------------------------------------------------
@@ -1115,6 +1133,24 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo
   }
 }
 
+//------------------------------------------------------------------------------
+// OStoreDB::getRetrieveJobsFailedSummary
+//------------------------------------------------------------------------------
+SchedulerDatabase::JobsFailedSummary OStoreDB::getRetrieveJobsFailedSummary(log::LogContext &logContext) {
+  RootEntry re(m_objectStore);
+  re.fetchNoLock();
+
+  SchedulerDatabase::JobsFailedSummary ret;
+  auto queueList = re.dumpRetrieveQueues(QueueType::FailedJobs);
+  for(auto &rj : queueList) {
+    RetrieveQueue rq(rj.address, m_objectStore);
+    auto summary = rq.getCandidateSummary();
+    ret.totalFiles += summary.candidateFiles;
+    ret.totalBytes += summary.candidateBytes;
+  }
+  return ret;
+}
+
 //------------------------------------------------------------------------------
 // OStoreDB::getNextRetrieveJobsFailedBatch()
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 9909faa019..0e48c8fe6c 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -255,6 +255,8 @@ public:
 
   std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested, 
      log::LogContext & logContext) override;
+
+  JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) override;
   
   void setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*> & jobsBatch,
      log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override;
@@ -289,7 +291,9 @@ public:
      log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override;
 
   std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) override;
-  
+
+  JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &logContext) override;
+
   /* === Drive state handling  ============================================== */
   /**
    * Get states of all drives.
diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp
index 5e39496282..b6020bfa77 100644
--- a/scheduler/OStoreDB/OStoreDBFactory.hpp
+++ b/scheduler/OStoreDB/OStoreDBFactory.hpp
@@ -138,6 +138,10 @@ public:
     return m_OStoreDB.getNextArchiveJobsToReportBatch(filesRequested, lc);
   }
 
+  JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &lc) override {
+    return m_OStoreDB.getArchiveJobsFailedSummary(lc);
+  }
+
   std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &lc) override {
     return m_OStoreDB.getNextRetrieveJobsToReportBatch(filesRequested, lc);
   }
@@ -146,6 +150,10 @@ public:
     return m_OStoreDB.getNextRetrieveJobsFailedBatch(filesRequested, lc);
   }
 
+  JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &lc) override {
+    return m_OStoreDB.getRetrieveJobsFailedSummary(lc);
+  }
+
   void setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::TimingList & timingList,
       utils::Timer & t, log::LogContext& lc) override {
     m_OStoreDB.setArchiveJobBatchReported(jobsBatch, timingList, t, lc);
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 768aaac4c4..bff62821b9 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -1079,6 +1079,13 @@ std::list<std::unique_ptr<ArchiveJob> > Scheduler::getNextArchiveJobsToReportBat
   return ret;
 }
 
+//------------------------------------------------------------------------------
+// getArchiveJobsFailedSummary
+//------------------------------------------------------------------------------
+SchedulerDatabase::JobsFailedSummary Scheduler::getArchiveJobsFailedSummary(log::LogContext &logContext) {
+  return m_db.getArchiveJobsFailedSummary(logContext);
+}
+
 //------------------------------------------------------------------------------
 // getNextRetrieveJobsToReportBatch
 //------------------------------------------------------------------------------
@@ -1114,6 +1121,13 @@ getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logCont
   return ret;
 }
 
+//------------------------------------------------------------------------------
+// getRetrieveJobsFailedSummary
+//------------------------------------------------------------------------------
+SchedulerDatabase::JobsFailedSummary Scheduler::getRetrieveJobsFailedSummary(log::LogContext &logContext) {
+  return m_db.getRetrieveJobsFailedSummary(logContext);
+}
+
 //------------------------------------------------------------------------------
 // reportArchiveJobsBatch
 //------------------------------------------------------------------------------
diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp
index 7f00f61e1c..3ff9f9ba67 100644
--- a/scheduler/Scheduler.hpp
+++ b/scheduler/Scheduler.hpp
@@ -74,7 +74,7 @@ class RetrieveJob;
 class Scheduler {
   
 public:
-  
+
   /**
    * Constructor.
    */
@@ -224,7 +224,6 @@ public:
     const std::string &driveName, const std::string &vid, const uint64_t numberOfFiles, const uint64_t fileSize, 
     const cta::common::dataStructures::TestSourceType testSourceType) const;
 
-
   std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > getPendingArchiveJobs(log::LogContext &lc) const;
   std::list<cta::common::dataStructures::ArchiveJob> getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext &lc) const;
   std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > getPendingRetrieveJobs(log::LogContext &lc) const;
@@ -319,7 +318,7 @@ public:
    */
   std::list<common::dataStructures::QueueAndMountSummary> getQueuesAndMountSummaries(log::LogContext & lc);
   
-  /*============== Archive reporting support =================================*/
+  /*======================== Archive reporting support =======================*/
   /**
    * Batch job factory
    * 
@@ -335,7 +334,10 @@ public:
   void reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob>> & archiveJobsBatch,
       eos::DiskReporterFactory & reporterFactory, log::TimingList&, utils::Timer &, log::LogContext &);
 
-  /* ============================== Retrieve reporting support ============================== */
+  /*======================= Failed archive jobs support ======================*/
+  SchedulerDatabase::JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &lc);
+
+  /*======================= Retrieve reporting support =======================*/
   /*!
    * Batch job factory
    * 
@@ -364,9 +366,11 @@ public:
    */
   std::list<std::unique_ptr<RetrieveJob>>
   getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext);
-  
-public:    
-  /*============== Administrator management ==================================*/
+
+  /*====================== Failed retrieve jobs support ======================*/
+  SchedulerDatabase::JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &lc);
+
+  /*======================== Administrator management ========================*/
   void authorizeAdmin(const cta::common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc);
 
 private:
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 0680271482..7187b0905b 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -192,7 +192,16 @@ public:
    */
   virtual std::list<std::unique_ptr<ArchiveJob>> getNextArchiveJobsToReportBatch(uint64_t filesRequested,
     log::LogContext & logContext) = 0;
-  
+
+  /*======================= Failed archive jobs support ======================*/
+  struct JobsFailedSummary {
+     JobsFailedSummary(uint64_t f = 0, uint64_t b = 0) : totalFiles(f), totalBytes(b) {}
+     uint64_t totalFiles;
+     uint64_t totalBytes;
+  };
+
+  virtual JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) = 0;
+
   /**
    * Set a batch of jobs as reported (modeled on ArchiveMount::setJobBatchSuccessful().
    * @param jobsBatch
@@ -379,6 +388,8 @@ public:
   
   virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) = 0;
 
+  virtual JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &logContext) = 0;
+
   /*============ Label management: user side =================================*/
   // TODO
   
diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp
index f7ceca7291..01c919e5b5 100644
--- a/xroot_plugins/XrdCtaFailedRequestLs.hpp
+++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp
@@ -19,10 +19,8 @@
 #pragma once
 
 #include <XrdSsiPbOStreamBuffer.hpp>
-#include <objectstore/ArchiveQueue.hpp>
-#include <objectstore/RetrieveQueue.hpp>
 #include <scheduler/Scheduler.hpp>
-#include <scheduler/RetrieveJob.hpp>
+//#include <scheduler/RetrieveJob.hpp>
 
 
 
@@ -35,13 +33,14 @@ class FailedRequestLsStream : public XrdSsiStream
 {
 public:
    FailedRequestLsStream(Scheduler &scheduler, bool is_archive, bool is_retrieve,
-      bool is_log_entries, bool is_summary) :
+      bool is_log_entries, bool is_summary, cta::log::LogContext &lc) :
       XrdSsiStream(XrdSsiStream::isActive),
       m_scheduler(scheduler),
       m_isArchive(is_archive),
       m_isRetrieve(is_retrieve),
       m_isLogEntries(is_log_entries),
-      m_isSummary(is_summary)
+      m_isSummary(is_summary),
+      m_lc(lc)
    {
       XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor");
    }
@@ -88,6 +87,7 @@ public:
             GetBuffSummary(streambuf);
             dlen = streambuf->Size();
             last = true;
+            XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
             return streambuf;
          }
 #if 0
@@ -141,7 +141,10 @@ public:
          dlen = streambuf->Size();
          XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
       } catch(cta::exception::Exception &ex) {
-         throw std::runtime_error(ex.getMessage().str());
+         std::ostringstream errMsg;
+         errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
+         eInfo.Set(errMsg.str().c_str(), ECANCELED);
+         delete streambuf;
       } catch(std::exception &ex) {
          std::ostringstream errMsg;
          errMsg << __FUNCTION__ << " failed: " << ex.what();
@@ -156,38 +159,31 @@ public:
       return streambuf;
    }
 
-#if 0
-   // failed archive jobs
-   auto archive_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
-   responseTable.push_back({ "archive", std::to_string(archive_summary.candidateFiles), std::to_string(archive_summary.candidateBytes) });
-
-   // failed retrieve jobs
-   auto retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
-   responseTable.push_back({ "retrieve", std::to_string(retrieve_summary.candidateFiles), std::to_string(retrieve_summary.candidateBytes) });
-#endif
    void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
-
-      cta::objectstore::ArchiveQueue::CandidateJobList archive_summary;
-      cta::objectstore::RetrieveQueue::CandidateJobList retrieve_summary;
-
-      Data record;
+      SchedulerDatabase::JobsFailedSummary archive_summary;
+      SchedulerDatabase::JobsFailedSummary retrieve_summary;
 
       if(m_isArchive) {
+         Data record;
+         archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
          record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::ARCHIVE_REQUEST);
-         record.mutable_frls_summary()->set_total_files(archive_summary.candidateFiles);
-         record.mutable_frls_summary()->set_total_size(archive_summary.candidateBytes);
+         record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles);
+         record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
          streambuf->Push(record);
       }
       if(m_isRetrieve) {
+         Data record;
+         retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
          record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::RETRIEVE_REQUEST);
-         record.mutable_frls_summary()->set_total_files(retrieve_summary.candidateFiles);
-         record.mutable_frls_summary()->set_total_size(retrieve_summary.candidateBytes);
+         record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles);
+         record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
          streambuf->Push(record);
       }
       if(m_isArchive && m_isRetrieve) {
+         Data record;
          record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::TOTAL);
-         record.mutable_frls_summary()->set_total_files(archive_summary.candidateFiles + retrieve_summary.candidateFiles);
-         record.mutable_frls_summary()->set_total_size(archive_summary.candidateBytes + retrieve_summary.candidateBytes);
+         record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
+         record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes);
          streambuf->Push(record);
       }
 
@@ -195,14 +191,16 @@ public:
    }
 
 private:
-   Scheduler &m_scheduler;                 //!< Reference to CTA Scheduler
+   cta::Scheduler &m_scheduler;            //!< Reference to CTA Scheduler
 
    bool m_isArchive;                       //!< List failed archive requests
    bool m_isRetrieve;                      //!< List failed retrieve requests
    bool m_isLogEntries;                    //!< Show failure log messages (verbose)
    bool m_isSummary;                       //!< Short summary of number of failures
 
-   static constexpr const char* const LOG_SUFFIX  = "FailedRequestLsStream";    //!< Identifier for log messages
+   cta::log::LogContext &m_lc;             //!< Reference to CTA Log Context
+
+   static constexpr const char* const LOG_SUFFIX  = "FailedRequestLsStream";    //!< Identifier for SSI log messages
 };
 
 }} // namespace cta::xrd
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index 61b6d8f802..1820f64973 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -1049,7 +1049,7 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm
 
    // Create a XrdSsi stream object to return the results
    stream = new FailedRequestLsStream(m_scheduler, is_archive, is_retrieve,
-      has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY));
+      has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc);
 
    // Should the client display column headers?
    if(has_flag(OptionBoolean::SHOW_HEADER)) {
-- 
GitLab