From c84307abdbdaebeea7b3428a4b65e6474c5a2037 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Thu, 6 Jun 2019 11:17:53 +0200
Subject: [PATCH] Makes ListPendingQueueStream class a subclass of XrdCtaStream

---
 xroot_plugins/XrdCtaFailedRequestLs.hpp   |   4 +-
 xroot_plugins/XrdCtaListPendingQueue.hpp  | 198 ++++++++++------------
 xroot_plugins/XrdSsiCtaRequestMessage.cpp |  46 ++---
 3 files changed, 108 insertions(+), 140 deletions(-)

diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp
index 99a13ea910..d9b2a6063d 100644
--- a/xroot_plugins/XrdCtaFailedRequestLs.hpp
+++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp
@@ -20,8 +20,6 @@
 
 #include <xroot_plugins/XrdCtaStream.hpp>
 #include <xroot_plugins/XrdSsiCtaRequestMessage.hpp>
-//#include <scheduler/OStoreDB/OStoreDB.hpp>
-
 
 
 namespace cta { namespace xrd {
@@ -108,6 +106,8 @@ FailedRequestLsStream::FailedRequestLsStream(const RequestMessage &requestMsg,
 {
   using namespace cta::admin;
 
+  XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor");
+
   if(m_isLogEntries && m_isSummary) {
     throw cta::exception::UserError("--log and --summary are mutually exclusive");
   }
diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp
index d79f4effaa..d06dc47218 100644
--- a/xroot_plugins/XrdCtaListPendingQueue.hpp
+++ b/xroot_plugins/XrdCtaListPendingQueue.hpp
@@ -1,7 +1,7 @@
 /*!
  * @project        The CERN Tape Archive (CTA)
- * @brief          CTA Frontend List Pending Files stream implementation
- * @copyright      Copyright 2017 CERN
+ * @brief          CTA Frontend List Pending Archive/List Pending Retrieves stream implementation
+ * @copyright      Copyright 2019 CERN
  * @license        This program is free software: you can redistribute it and/or modify
  *                 it under the terms of the GNU General Public License as published by
  *                 the Free Software Foundation, either version 3 of the License, or
@@ -18,9 +18,8 @@
 
 #pragma once
 
-#include <XrdSsiPbOStreamBuffer.hpp>
-#include <scheduler/OStoreDB/OStoreDB.hpp>
-
+#include <xroot_plugins/XrdCtaStream.hpp>
+#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp>
 
 
 namespace cta { namespace xrd {
@@ -29,115 +28,30 @@ namespace cta { namespace xrd {
  * Stream object which implements "lpa" and "lpr" commands
  */
 template<typename QueueItor_t>
-class ListPendingQueue : public XrdSsiStream
+class ListPendingQueueStream : public XrdCtaStream
 {
 public:
-  ListPendingQueue(QueueItor_t queueItor, bool is_extended) :
-    XrdSsiStream(XrdSsiStream::isActive),
-    m_isExtended(is_extended),
-    m_queueItor(std::move(queueItor))
-  {
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingQueue() constructor");
-  }
+  ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
+    cta::Scheduler &scheduler, OStoreDB &schedDb);
 
-  virtual ~ListPendingQueue() {
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~ListPendingQueue() destructor");
+private:
+  /*!
+   * Can we close the stream?
+   */
+  virtual bool isDone() const {
+    return m_queueItor.end();
   }
 
   /*!
-   * Synchronously obtain data from an active stream
-   *
-   * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an
-   * active stream in the constructor.
-   *
-   * @param[out]       eInfo   The object to receive any error description.
-   * @param[in,out]    dlen    input:  the optimal amount of data wanted (this is a hint)
-   *                           output: the actual amount of data returned in the buffer.
-   * @param[in,out]    last    input:  should be set to false.
-   *                           output: if true it indicates that no more data remains to be returned
-   *                                   either for this call or on the next call.
-   *
-   * @return    Pointer to the Buffer object that contains a pointer to the the data (see below). The
-   *            buffer must be returned to the stream using Buffer::Recycle(). The next member is usable.
-   * @retval    0    No more data remains or an error occurred:
-   *                 last = true:  No more data remains.
-   *                 last = false: A fatal error occurred, eRef has the reason.
+   * Fill the buffer
    */
-  virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override {
-    XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
-
-    XrdSsiPb::OStreamBuffer<Data> *streambuf;
-
-    try {
-      // If there is nothing more to send, close the stream
-      if(m_queueItor.end())
-      {
-        last = true;
-        return nullptr;
-      }
-
-      // Fill the buffer
-
-      streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
-
-      /*
-       * is_buffer_full is set to true when we have one full block of data in the buffer, i.e. enough
-       * data to send to the client. The actual buffer size is double the block size, so we can keep
-       * writing a few additional records after is_buffer_full is true. These will be sent on the next
-       * iteration. If we exceed the hard limit of double the block size, Push() will throw an exception.  
-       */
-
-      if(m_isExtended) {
-        // Detailed listing of all queued files
-
-        for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor)
-        {
-           is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), *m_queueItor);
-        }
-      } else {
-        // Summary by tapepool or vid
-
-        for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; )
-        {
-           uint64_t total_files = 0;
-           uint64_t total_size = 0;
-
-           auto qid = m_queueItor.qid();
-
-           for(m_queueItor.beginq(); !m_queueItor.endq(); ++m_queueItor) {
-              ++total_files;
-              total_size += fileSize(*m_queueItor);
-           }
-
-           is_buffer_full = pushRecord(streambuf, qid, total_files, total_size);
-        }
-      }
-
-      dlen = streambuf->Size();
-      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
-    } catch(exception::Exception &ex) {
-      std::ostringstream errMsg;
-      errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
-      eInfo.Set(errMsg.str().c_str(), ECANCELED);
-      delete streambuf;
-    } catch(std::exception &ex) {
-      std::ostringstream errMsg;
-      errMsg << __FUNCTION__ << " failed: " << ex.what();
-      eInfo.Set(errMsg.str().c_str(), ECANCELED);
-      delete streambuf;
-    } catch(...) {
-      std::ostringstream errMsg;
-      errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
-      eInfo.Set(errMsg.str().c_str(), ECANCELED);
-      delete streambuf;
-    }
-    return streambuf;
-  }
+  virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf);
 
 private:
   bool m_isExtended;                                                      //!< Summary or extended listing?
