From 1fe14e02bf298505a89436d58c951e7120cf0500 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Fri, 21 Jun 2019 17:00:27 +0200 Subject: [PATCH] [cta-admin] Implements streaming "cta-admin groupmountrule ls" --- cmdline/CtaAdminCmd.cpp | 3 + cmdline/CtaAdminTextFormatter.cpp | 31 ++++++++ cmdline/CtaAdminTextFormatter.hpp | 2 + xroot_plugins/XrdCtaGroupMountRuleLs.hpp | 92 +++++++++++++++++++++++ xroot_plugins/XrdSsiCtaRequestMessage.cpp | 35 ++------- xroot_plugins/XrdSsiCtaRequestMessage.hpp | 2 +- xrootd-ssi-protobuf-interface | 2 +- 7 files changed, 137 insertions(+), 30 deletions(-) create mode 100644 xroot_plugins/XrdCtaGroupMountRuleLs.hpp diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index f259e89e97..4d9d977316 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -78,6 +78,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kDrlsItem: std::cout << Log::DumpProtobuf(&record.drls_item()); break; case Data::kFrlsItem: std::cout << Log::DumpProtobuf(&record.frls_item()); break; case Data::kFrlsSummary: std::cout << Log::DumpProtobuf(&record.frls_summary()); break; + case Data::kGmrlsItem: std::cout << Log::DumpProtobuf(&record.gmrls_item()); break; case Data::kLpaItem: std::cout << Log::DumpProtobuf(&record.lpa_item()); break; case Data::kLpaSummary: std::cout << Log::DumpProtobuf(&record.lpa_summary()); break; case Data::kLprItem: std::cout << Log::DumpProtobuf(&record.lpr_item()); break; @@ -98,6 +99,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kDrlsItem: formattedText.print(record.drls_item()); break; case Data::kFrlsItem: formattedText.print(record.frls_item()); break; case Data::kFrlsSummary: formattedText.print(record.frls_summary()); break; + case Data::kGmrlsItem: formattedText.print(record.gmrls_item()); break; case Data::kLpaItem: formattedText.print(record.lpa_item()); break; case Data::kLpaSummary: formattedText.print(record.lpa_summary()); break; case Data::kLprItem: formattedText.print(record.lpr_item()); break; @@ -245,6 +247,7 @@ void CtaAdminCmd::send() const case HeaderType::DRIVE_LS: formattedText.printDriveLsHeader(); break; case HeaderType::FAILEDREQUEST_LS: formattedText.printFailedRequestLsHeader(); break; case HeaderType::FAILEDREQUEST_LS_SUMMARY: formattedText.printFailedRequestLsSummaryHeader(); break; + case HeaderType::GROUPMOUNTRULE_LS: formattedText.printGroupMountRuleLsHeader(); break; case HeaderType::LISTPENDINGARCHIVES: formattedText.printListPendingArchivesHeader(); break; case HeaderType::LISTPENDINGARCHIVES_SUMMARY: formattedText.printListPendingArchivesSummaryHeader(); break; case HeaderType::LISTPENDINGRETRIEVES: formattedText.printListPendingRetrievesHeader(); break; diff --git a/cmdline/CtaAdminTextFormatter.cpp b/cmdline/CtaAdminTextFormatter.cpp index 7aa306b75f..f009a87c0d 100644 --- a/cmdline/CtaAdminTextFormatter.cpp +++ b/cmdline/CtaAdminTextFormatter.cpp @@ -371,6 +371,37 @@ void TextFormatter::print(const FailedRequestLsSummary &frls_summary) { ); } +void TextFormatter::printGroupMountRuleLsHeader() { + push_back("HEADER"); + push_back( + "instance", + "group", + "policy", + "c.user", + "c.host", + "c.time", + "m.user", + "m.host", + "m.time", + "comment" + ); +} + +void TextFormatter::print(const GroupMountRuleLsItem &gmrls_item) { + push_back( + gmrls_item.disk_instance(), + gmrls_item.group_mount_rule(), + gmrls_item.mount_policy(), + gmrls_item.creation_log().username(), + gmrls_item.creation_log().host(), + timeToStr(gmrls_item.creation_log().time()), + gmrls_item.last_modification_log().username(), + gmrls_item.last_modification_log().host(), + timeToStr(gmrls_item.last_modification_log().time()), + gmrls_item.comment() + ); +} + void TextFormatter::printListPendingArchivesHeader() { push_back("HEADER"); push_back( diff --git a/cmdline/CtaAdminTextFormatter.hpp b/cmdline/CtaAdminTextFormatter.hpp index 60f6720c3e..2edbeeb6cb 100644 --- a/cmdline/CtaAdminTextFormatter.hpp +++ b/cmdline/CtaAdminTextFormatter.hpp @@ -51,6 +51,7 @@ public: void printDriveLsHeader(); void printFailedRequestLsHeader(); void printFailedRequestLsSummaryHeader(); + void printGroupMountRuleLsHeader(); void printListPendingArchivesHeader(); void printListPendingArchivesSummaryHeader(); void printListPendingRetrievesHeader(); @@ -67,6 +68,7 @@ public: void print(const DriveLsItem &drls_item); void print(const FailedRequestLsItem &frls_item); void print(const FailedRequestLsSummary &frls_summary); + void print(const GroupMountRuleLsItem &gmrls_item); void print(const ListPendingArchivesItem &lpa_item); void print(const ListPendingArchivesSummary &lpa_summary); void print(const ListPendingRetrievesItem &lpr_item); diff --git a/xroot_plugins/XrdCtaGroupMountRuleLs.hpp b/xroot_plugins/XrdCtaGroupMountRuleLs.hpp new file mode 100644 index 0000000000..e4861d91a2 --- /dev/null +++ b/xroot_plugins/XrdCtaGroupMountRuleLs.hpp @@ -0,0 +1,92 @@ +/*! + * @project The CERN Tape Archive (CTA) + * @brief CTA Archive Route Ls stream implementation + * @copyright Copyright 2019 CERN + * @license This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <xroot_plugins/XrdCtaStream.hpp> +#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> + + +namespace cta { namespace xrd { + +/*! + * Stream object which implements "tapepool ls" command + */ +class GroupMountRuleLsStream: public XrdCtaStream{ +public: + /*! + * Constructor + * + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler + */ + GroupMountRuleLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler); + +private: + /*! + * Can we close the stream? + */ + virtual bool isDone() const { + return m_groupMountRuleList.empty(); + } + + /*! + * Fill the buffer + */ + virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); + + std::list<cta::common::dataStructures::RequesterGroupMountRule> m_groupMountRuleList; //!< List of archive routes from the catalogue + + static constexpr const char* const LOG_SUFFIX = "GroupMountRuleLsStream"; //!< Identifier for log messages +}; + + +GroupMountRuleLsStream::GroupMountRuleLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler) : + XrdCtaStream(catalogue, scheduler), + m_groupMountRuleList(catalogue.getRequesterGroupMountRules()) +{ + using namespace cta::admin; + + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GroupMountRuleLsStream() constructor"); +} + +int GroupMountRuleLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { + for(bool is_buffer_full = false; !m_groupMountRuleList.empty() && !is_buffer_full; m_groupMountRuleList.pop_front()) { + Data record; + + auto &gmr = m_groupMountRuleList.front(); + auto gmr_item = record.mutable_gmrls_item(); + + gmr_item->set_disk_instance(gmr.diskInstance); + gmr_item->set_group_mount_rule(gmr.name); + gmr_item->set_mount_policy(gmr.mountPolicy); + gmr_item->mutable_creation_log()->set_username(gmr.creationLog.username); + gmr_item->mutable_creation_log()->set_host(gmr.creationLog.host); + gmr_item->mutable_creation_log()->set_time(gmr.creationLog.time); + gmr_item->mutable_last_modification_log()->set_username(gmr.lastModificationLog.username); + gmr_item->mutable_last_modification_log()->set_host(gmr.lastModificationLog.host); + gmr_item->mutable_last_modification_log()->set_time(gmr.lastModificationLog.time); + gmr_item->set_comment(gmr.comment); + + is_buffer_full = streambuf->Push(record); + } + return streambuf->Size(); +} + +}} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 30cf387e34..49bf6d7965 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -31,6 +31,7 @@ using XrdSsiPb::PbException; #include "XrdCtaArchiveRouteLs.hpp" #include "XrdCtaDriveLs.hpp" #include "XrdCtaFailedRequestLs.hpp" +#include "XrdCtaGroupMountRuleLs.hpp" #include "XrdCtaListPendingQueue.hpp" #include "XrdCtaRepackLs.hpp" #include "XrdCtaTapeLs.hpp" @@ -156,7 +157,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons processGroupMountRule_Rm(response); break; case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS): - processGroupMountRule_Ls(response); + processGroupMountRule_Ls(response, stream); break; case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE): processListPendingArchives(response, stream); @@ -923,36 +924,14 @@ void RequestMessage::processGroupMountRule_Rm(cta::xrd::Response &response) -void RequestMessage::processGroupMountRule_Ls(cta::xrd::Response &response) +void RequestMessage::processGroupMountRule_Ls(cta::xrd::Response &response, XrdSsiStream* &stream) { - using namespace cta::admin; - - std::stringstream cmdlineOutput; - - std::list<cta::common::dataStructures::RequesterGroupMountRule> list = m_catalogue.getRequesterGroupMountRules(); + using namespace cta::admin; - if(!list.empty()) - { - std::vector<std::vector<std::string>> responseTable; - std::vector<std::string> header = { - "instance","group","policy","c.user","c.host","c.time","m.user","m.host","m.time","comment" - }; - if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header); - for(auto it = list.cbegin(); it != list.cend(); it++) - { - std::vector<std::string> currentRow; - currentRow.push_back(it->diskInstance); - currentRow.push_back(it->name); - currentRow.push_back(it->mountPolicy); - addLogInfoToResponseRow(currentRow, it->creationLog, it->lastModificationLog); - currentRow.push_back(it->comment); - responseTable.push_back(currentRow); - } - cmdlineOutput << formatResponse(responseTable); - } + stream = new GroupMountRuleLsStream(*this, m_catalogue, m_scheduler); - response.set_message_txt(cmdlineOutput.str()); - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_show_header(HeaderType::GROUPMOUNTRULE_LS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index b681664322..744e6a07f6 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -162,7 +162,6 @@ private: void processGroupMountRule_Add (cta::xrd::Response &response); void processGroupMountRule_Ch (cta::xrd::Response &response); void processGroupMountRule_Rm (cta::xrd::Response &response); - void processGroupMountRule_Ls (cta::xrd::Response &response); void processLogicalLibrary_Add (cta::xrd::Response &response); void processLogicalLibrary_Ch (cta::xrd::Response &response); void processLogicalLibrary_Rm (cta::xrd::Response &response); @@ -206,6 +205,7 @@ private: admincmdstream_t processArchiveRoute_Ls; admincmdstream_t processDrive_Ls; admincmdstream_t processFailedRequest_Ls; + admincmdstream_t processGroupMountRule_Ls; admincmdstream_t processListPendingArchives; admincmdstream_t processListPendingRetrieves; admincmdstream_t processTapePool_Ls; diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index 7c6e0d5e1b..9eb104df34 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit 7c6e0d5e1b666e34212847d96c5ca52e9af3fc7c +Subproject commit 9eb104df34892f5d7a6a251faae6399dd1f7f5c1 -- GitLab