From cc024a4410ce4b4cd1629d6053c7514d1aee8432 Mon Sep 17 00:00:00 2001
From: Michael Davis <michael.davis@cern.ch>
Date: Thu, 17 May 2018 14:58:07 +0200
Subject: [PATCH] [lpa-stream] Updates cta-admin command to use streaming lpr

---
 cmdline/CtaAdminCmd.cpp                  | 36 +++++++++++++++++-------
 xroot_plugins/XrdCtaListPendingQueue.hpp |  2 +-
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp
index 06188c09b9..63bb07a036 100644
--- a/cmdline/CtaAdminCmd.cpp
+++ b/cmdline/CtaAdminCmd.cpp
@@ -61,8 +61,8 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const
       case Data::kAfItemFieldNumber : switch(record.af_item().type())
       {
          case ArchiveFileItem::ARCHIVEFILE_LS:          CtaAdminCmd::printAfLsItem(record.af_item()); break;
-         case ArchiveFileItem::LISTPENDINGARCHIVES:     CtaAdminCmd::printLpaItem(record.af_item()); break;
-         case ArchiveFileItem::LISTPENDINGRETRIEVES:
+         case ArchiveFileItem::LISTPENDINGARCHIVES:     CtaAdminCmd::printLpaItem (record.af_item()); break;
+         case ArchiveFileItem::LISTPENDINGRETRIEVES:    CtaAdminCmd::printLprItem (record.af_item()); break;
          default:
             throw std::runtime_error("Not implemented/received invalid stream data from CTA Frontend.");
       }
@@ -71,7 +71,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const
       case Data::kAfSummaryItemFieldNumber : switch(record.af_summary_item().type())
       {
          case ArchiveFileSummaryItem::LISTPENDINGARCHIVES:    CtaAdminCmd::printLpaSummaryItem(record.af_summary_item()); break;
-         case ArchiveFileSummaryItem::LISTPENDINGRETRIEVES:
+         case ArchiveFileSummaryItem::LISTPENDINGRETRIEVES:   CtaAdminCmd::printLprSummaryItem(record.af_summary_item()); break;
          default:
             throw std::runtime_error("Not implemented/received invalid stream data from CTA Frontend.");
       }
@@ -195,17 +195,19 @@ void CtaAdminCmd::send() const
          std::cout << response.message_txt();
          // Print streaming response header
          switch(response.show_header()) {
-            case HeaderType::ARCHIVEFILE_LS:              printAfLsHeader(); break;
-            case HeaderType::LISTPENDINGARCHIVES:         printLpaHeader(); break;
-            case HeaderType::LISTPENDINGARCHIVES_SUMMARY: printLpaSummaryHeader(); break;
+            case HeaderType::ARCHIVEFILE_LS:               printAfLsHeader(); break;
+            case HeaderType::LISTPENDINGARCHIVES:          printLpaHeader(); break;
+            case HeaderType::LISTPENDINGARCHIVES_SUMMARY:  printLpaSummaryHeader(); break;
+            case HeaderType::LISTPENDINGRETRIEVES:         printLprHeader(); break;
+            case HeaderType::LISTPENDINGRETRIEVES_SUMMARY: printLprSummaryHeader(); break;
             case HeaderType::NONE:
-            default:                                      break;
+            default:                                       break;
          }
          break;
-      case Response::RSP_ERR_PROTOBUF:                    throw XrdSsiPb::PbException(response.message_txt());
+      case Response::RSP_ERR_PROTOBUF:                     throw XrdSsiPb::PbException(response.message_txt());
       case Response::RSP_ERR_USER:
-      case Response::RSP_ERR_CTA:                         throw std::runtime_error(response.message_txt());
-      default:                                            throw XrdSsiPb::PbException("Invalid response type.");
+      case Response::RSP_ERR_CTA:                          throw std::runtime_error(response.message_txt());
+      default:                                             throw XrdSsiPb::PbException("Invalid response type.");
    }
 
    // If there is a Data/Stream payload, wait until it has been processed before exiting
@@ -471,6 +473,16 @@ void CtaAdminCmd::printLprHeader()
 
 void CtaAdminCmd::printLprItem(const cta::admin::ArchiveFileItem &af_item)
 {
+   std::cout << std::setfill(' ') << std::setw(13) << std::right << af_item.vid()             << ' '
+             << std::setfill(' ') << std::setw(7)  << std::right << af_item.af().archive_id() << ' '
+             << std::setfill(' ') << std::setw(7)  << std::right << af_item.copy_nb()         << ' '
+             << std::setfill(' ') << std::setw(7)  << std::right << "fseq"                    << ' '
+             << std::setfill(' ') << std::setw(9)  << std::right << "block id"                << ' '
+             << std::setfill(' ') << std::setw(12) << std::right << af_item.af().size()       << ' '
+             << std::setfill(' ') << std::setw(8)  << std::right << af_item.af().df().owner() << ' '
+             << std::setfill(' ') << std::setw(8)  << std::right << af_item.af().df().group() << ' '
+             <<                                                     af_item.af().df().path()
+             << std::endl;
 }
 
 void CtaAdminCmd::printLprSummaryHeader()
@@ -484,6 +496,10 @@ void CtaAdminCmd::printLprSummaryHeader()
 
 void CtaAdminCmd::printLprSummaryItem(const cta::admin::ArchiveFileSummaryItem &af_summary_item)
 {
+   std::cout << std::setfill(' ') << std::setw(13) << std::right << af_summary_item.vid()         << ' '
+             << std::setfill(' ') << std::setw(13) << std::right << af_summary_item.total_files() << ' '
+             << std::setfill(' ') << std::setw(12) << std::right << af_summary_item.total_size()  << ' '
+             << std::endl;
 }
 
 }} // namespace cta::admin
diff --git a/xroot_plugins/XrdCtaListPendingQueue.hpp b/xroot_plugins/XrdCtaListPendingQueue.hpp
index 24acebff76..53fdacec75 100644
--- a/xroot_plugins/XrdCtaListPendingQueue.hpp
+++ b/xroot_plugins/XrdCtaListPendingQueue.hpp
@@ -242,7 +242,7 @@ Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::stri
    Data record;
 
    // Response type
-   record.mutable_af_item()->set_type(cta::admin::ArchiveFileItem::LISTPENDINGARCHIVES);
+   record.mutable_af_item()->set_type(cta::admin::ArchiveFileItem::LISTPENDINGRETRIEVES);
 
    // Tapepool
    record.mutable_af_item()->set_vid(vid);
-- 
GitLab