From 1701ec21376124d263c61919057501848e2b70a9 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Tue, 8 May 2018 15:19:48 +0200
Subject: [PATCH] [lpa-stream] Separate header and body in cta-admin lpa
 response

---
 xroot_plugins/XrdCtaArchiveFileLs.hpp     |   4 +-
 xroot_plugins/XrdCtaListPendingStream.hpp | 144 ++++++++++++++++++++++
 xroot_plugins/XrdSsiCtaRequestMessage.cpp |  73 ++++++++---
 xroot_plugins/XrdSsiCtaRequestMessage.hpp |  28 ++---
 4 files changed, 215 insertions(+), 34 deletions(-)
 create mode 100644 xroot_plugins/XrdCtaListPendingStream.hpp

diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp
index 94416e4624..cbba7a4b84 100644
--- a/xroot_plugins/XrdCtaArchiveFileLs.hpp
+++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp
@@ -18,8 +18,8 @@
 
 #pragma once
 
-#include "XrdSsiPbOStreamBuffer.hpp"
-#include "catalogue/Catalogue.hpp"
+#include <XrdSsiPbOStreamBuffer.hpp>
+#include <catalogue/Catalogue.hpp>
 
 
 
diff --git a/xroot_plugins/XrdCtaListPendingStream.hpp b/xroot_plugins/XrdCtaListPendingStream.hpp
new file mode 100644
index 0000000000..f8173b7064
--- /dev/null
+++ b/xroot_plugins/XrdCtaListPendingStream.hpp
@@ -0,0 +1,144 @@
+/*!
+ * @project        The CERN Tape Archive (CTA)
+ * @brief          CTA Frontend List Pending Files stream implementation
+ * @copyright      Copyright 2017 CERN
+ * @license        This program is free software: you can redistribute it and/or modify
+ *                 it under the terms of the GNU General Public License as published by
+ *                 the Free Software Foundation, either version 3 of the License, or
+ *                 (at your option) any later version.
+ *
+ *                 This program is distributed in the hope that it will be useful,
+ *                 but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *                 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *                 GNU General Public License for more details.
+ *
+ *                 You should have received a copy of the GNU General Public License
+ *                 along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <XrdSsiPbOStreamBuffer.hpp>
+#include <catalogue/Catalogue.hpp>
+
+
+
+namespace cta { namespace xrd {
+
+/*!
+ * Stream object which implements "lpa" and "lpr" commands
+ */
+class ListPendingStream : public XrdSsiStream
+{
+public:
+   enum ListStreamType { LIST_ARCHIVES, LIST_RETRIEVES };
+
+   ListPendingStream(enum ListStreamType listStreamType, bool is_extended,
+cta::catalogue::ArchiveFileItor archiveFileItor) :
+      XrdSsiStream(XrdSsiStream::isActive),
+      m_archiveFileItor(std::move(archiveFileItor))
+   {
+      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingStream() constructor");
+   }
+
+   virtual ~ListPendingStream() {
+      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~ListPendingStream() destructor");
+   }
+
+   /*!
+    * Synchronously obtain data from an active stream
+    *
+    * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an
+    * active stream in the constructor.
+    *
+    * @param[out]       eInfo   The object to receive any error description.
+    * @param[in,out]    dlen    input:  the optimal amount of data wanted (this is a hint)
+    *                           output: the actual amount of data returned in the buffer.
+    * @param[in,out]    last    input:  should be set to false.
+    *                           output: if true it indicates that no more data remains to be returned
+    *                                   either for this call or on the next call.
+    *
+    * @return    Pointer to the Buffer object that contains a pointer to the the data (see below). The
+    *            buffer must be returned to the stream using Buffer::Recycle(). The next member is usable.
+    * @retval    0    No more data remains or an error occurred:
+    *                 last = true:  No more data remains.
+    *                 last = false: A fatal error occurred, eRef has the reason.
+    */
+   virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) {
+      XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
+
+      XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf;
+
+      try {
+         if(!m_archiveFileItor.hasMore()) {
+            // Nothing more to send, close the stream
+            last = true;
+            return nullptr;
+         }
+
+         streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen);
+
+         for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; )
+         {
+            const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next();
+
+            for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) {
+               cta::xrd::Data record;
+
+               // Copy number
+               record.mutable_af_ls_item()->set_copy_nb(jt->first);
+
+               // Archive file
+               auto af = record.mutable_af_ls_item()->mutable_af();
+               af->set_archive_id(archiveFile.archiveFileID);
+               af->set_disk_instance(archiveFile.diskInstance);
+               af->set_disk_id(archiveFile.diskFileId);
+               af->set_size(archiveFile.fileSize);
+               af->mutable_cs()->set_type(archiveFile.checksumType);
+               af->mutable_cs()->set_value(archiveFile.checksumValue);
+               af->set_storage_class(archiveFile.storageClass);
+               af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner);
+               af->mutable_df()->set_group(archiveFile.diskFileInfo.group);
+               af->mutable_df()->set_path(archiveFile.diskFileInfo.path);
+               af->set_creation_time(archiveFile.creationTime);
+
+               // Tape file
+               auto tf = record.mutable_af_ls_item()->mutable_tf();
+               tf->set_vid(jt->second.vid);
+               tf->set_f_seq(jt->second.fSeq);
+               tf->set_block_id(jt->second.blockId);
+
+               // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
+               // enough data to send to the client. The actual buffer size is double the block size,
+               // so we can keep writing a few additional records after is_buffer_full is true. These
+               // will be sent on the next iteration. If we exceed the hard limit of double the block
+               // size, Push() will throw an exception.
+               is_buffer_full = streambuf->Push(record);
+            }
+         }
+         dlen = streambuf->Size();
+         XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
+      } catch(cta::exception::Exception &ex) {
+         throw std::runtime_error(ex.getMessage().str());
+      } catch(std::exception &ex) {
+         std::ostringstream errMsg;
+         errMsg << __FUNCTION__ << " failed: " << ex.what();
+         eInfo.Set(errMsg.str().c_str(), ECANCELED);
+         delete streambuf;
+      } catch(...) {
+         std::ostringstream errMsg;
+         errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
+         eInfo.Set(errMsg.str().c_str(), ECANCELED);
+         delete streambuf;
+      }
+      return streambuf;
+   }
+
+private:
+   catalogue::ArchiveFileItor m_archiveFileItor;
+
+   static constexpr const char* const LOG_SUFFIX  = "ListPendingStream";    //!< Identifier for log messages
+};
+
+}} // namespace cta::xrd
+
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index 7ed8515614..3a4973c17d 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -18,7 +18,7 @@
 
 #include <iomanip> // for setw
 
