From 0905605a851b654768e817796c1cd324b0cece90 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Wed, 5 Jun 2019 17:00:37 +0200
Subject: [PATCH] Makes FailedRequestLs class a subclass of XrdCtaStream

---
 xroot_plugins/XrdCtaArchiveFileLs.hpp     |   4 +-
 xroot_plugins/XrdCtaFailedRequestLs.hpp   | 192 +++++++++-------------
 xroot_plugins/XrdSsiCtaRequestMessage.cpp |  53 ++----
 3 files changed, 100 insertions(+), 149 deletions(-)

diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp
index 67017598f8..dccc2cdd8d 100644
--- a/xroot_plugins/XrdCtaArchiveFileLs.hpp
+++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp
@@ -45,12 +45,12 @@ private:
    */
   virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf);
 
-  static constexpr const char* const LOG_SUFFIX  = "ArchiveFileLsStream";    //!< Identifier for log messages
-
   cta::catalogue::TapeFileSearchCriteria  m_searchCriteria;                  //!< Search criteria
   catalogue::ArchiveFileItor              m_archiveFileItor;                 //!< Iterator across files which have been archived
   bool                                    m_isSummary;                       //!< Full listing or short summary?
   bool                                    m_isSummaryDone;                   //!< Summary has been sent
+
+  static constexpr const char* const LOG_SUFFIX  = "ArchiveFileLsStream";    //!< Identifier for log messages
 };
 
 
diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp
index 3a36f26f3c..99a13ea910 100644
--- a/xroot_plugins/XrdCtaFailedRequestLs.hpp
+++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp
@@ -18,9 +18,9 @@
 
 #pragma once
 
-#include <XrdSsiPbOStreamBuffer.hpp>
-#include <scheduler/Scheduler.hpp>
-#include <scheduler/OStoreDB/OStoreDB.hpp>
+#include <xroot_plugins/XrdCtaStream.hpp>
+#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp>
+//#include <scheduler/OStoreDB/OStoreDB.hpp>
 
 
 
@@ -29,70 +29,42 @@ namespace cta { namespace xrd {
 /*!
  * Stream object which implements "failedrequest ls" command.
  */
-class FailedRequestLsStream : public XrdSsiStream
+class FailedRequestLsStream : public XrdCtaStream
 {
 public:
   /*!
    * Constructor
    *
+   * @param[in]    requestMsg              RequestMessage containing command-line arguments
+   * @param[in]    catalogue               CTA Catalogue
    * @param[in]    scheduler               CTA Scheduler
-   * @param[in]    archiveQueueItorPtr     Pointer to the Archive Queue iterator
-   * @param[in]    retrieveQueueItorPtr    Pointer to the Retrieve Queue iterator
-   * @param[in]    is_summary              Show only a summary of failed jobs
-   * @param[in]    is_log_entries          Include log messages in output (verbose)
+   * @param[in]    schedDb                 CTA ObjectStore
    * @param[in]    lc                      CTA Log Context
    */
-  FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr,
-    OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr, bool is_summary, bool is_log_entries,
-    log::LogContext &lc) :
-      XrdSsiStream(XrdSsiStream::isActive),
-      m_scheduler(scheduler),
-      m_archiveQueueItorPtr(archiveQueueItorPtr),
-      m_retrieveQueueItorPtr(retrieveQueueItorPtr),
-      m_isSummary(is_summary),
-      m_isLogEntries(is_log_entries),
-      m_lc(lc)
-  {
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor");
-  }
+  FailedRequestLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
+    cta::Scheduler &scheduler, OStoreDB &schedDb, log::LogContext &lc);
 
+private:
   /*!
-   * Destructor
+   * Can we close the stream?
    */
-  virtual ~FailedRequestLsStream() {
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~FailedRequestLsStream() destructor");
+  virtual bool isDone() const {
+    return m_isSummary ? m_isSummaryDone
+                       : !isArchiveJobs() && !isRetrieveJobs();
   }
 
-  /*!
-   * Synchronously obtain data from an active stream
-   *
-   * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an
-   * active stream in the constructor.
-   *
-   * @param[out]       eInfo   The object to receive any error description.
-   * @param[in,out]    dlen    input:  the optimal amount of data wanted (this is a hint)
-   *                           output: the actual amount of data returned in the buffer.
-   * @param[in,out]    last    input:  should be set to false.
-   *                           output: if true it indicates that no more data remains to be returned
-   *                                   either for this call or on the next call.
-   *
-   * @return    Pointer to the Buffer object that contains a pointer to the the data (see below). The
-   *            buffer must be returned to the stream using Buffer::Recycle(). The next member is usable.
-   * @retval    0    No more data remains or an error occurred:
-   *                 last = true:  No more data remains.
-   *                 last = false: A fatal error occurred, eRef has the reason.
-   */
-  virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override;
+  bool isArchiveJobs() const {
+    return m_archiveQueueItorPtr && !m_archiveQueueItorPtr->end();
+  }
+
+  bool isRetrieveJobs() const {
+    return m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end();
+  }
 
-private:
   /*!
-   * Populate the failed queue summary
-   *
-   * @param[in]    streambuf         XRootD SSI stream object to push records to
-   * @param[in]    isArchiveJobs     Include summary of archive jobs in the output
-   * @param[in]    isRetrieveJobs    Include summary of retrieve jobs in the output
+   * Fill the buffer
    */
-  void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs);
+  virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf);
 
   /*!
    * Add a record to the stream
@@ -106,12 +78,18 @@ private:
   template<typename QueueType>
   bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const QueueType &item);
 
-  // Member variables
+  /*!
+   * Populate the failed queue summary
+   *
+   * @param[in]    streambuf         XRootD SSI stream object to push records to
+   */
+  void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf);
+
 