-  QueueItor_t m_queueItor;                                                //!< Archive/Retrieve Queue Iterator
+  optional<std::string> m_filter;                                         //!< Tapepool or Vid to filter results
 
+  QueueItor_t m_queueItor;                                                //!< Archive/Retrieve Queue Iterator
   typedef decltype(*m_queueItor) data_t;                                  //!< Infer data type from template type
 
   uint64_t fileSize(const data_t &job);                                   //!< Obtain file size from queue item
@@ -151,16 +65,78 @@ private:
 };
 
 
+template<>
+ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::
+ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
+  cta::Scheduler &scheduler, OStoreDB &schedDb) :
+  XrdCtaStream(catalogue, scheduler),
+  m_isExtended(requestMsg.has_flag(admin::OptionBoolean::EXTENDED)),
+  m_filter(requestMsg.getOptional(admin::OptionString::TAPE_POOL)),
+  m_queueItor(schedDb.getArchiveJobItor(m_filter ? m_filter.value() : ""))
+{
+  XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingQueueStream<Archive>() constructor");
+}
+
+
+template<>
+ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::
+ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
+  cta::Scheduler &scheduler, OStoreDB &schedDb) :
+  XrdCtaStream(catalogue, scheduler),
+  m_isExtended(requestMsg.has_flag(admin::OptionBoolean::EXTENDED)),
+  m_filter(requestMsg.getOptional(admin::OptionString::VID)),
+  m_queueItor(schedDb.getRetrieveJobItor(m_filter ? m_filter.value() : ""))
+{
+  XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingQueueStream<Retrieve>() constructor");
+}
+
+
+template <typename T>
+int ListPendingQueueStream<T>::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf)
+{
+  // is_buffer_full is set to true when we have one full block of data in the buffer, i.e. enough data
+  // to send to the client. The actual buffer size is double the block size, so we can keep writing a
+  // few additional records after is_buffer_full is true. These will be sent on the next iteration. If
+  // we exceed the hard limit of double the block size, Push() will throw an exception.
+
+  if(m_isExtended) {
+    // Detailed listing of all queued files
+    for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor) {
+      is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), *m_queueItor);
+    }
+  } else {
+    // Summary by tapepool or vid
+
+    for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; )
+    {
+      uint64_t total_files = 0;
+      uint64_t total_size = 0;
+
+      auto qid = m_queueItor.qid();
+
+      for(m_queueItor.beginq(); !m_queueItor.endq(); ++m_queueItor) {
+        ++total_files;
+        total_size += fileSize(*m_queueItor);
+      }
+
+      is_buffer_full = pushRecord(streambuf, qid, total_files, total_size);
+    }
+  }
+
+  return streambuf->Size();
+}
+
 
 // Template specialisations for Archive Queue types
 
 template<>
