From df0f4da336d1e9514c0dd652c47b774b95b5d2bd Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Mon, 4 Jun 2018 14:42:02 +0200
Subject: [PATCH] [lpa-stream] Refactors XrdCtaListPendingQueue.hpp

Allows pushing multiple protobufs per queue item, needed in the case of
listing retrieve queue jobs, which can have several VIDs in the same
job.
---
 xroot_plugins/XrdCtaListPendingQueue.hpp | 55 ++++++++++++------------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp
index 53fdacec75..dfab642ced 100644
--- a/xroot_plugins/XrdCtaListPendingQueue.hpp
+++ b/xroot_plugins/XrdCtaListPendingQueue.hpp
@@ -80,7 +80,13 @@ public:
          // Fill the buffer
 
          streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
-         Data record;
+
+         /*
+          * is_buffer_full is set to true when we have one full block of data in the buffer, i.e. enough
+          * data to send to the client. The actual buffer size is double the block size, so we can keep
+          * writing a few additional records after is_buffer_full is true. These will be sent on the next
+          * iteration. If we exceed the hard limit of double the block size, Push() will throw an exception.  
+          */
 
          if(m_isExtended) {
             // Detailed listing of all queued files
@@ -89,14 +95,7 @@ public:
             {
                auto job = m_queueItor.getJob();
                if(!job.first) continue;
-               record = fillRecord(m_queueItor.qid(), job.second);
-
-               // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
-               // enough data to send to the client. The actual buffer size is double the block size,
-               // so we can keep writing a few additional records after is_buffer_full is true. These
-               // will be sent on the next iteration. If we exceed the hard limit of double the block
-               // size, Push() will throw an exception.
-               is_buffer_full = streambuf->Push(record);
+               is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), job.second);
             }
          } else {
             // Summary by tapepool or vid
@@ -114,9 +113,7 @@ public:
                   }
                }
 
-               record = fillRecord(m_queueItor.qid(), total_files, total_size);
-
-               is_buffer_full = streambuf->Push(record);
+               is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), total_files, total_size);
             }
          }
 
@@ -144,9 +141,11 @@ private:
 
    typedef decltype(m_queueItor.getJob().second) data_t;                   //!< Infer data type from template type
 
-   Data fillRecord(const std::string &tape_id, const data_t &job);         //!< Convert data to protobuf
-   Data fillRecord(const std::string &tape_id,
-      const uint64_t &total_files, const uint64_t &total_size);            //!< Convert summary to protobuf
+   bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,               //!< Convert data to protobufs and put on stream
+      const std::string &tape_id, const data_t &job);
+   bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,               //!< Convert summary to protobufs and put on stream
+      const std::string &tape_id, const uint64_t &total_files,
+      const uint64_t &total_size);
 
    static uint64_t fileSize(const data_t &job);                            //!< Obtain file size from queue item
    static constexpr const char* const LOG_SUFFIX  = "ListPendingQueue";    //!< Identifier for log messages
@@ -157,8 +156,8 @@ private:
 // Template specialisations for Archive Queue types
 
 template<>
-Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
-   const common::dataStructures::ArchiveJob &job)
+bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+   const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
 {
    Data record;
 
@@ -184,12 +183,12 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin
    af->mutable_df()->set_group(job.request.requester.group);
    af->mutable_df()->set_path(job.request.diskFileInfo.path);
 
-   return record;
+   return streambuf->Push(record);
 }
 
 template<>
-Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
-   const uint64_t &total_files, const uint64_t &total_size)
+bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+   const std::string &tapepool, const uint64_t &total_files, const uint64_t &total_size)
 {
    Data record;
 
@@ -203,7 +202,7 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin
    record.mutable_af_summary_item()->set_total_files(total_files);
    record.mutable_af_summary_item()->set_total_size(total_size);
 
-   return record;
+   return streambuf->Push(record);
 }
 
 template<>
@@ -216,8 +215,8 @@ uint64_t ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &
 // Template specialisations for Retrieve Queue types
 
 template<>
-Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &vid,
-   const common::dataStructures::RetrieveJob &job)
+bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+   const std::string &vid, const common::dataStructures::RetrieveJob &job)
 {
 
 #if 0
@@ -267,12 +266,12 @@ Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::stri
    af->mutable_df()->set_group(job.request.requester.group);
    af->mutable_df()->set_path(job.request.diskFileInfo.path);
 
-   return record;
+   return streambuf->Push(record);
 }
 
 template<>
-Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &tapepool,
-   const uint64_t &total_files, const uint64_t &total_size)
+bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
+   const std::string &vid, const uint64_t &total_files, const uint64_t &total_size)
 {
 #if 0
          for(auto it = result.cbegin(); it != result.cend(); it++)
@@ -296,13 +295,13 @@ Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::stri
    record.mutable_af_summary_item()->set_type(cta::admin::ArchiveFileSummaryItem::LISTPENDINGRETRIEVES);
 
    // Tapepool
-   record.mutable_af_summary_item()->set_tapepool(tapepool);
+   record.mutable_af_summary_item()->set_vid(vid);
 
    // Summary statistics
    record.mutable_af_summary_item()->set_total_files(total_files);
    record.mutable_af_summary_item()->set_total_size(total_size);
 
-   return record;
+   return streambuf->Push(record);
 }
 
 template<>
-- 
GitLab