-  Scheduler &m_scheduler;                                                   //!< Reference to CTA Scheduler
   std::unique_ptr<OStoreDB::ArchiveQueueItor_t>  m_archiveQueueItorPtr;     //!< Archive Queue Iterator
   std::unique_ptr<OStoreDB::RetrieveQueueItor_t> m_retrieveQueueItorPtr;    //!< Retrieve Queue Iterator
   bool m_isSummary;                                                         //!< Show only summary of items in the failed queues
+  bool m_isSummaryDone;                                                     //!< Summary has been sent
   bool m_isLogEntries;                                                      //!< Show failure log messages (verbose)
   log::LogContext &m_lc;                                                    //!< Reference to CTA Log Context
 
@@ -119,6 +97,36 @@ private:
 };
 
 
+FailedRequestLsStream::FailedRequestLsStream(const RequestMessage &requestMsg,
+  cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, OStoreDB &schedDb,
+  log::LogContext &lc) :
+    XrdCtaStream(catalogue, scheduler),
+    m_isSummary(requestMsg.has_flag(admin::OptionBoolean::SUMMARY)),
+    m_isSummaryDone(false),
+    m_isLogEntries(requestMsg.has_flag(admin::OptionBoolean::SHOW_LOG_ENTRIES)),
+    m_lc(lc)
+{
+  using namespace cta::admin;
+
+  if(m_isLogEntries && m_isSummary) {
+    throw cta::exception::UserError("--log and --summary are mutually exclusive");
+  }
+
+  auto tapepool     = requestMsg.getOptional(OptionString::TAPE_POOL);
+  auto vid          = requestMsg.getOptional(OptionString::VID);
+  bool justarchive  = requestMsg.has_flag(OptionBoolean::JUSTARCHIVE)  || tapepool;
+  bool justretrieve = requestMsg.has_flag(OptionBoolean::JUSTRETRIEVE) || vid;
+
+  if(justarchive && justretrieve) {
+    throw cta::exception::UserError("--justarchive/--tapepool and --justretrieve/--vid options are mutually exclusive");
+  }
+
+  if(!justretrieve)
+    m_archiveQueueItorPtr.reset(schedDb.getArchiveJobItorPtr(tapepool ? *tapepool : "", objectstore::JobQueueType::FailedJobs));
+  if(!justarchive)
+    m_retrieveQueueItorPtr.reset(schedDb.getRetrieveJobItorPtr(vid ? *vid : "", objectstore::JobQueueType::FailedJobs));
+}
+
 
 /*!
  * pushRecord ArchiveJob specialisation
@@ -142,7 +150,6 @@ pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const common::dataStructure
 }
 
 
-
 /*!
  * pushRecord RetrieveJob specialisation
  */