-uint64_t ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &job) {
+uint64_t ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &job) {
   return job.request.fileSize;
 }
 
+
 template<>
-bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+bool ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
   const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
 {
   Data record;
@@ -187,8 +163,9 @@ bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStrea
   return streambuf->Push(record);
 }
 
+
 template<>
-bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+bool ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
   const std::string &tapepool, const uint64_t &total_files, const uint64_t &total_size)
 {
   Data record;
@@ -204,16 +181,16 @@ bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStrea
 }
 
 
-
 // Template specialisations for Retrieve Queue types
 
 template<>
-uint64_t ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fileSize(const data_t &job) {
+uint64_t ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::fileSize(const data_t &job) {
   return job.fileSize;
 }
 
+
 template<>
-bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+bool ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
   const std::string &vid, const common::dataStructures::RetrieveJob &job)
 {
   bool is_buffer_full = false;
@@ -254,8 +231,9 @@ bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStre
   return is_buffer_full;
 }
 
+
 template<>
-bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+bool ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
   const std::string &vid, const uint64_t &total_files, const uint64_t &total_size)
 {
   Data record;
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index fdbaf658ed..dd353ef4ac 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -1086,46 +1086,36 @@ void RequestMessage::processGroupMountRule_Ls(const cta::admin::AdminCmd &adminc
 
 void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
 {
-   using namespace cta::admin;
-
-   // Filter criteria
-   auto tapepool = getOptional(OptionString::TAPE_POOL);
+  using namespace cta::admin;
 
-   // Create a XrdSsi stream object to return the results
-   stream = new ListPendingQueue<OStoreDB::ArchiveQueueItor_t>(
-      m_scheddb.getArchiveJobItor(tapepool ? tapepool.value() : ""),
-      has_flag(OptionBoolean::EXTENDED));
+  // Create a XrdSsi stream object to return the results
+  stream = new ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>(*this, m_catalogue, m_scheduler, m_scheddb);
 
-   // Should the client display column headers?
-   if(has_flag(OptionBoolean::SHOW_HEADER)) {
-      response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGARCHIVES
-                                                                 : HeaderType::LISTPENDINGARCHIVES_SUMMARY);
-   }
+  // Should the client display column headers?
+  if(has_flag(OptionBoolean::SHOW_HEADER)) {
+    response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGARCHIVES
+                                                               : HeaderType::LISTPENDINGARCHIVES_SUMMARY);
+  }
 
-   response.set_type(cta::xrd::Response::RSP_SUCCESS);
+  response.set_type(cta::xrd::Response::RSP_SUCCESS);
 }
 
 
 
 void RequestMessage::processListPendingRetrieves(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
 {
-   using namespace cta::admin;
-
-   // Filter criteria
-   auto vid = getOptional(OptionString::VID);
+  using namespace cta::admin;
 
-   // Create a XrdSsi stream object to return the results
-   stream = new ListPendingQueue<OStoreDB::RetrieveQueueItor_t>(
-      m_scheddb.getRetrieveJobItor(vid ? vid.value() : ""),
-      has_flag(OptionBoolean::EXTENDED));
+  // Create a XrdSsi stream object to return the results
+  stream = new ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>(*this, m_catalogue, m_scheduler, m_scheddb);
 
-   // Should the client display column headers?
-   if(has_flag(OptionBoolean::SHOW_HEADER)) {
-         response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGRETRIEVES
-                                                                    : HeaderType::LISTPENDINGRETRIEVES_SUMMARY);
-   }
+  // Should the client display column headers?
+  if(has_flag(OptionBoolean::SHOW_HEADER)) {
+    response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGRETRIEVES
+                                                               : HeaderType::LISTPENDINGRETRIEVES_SUMMARY);
+  }
 
-   response.set_type(cta::xrd::Response::RSP_SUCCESS);
+  response.set_type(cta::xrd::Response::RSP_SUCCESS);
 }
 
 
-- 
GitLab