-#include "common/utils/utils.hpp"
+#include <common/utils/utils.hpp>
 #include <common/utils/Regex.hpp>
 
 #include <XrdSsiPbException.hpp>
@@ -26,6 +26,7 @@ using XrdSsiPb::PbException;
 
 #include <cmdline/CtaAdminCmdParse.hpp>
 #include "XrdCtaArchiveFileLs.hpp"
+//#include "XrdCtaListPendingStream.hpp"
 #include "XrdSsiCtaRequestMessage.hpp"
 
 
@@ -33,6 +34,12 @@ using XrdSsiPb::PbException;
 namespace cta {
 namespace xrd {
 
+// Codes to change colours for console output (when sending a response to cta-admin)
+const char* const TEXT_RED    = "\x1b[31;1m";
+const char* const TEXT_NORMAL = "\x1b[0m\n";
+
+
+
 /*
  * Convert AdminCmd <Cmd, SubCmd> pair to an integer so that it can be used in a switch statement
  */
@@ -156,7 +163,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons
                processGroupMountRule_Ls(request.admincmd(), response);
                break;
             case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
-               processListPendingArchives(request.admincmd(), response);
+               processListPendingArchives(request.admincmd(), response, stream);
                break;
             case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
                processListPendingRetrieves(request.admincmd(), response);
@@ -851,9 +858,6 @@ void RequestMessage::processArchiveFile_Ls(const cta::admin::AdminCmd &admincmd,
 
       // Send the column headers in the metadata
       if(has_flag(OptionBoolean::SHOW_HEADER)) {
-         const char* const TEXT_RED    = "\x1b[31;1m";
-         const char* const TEXT_NORMAL = "\x1b[0m\n";
-
          cmdlineOutput << TEXT_RED <<
          std::setfill(' ') << std::setw(7)  << std::right << "id"             << ' ' <<
          std::setfill(' ') << std::setw(7)  << std::right << "copy no"        << ' ' <<
@@ -1222,15 +1226,54 @@ void RequestMessage::processGroupMountRule_Ls(const cta::admin::AdminCmd &adminc
 
 
 
-void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response)
+void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
 {
    using namespace cta::admin;
 
    std::stringstream cmdlineOutput;
-   std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > result;
 
+   // Search filter criteria
    auto tapepool = getOptional(OptionString::TAPE_POOL);
 
+#if 0
+   // Create a XrdSsi stream object to return the results
+   if(tapepool) {
+      stream = new ListPendingStream(ListPendingStream::LIST_ARCHIVES, has_flag(OptionBoolean::EXTENDED),
+                                     m_scheduler.getPendingArchiveJobs(tapepool.value(), m_lc));
+   } else {
+      stream = new ListPendingStream(ListPendingStream::LIST_ARCHIVES, has_flag(OptionBoolean::EXTENDED),
+                                     m_scheduler.getPendingArchiveJobs(m_lc));
+   }
+#endif
+
+   // Send the column headers in the metadata
+   if(has_flag(OptionBoolean::SHOW_HEADER)) {
+      if(has_flag(OptionBoolean::EXTENDED)) {
+         cmdlineOutput << TEXT_RED
+         << std::setfill(' ') << std::setw(18) << std::right << "tapepool"       << ' '
+         << std::setfill(' ') << std::setw(7)  << std::right << "id"             << ' '
+         << std::setfill(' ') << std::setw(13) << std::right << "storage class"  << ' '
+         << std::setfill(' ') << std::setw(7)  << std::right << "copy no"        << ' '
+         << std::setfill(' ') << std::setw(7)  << std::right << "disk id"        << ' '
+         << std::setfill(' ') << std::setw(8)  << std::right << "instance"       << ' '
+         << std::setfill(' ') << std::setw(13) << std::right << "checksum type"  << ' '
+         << std::setfill(' ') << std::setw(14) << std::right << "checksum value" << ' '
+         << std::setfill(' ') << std::setw(12) << std::right << "size"           << ' '
+         << std::setfill(' ') << std::setw(8)  << std::right << "user"           << ' '
+         << std::setfill(' ') << std::setw(8)  << std::right << "group"          << ' '
+         <<                                                     "path"           << TEXT_NORMAL;
+      } else {
+         cmdlineOutput << TEXT_RED
+         << std::setfill(' ') << std::setw(18) << std::right << "tapepool"    << ' '
+         << std::setfill(' ') << std::setw(13) << std::right << "total files" << ' '
+         << std::setfill(' ') << std::setw(12) << std::right << "total size"  << ' '
+         << TEXT_NORMAL;
+      }
+   }
+
+#if 1
+   std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob>> result;
+
    if(tapepool) {
       std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler.getPendingArchiveJobs(tapepool.value(), m_lc);
       if(!list.empty()) result[tapepool.value()] = list;
@@ -1240,14 +1283,10 @@ void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admi
 
    if(!result.empty())
    {
+      std::vector<std::vector<std::string>> responseTable;
+
       if(has_flag(OptionBoolean::EXTENDED))
       {
-         std::vector<std::vector<std::string>> responseTable;
-         std::vector<std::string> header = {
-            "tapepool","id","storage class","copy no.","disk id","instance","checksum type",
-            "checksum value","size","user","group","path"
-         };
-         if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header);    
          for(auto it = result.cbegin(); it != result.cend(); it++) {
             for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++)
             {
@@ -1267,11 +1306,7 @@ void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admi
                responseTable.push_back(currentRow);
             }
          }
-         cmdlineOutput << formatResponse(responseTable);
       } else {
-         std::vector<std::vector<std::string>> responseTable;
-         std::vector<std::string> header = { "tapepool","total files","total size" };
-         if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header);
          for(auto it = result.cbegin(); it != result.cend(); it++) {
             std::vector<std::string> currentRow;
             currentRow.push_back(it->first);
@@ -1283,9 +1318,11 @@ void RequestMessage::processListPendingArchives(const cta::admin::AdminCmd &admi
             currentRow.push_back(std::to_string(static_cast<unsigned long long>(size)));
             responseTable.push_back(currentRow);
          }
-         cmdlineOutput << formatResponse(responseTable);
       }
+
+      cmdlineOutput << formatResponse(responseTable);
    }
+#endif
 
    response.set_message_txt(cmdlineOutput.str());
    response.set_type(cta::xrd::Response::RSP_SUCCESS);
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp
index d4bfddae86..82d1d170b4 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp
@@ -98,8 +98,6 @@ private:
    admincmd_t processGroupMountRule_Ch;
    admincmd_t processGroupMountRule_Rm;
    admincmd_t processGroupMountRule_Ls;
-   admincmd_t processListPendingArchives;
-   admincmd_t processListPendingRetrieves;
    admincmd_t processLogicalLibrary_Add;
    admincmd_t processLogicalLibrary_Ch;
    admincmd_t processLogicalLibrary_Rm;
@@ -141,24 +139,26 @@ private:
    admincmd_t processVerify_Err;
 
    /*!
-    * Log an admin command
+    * Process AdminCmd events which can return a stream response
     *
-    * @param[in]    admincmd    CTA Admin command request message
-    * @param[in]    t           CTA Catalogue timer
+    * @param[in]     admincmd        CTA Admin command request message
+    * @param[out]    response        Response protocol buffer message. This is used for response
+    *                                headers or for summary responses.
+    * @param[out]    stream          Reference to Response stream message pointer
     */
-   void logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t);
+   typedef void admincmdstream_t(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream);
+
+   admincmdstream_t processArchiveFile_Ls;
+   admincmdstream_t processListPendingArchives;
+   admincmd_t processListPendingRetrieves; // TODO: convert to stream format
 
    /*!
-    * "af ls" command
-    *
-    * This is a special case as it can return a protocol buffer (for the summary) or a stream (for
-    * the full listing)
+    * Log an admin command
     *
-    * @param[in]     admincmd        CTA Admin command request message
-    * @param[out]    response        Response protocol buffer message
-    * @param[out]    stream          Reference to Response stream message pointer
+    * @param[in]    admincmd    CTA Admin command request message
+    * @param[in]    t           CTA Catalogue timer
     */
-   void processArchiveFile_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream);
+   void logAdminCmd(const std::string &function, const cta::admin::AdminCmd &admincmd, cta::utils::Timer &t);
 
    /*!
     * Drive state enum
-- 
GitLab