@@ -167,73 +174,36 @@ pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const common::dataStructure
 }
 
 
-
-XrdSsiStream::Buffer* FailedRequestLsStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last)
-{
-  XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
-
-  XrdSsiPb::OStreamBuffer<Data> *streambuf;
-
-  try {
-    bool isArchiveJobs  = m_archiveQueueItorPtr  && !m_archiveQueueItorPtr->end();
-    bool isRetrieveJobs = m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end();
-
-    if(!(m_isSummary || isArchiveJobs || isRetrieveJobs)) {
-      // Nothing more to send, close the stream
-      last = true;
-      return nullptr;
-    }
-
-    // Initialise the stream buffer
-    streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
-
-    if(m_isSummary) {
-      // Special handling for --summary option
-      GetBuffSummary(streambuf, isArchiveJobs, isRetrieveJobs);
-      last = true;
-    } else {
-      // List failed archive requests
-      if(isArchiveJobs) {
-        for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) {
-          is_buffer_full = pushRecord(streambuf, **m_archiveQueueItorPtr);
-        }
+int FailedRequestLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
+  if(m_isSummary) {
+    // Special handling for -S (Summary) option
+    GetBuffSummary(streambuf);
+  } else {
+    // List failed archive requests
+    if(isArchiveJobs()) {
+      for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) {
+        is_buffer_full = pushRecord(streambuf, **m_archiveQueueItorPtr);
       }
-      // List failed retrieve requests
-      if(isRetrieveJobs) {
-        for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) {
-          is_buffer_full = pushRecord(streambuf, **m_retrieveQueueItorPtr);
-        }
+    }
+    // List failed retrieve requests
+    if(isRetrieveJobs()) {
+      for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) {
+        is_buffer_full = pushRecord(streambuf, **m_retrieveQueueItorPtr);
       }
     }
-    dlen = streambuf->Size();
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
-    return streambuf;
-  } catch(exception::Exception &ex) {
-    std::ostringstream errMsg;
-    errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
-    eInfo.Set(errMsg.str().c_str(), ECANCELED);
-  } catch(std::exception &ex) {
-    std::ostringstream errMsg;
-    errMsg << __FUNCTION__ << " failed: " << ex.what();
-    eInfo.Set(errMsg.str().c_str(), ECANCELED);
-  } catch(...) {
-    std::ostringstream errMsg;
-    errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
-    eInfo.Set(errMsg.str().c_str(), ECANCELED);
   }
-  delete streambuf;
-  return nullptr;
+  return streambuf->Size();
 }
 
 
 
 void FailedRequestLsStream::
-GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs)
+GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf)
 {
   SchedulerDatabase::JobsFailedSummary archive_summary;
   SchedulerDatabase::JobsFailedSummary retrieve_summary;
 
-  if(isArchiveJobs) {
+  if(isArchiveJobs()) {
     Data record;
     archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
     record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST);
@@ -241,7 +211,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo
     record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
     streambuf->Push(record);
   }
-  if(isRetrieveJobs) {
+  if(isRetrieveJobs()) {
     Data record;
     retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
     record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST);
@@ -249,7 +219,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo
     record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
     streambuf->Push(record);
   }
-  if(isArchiveJobs && isRetrieveJobs) {
+  if(isArchiveJobs() && isRetrieveJobs()) {
     Data record;
     record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL);
     record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
@@ -257,7 +227,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo
     streambuf->Push(record);
   }
 
-  m_isSummary = false;
+  m_isSummaryDone = true;
 }
 
 }} // namespace cta::xrd
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index 68d6eb6dab..fdbaf658ed 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -704,18 +704,18 @@ void RequestMessage::processAdmin_Ls(const cta::admin::AdminCmd &admincmd, cta::
 
 void RequestMessage::processArchiveFile_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
 {
-   using namespace cta::admin;
+  using namespace cta::admin;
 
-   // Create a XrdSsi stream object to return the results
-   stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
+  // Create a XrdSsi stream object to return the results
+  stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
 
-   // Should the client display column headers?
-   if(has_flag(OptionBoolean::SHOW_HEADER)) {
-      response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
-                                                                : HeaderType::ARCHIVEFILE_LS);
-   }
+  // Should the client display column headers?
+  if(has_flag(OptionBoolean::SHOW_HEADER)) {
+    response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
+                                                              : HeaderType::ARCHIVEFILE_LS);
+  }
 
-   response.set_type(cta::xrd::Response::RSP_SUCCESS);
+  response.set_type(cta::xrd::Response::RSP_SUCCESS);
 }
 
 
@@ -984,36 +984,17 @@ void RequestMessage::processDrive_Rm(const cta::admin::AdminCmd &admincmd, cta::
 
 void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
 {
-   using namespace cta::admin;
-
-   if(has_flag(OptionBoolean::SHOW_LOG_ENTRIES) && has_flag(OptionBoolean::SUMMARY)) {
-      throw cta::exception::UserError("--log and --summary are mutually exclusive");
-   }
-
-   auto tapepool     = getOptional(OptionString::TAPE_POOL);
-   auto vid          = getOptional(OptionString::VID);
-   bool justarchive  = has_flag(OptionBoolean::JUSTARCHIVE)  || tapepool;
-   bool justretrieve = has_flag(OptionBoolean::JUSTRETRIEVE) || vid;
-
-   if(justarchive && justretrieve) {
-      throw cta::exception::UserError("--justarchive/--tapepool and --justretrieve/--vid options are mutually exclusive");
-   }
-
-   OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr = justretrieve ? nullptr :
-      m_scheddb.getArchiveJobItorPtr(tapepool ? *tapepool : "", objectstore::JobQueueType::FailedJobs);
-   OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr = justarchive ? nullptr :
-      m_scheddb.getRetrieveJobItorPtr(vid ? *vid : "", objectstore::JobQueueType::FailedJobs);
+  using namespace cta::admin;
 
-   // Create a XrdSsi stream object to return the results
-   stream = new FailedRequestLsStream(m_scheduler, archiveQueueItorPtr, retrieveQueueItorPtr, has_flag(OptionBoolean::SUMMARY), has_flag(OptionBoolean::SHOW_LOG_ENTRIES), m_lc);
+  stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_scheddb, m_lc);
 
-   // Should the client display column headers?
-   if(has_flag(OptionBoolean::SHOW_HEADER)) {
-      response.set_show_header(has_flag(OptionBoolean::SUMMARY) ?
-         HeaderType::FAILEDREQUEST_LS_SUMMARY : HeaderType::FAILEDREQUEST_LS);
-   }
+  // Should the client display column headers?
+  if(has_flag(OptionBoolean::SHOW_HEADER)) {
+    response.set_show_header(has_flag(OptionBoolean::SUMMARY) ?
+      HeaderType::FAILEDREQUEST_LS_SUMMARY : HeaderType::FAILEDREQUEST_LS);
+  }
 
-   response.set_type(cta::xrd::Response::RSP_SUCCESS);
+  response.set_type(cta::xrd::Response::RSP_SUCCESS);
 }
 
 
-- 
GitLab