From 60f7f6570660b8d593c9e1edb1687ac4c86a1df8 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Tue, 15 May 2018 11:20:22 +0200
Subject: [PATCH] [lpa-stream] Packs protocol buffer with reply

---
 xroot_plugins/XrdCtaListPendingQueue.hpp | 133 ++++++++++++++---------
 1 file changed, 79 insertions(+), 54 deletions(-)

diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp
index 41133e3e07..dcc647c1a5 100644
--- a/xroot_plugins/XrdCtaListPendingQueue.hpp
+++ b/xroot_plugins/XrdCtaListPendingQueue.hpp
@@ -19,7 +19,7 @@
 #pragma once
 
 #include <XrdSsiPbOStreamBuffer.hpp>
-#include <catalogue/Catalogue.hpp>
+#include <scheduler/OStoreDB/OStoreDB.hpp>
 
 
 
@@ -66,7 +66,7 @@ public:
    virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) {
       XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
 
-      XrdSsiPb::OStreamBuffer<cta::xrd::Data> *streambuf;
+      XrdSsiPb::OStreamBuffer<Data> *streambuf;
 
       try {
          if(m_queueItor.end()) {
@@ -75,7 +75,7 @@ public:
             return nullptr;
          }
 
-         streambuf = new XrdSsiPb::OStreamBuffer<cta::xrd::Data>(dlen);
+         streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
 
          for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor)
          {
@@ -83,8 +83,81 @@ public:
 
             if(!job.first) continue;
 
-            cta::xrd::Data record = fillRecord(job.second);
+            Data record = fillRecord(m_queueItor.qid(), job.second);
 
+            // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
+            // enough data to send to the client. The actual buffer size is double the block size,
+            // so we can keep writing a few additional records after is_buffer_full is true. These
+            // will be sent on the next iteration. If we exceed the hard limit of double the block
+            // size, Push() will throw an exception.
+            is_buffer_full = streambuf->Push(record);
+         }
+         dlen = streambuf->Size();
+         XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
+      } catch(exception::Exception &ex) {
+         throw std::runtime_error(ex.getMessage().str());
+      } catch(std::exception &ex) {
+         std::ostringstream errMsg;
+         errMsg << __FUNCTION__ << " failed: " << ex.what();
+         eInfo.Set(errMsg.str().c_str(), ECANCELED);
+         delete streambuf;
+      } catch(...) {
+         std::ostringstream errMsg;
+         errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
+         eInfo.Set(errMsg.str().c_str(), ECANCELED);
+         delete streambuf;
+      }
+      return streambuf;
+   }
+
+private:
+   bool        m_isExtended;
+   QueueItor_t m_queueItor;
+
+   typedef decltype(m_queueItor.getJob().second) data_t;                   //!< Infer data type from template type
+
+   Data fillRecord(const std::string &tape_id, const data_t &job);         //!< Convert data to protobuf
+
+   static constexpr const char* const LOG_SUFFIX  = "ListPendingQueue";    //!< Identifier for log messages
+};
+
+
+
+// Template specialisations for Archive and Retrieve Queue types
+
+template<>
+Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
+{
+   Data record;
+
+   // Tapepool
+   record.mutable_af_ls_item()->set_tapepool(tapepool);
+
+   // Copy number
+   record.mutable_af_ls_item()->set_copy_nb(job.copyNumber);
+
+   // Archive file
+   auto af = record.mutable_af_ls_item()->mutable_af();
+   af->set_archive_id(job.archiveFileID);
+   af->set_disk_instance(job.instanceName);
+   af->set_disk_id(job.request.diskFileID);
+   af->set_size(job.request.fileSize);
+   af->mutable_cs()->set_type(job.request.checksumType);
+   af->mutable_cs()->set_value(job.request.checksumValue);         
+   af->set_storage_class(job.request.storageClass);
+   af->mutable_df()->set_owner(job.request.requester.name);
+   af->mutable_df()->set_group(job.request.requester.group);
+   af->mutable_df()->set_path(job.request.diskFileInfo.path);
+
+#if 0
+   if(m_isExtended)
+   {
+
+   } else {
+      auto lpa = record.mutable_lpa_summary();
+      lpa->set_
+   }
+#endif
 #if 0
                // Copy number
                record.mutable_af_ls_item()->set_copy_nb(jt->first);
@@ -109,46 +182,9 @@ public:
                tf->set_f_seq(jt->second.fSeq);
                tf->set_block_id(jt->second.blockId);
 #endif
-
-               // is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
-               // enough data to send to the client. The actual buffer size is double the block size,
-               // so we can keep writing a few additional records after is_buffer_full is true. These
-               // will be sent on the next iteration. If we exceed the hard limit of double the block
-               // size, Push() will throw an exception.
-               is_buffer_full = streambuf->Push(record);
-         }
-         dlen = streambuf->Size();
-         XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
-      } catch(cta::exception::Exception &ex) {
-         throw std::runtime_error(ex.getMessage().str());
-      } catch(std::exception &ex) {
-         std::ostringstream errMsg;
-         errMsg << __FUNCTION__ << " failed: " << ex.what();
-         eInfo.Set(errMsg.str().c_str(), ECANCELED);
-         delete streambuf;
-      } catch(...) {
-         std::ostringstream errMsg;
-         errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
-         eInfo.Set(errMsg.str().c_str(), ECANCELED);
-         delete streambuf;
-      }
-      return streambuf;
-   }
-
 #if 0
    std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob>> result;
 
-   if(tapepool) {
-      std::list<cta::common::dataStructures::ArchiveJob> list = m_scheduler.getPendingArchiveJobs(tapepool.value(), m_lc);
-      if(!list.empty()) result[tapepool.value()] = list;
-   } else {
-     result = m_scheduler.getPendingArchiveJobs(m_lc);
-   }
-
-   if(!result.empty())
-   {
-      std::vector<std::vector<std::string>> responseTable;
-
       if(has_flag(OptionBoolean::EXTENDED))
       {
          for(auto it = result.cbegin(); it != result.cend(); it++) {
@@ -183,20 +219,9 @@ public:
             responseTable.push_back(currentRow);
          }
       }
-
-      cmdlineOutput << formatResponse(responseTable);
-   }
 #endif
 
-
-private:
-   cta::xrd::Data fillRecord(const cta::common::dataStructures::ArchiveJob &job);
-   cta::xrd::Data fillRecord(const cta::common::dataStructures::RetrieveJob &job);
-
-   bool        m_isExtended;
-   QueueItor_t m_queueItor;
-
-   static constexpr const char* const LOG_SUFFIX  = "ListPendingQueue";    //!< Identifier for log messages
-};
+   return record;
+}
 
 }} // namespace cta::xrd
-- 
GitLab