From b7642e140c3b19065128fafa39f431b338550fbc Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Fri, 24 May 2019 18:13:33 +0200 Subject: [PATCH] Added support for json in cta-admin repack ls --- cmdline/CtaAdminCmd.cpp | 41 ++++++++ cmdline/CtaAdminCmd.hpp | 2 + common/dataStructures/RepackInfo.hpp | 2 + objectstore/RepackRequest.cpp | 2 + xroot_plugins/XrdCtaRepackLs.hpp | 115 ++++++++++++++++++++++ xroot_plugins/XrdSsiCtaRequestMessage.cpp | 58 +++-------- xroot_plugins/XrdSsiCtaRequestMessage.hpp | 3 +- 7 files changed, 176 insertions(+), 47 deletions(-) create mode 100644 xroot_plugins/XrdCtaRepackLs.hpp diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 338e2a2508..47fe7a749a 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -78,6 +78,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kLprSummary: std::cout << Log::DumpProtobuf(&record.lpr_summary()); break; case Data::kTplsItem: std::cout << Log::DumpProtobuf(&record.tpls_item()); break; case Data::kTalsItem: std::cout << Log::DumpProtobuf(&record.tals_item()); break; + case Data::kRelsItem: std::cout << Log::DumpProtobuf(&record.rels_item()); break; default: throw std::runtime_error("Received invalid stream data from CTA Frontend."); } @@ -94,6 +95,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kLprSummary: CtaAdminCmd::print(record.lpr_summary()); break; case Data::kTplsItem: CtaAdminCmd::print(record.tpls_item()); break; case Data::kTalsItem: CtaAdminCmd::print(record.tals_item()); break; + case Data::kRelsItem: CtaAdminCmd::print(record.rels_item()); break; default: throw std::runtime_error("Received invalid stream data from CTA Frontend."); } @@ -237,6 +239,7 @@ void CtaAdminCmd::send() const case HeaderType::LISTPENDINGRETRIEVES_SUMMARY: printLprSummaryHeader(); break; case HeaderType::TAPEPOOL_LS: printTpLsHeader(); break; case HeaderType::TAPE_LS: printTapeLsHeader(); break; + case HeaderType::REPACK_LS: printRepackLsHeader(); break; case HeaderType::NONE: default: break; } @@ -731,6 +734,44 @@ void CtaAdminCmd::print(const cta::admin::TapeLsItem &tals_item){ << std::endl; } +void CtaAdminCmd::printRepackLsHeader(){ + std::cout << TEXT_RED + << std::setfill(' ') << std::setw(7) << std::right << "vid" << ' ' + << std::setfill(' ') << std::setw(50) << std::right << "repackBufferURL" << ' ' + << std::setfill(' ') << std::setw(17) << std::right << "userProvidedFiles" << ' ' + << std::setfill(' ') << std::setw(20) << std::right << "totalFilesToRetrieve" << ' ' + << std::setfill(' ') << std::setw(19) << std::right << "totalBytesToRetrieve" << ' ' + << std::setfill(' ') << std::setw(20) << std::right << "totalFilesToArchive" << ' ' + << std::setfill(' ') << std::setw(19) << std::right << "totalBytesToArchive" << ' ' + << std::setfill(' ') << std::setw(14) << std::right << "retrievedFiles" << ' ' + << std::setfill(' ') << std::setw(13) << std::right << "archivedFiles" << ' ' + << std::setfill(' ') << std::setw(21) << std::right << "failedToRetrieveFiles" << ' ' + << std::setfill(' ') << std::setw(20) << std::right << "failedToRetrieveBytes" << ' ' + << std::setfill(' ') << std::setw(20) << std::right << "failedToArchiveFiles" << ' ' + << std::setfill(' ') << std::setw(20) << std::right << "failedToArchiveBytes" << ' ' + << std::setfill(' ') << std::setw(16) << std::right << "lastExpandedFSeq" << ' ' + << "status" << ' ' + << TEXT_NORMAL << std::endl; +} + +void CtaAdminCmd::print(const cta::admin::RepackLsItem &rels_item){ + std::cout << std::setfill(' ') << std::setw(7) << std::right << rels_item.vid() << ' ' + << std::setfill(' ') << std::setw(50) << std::right << rels_item.repack_buffer_url() << ' ' + << std::setfill(' ') << std::setw(17) << std::right << rels_item.user_provided_files() << ' ' + << std::setfill(' ') << std::setw(20) << std::right << rels_item.total_files_to_retrieve() << ' ' + << std::setfill(' ') << std::setw(19) << std::right << rels_item.total_bytes_to_retrieve() << ' ' + << std::setfill(' ') << std::setw(20) << std::right << rels_item.total_files_to_archive() << ' ' + << std::setfill(' ') << std::setw(19) << std::right << rels_item.total_bytes_to_archive() << ' ' + << std::setfill(' ') << std::setw(14) << std::right << rels_item.retrieved_files() << ' ' + << std::setfill(' ') << std::setw(13) << std::right << rels_item.archived_files() << ' ' + << std::setfill(' ') << std::setw(21) << std::right << rels_item.failed_to_retrieve_files() << ' ' + << std::setfill(' ') << std::setw(20) << std::right << rels_item.failed_to_retrieve_bytes() << ' ' + << std::setfill(' ') << std::setw(20) << std::right << rels_item.failed_to_archive_files() << ' ' + << std::setfill(' ') << std::setw(20) << std::right << rels_item.failed_to_retrieve_bytes() << ' ' + << std::setfill(' ') << std::setw(10) << std::right << rels_item.last_expanded_fseq() << ' ' + << rels_item.status() << std::endl; +} + void CtaAdminCmd::print(const cta::admin::TapePoolLsItem &tpls_item) { std::string encrypt_str = tpls_item.encrypt() ? "true" : "false"; diff --git a/cmdline/CtaAdminCmd.hpp b/cmdline/CtaAdminCmd.hpp index 68e1f285c0..56cbc901b6 100644 --- a/cmdline/CtaAdminCmd.hpp +++ b/cmdline/CtaAdminCmd.hpp @@ -58,6 +58,7 @@ public: static void printLprSummaryHeader(); static void printTpLsHeader(); static void printTapeLsHeader(); + static void printRepackLsHeader(); // Output records static void print(const ArchiveFileLsItem &afls_item); @@ -70,6 +71,7 @@ public: static void print(const ListPendingRetrievesSummary &lpr_summary); static void print(const TapePoolLsItem &tpls_item); static void print(const cta::admin::TapeLsItem &tals_item); + static void print(const cta::admin::RepackLsItem &rels_item); private: //! Parse the options for a specific command/subcommand diff --git a/common/dataStructures/RepackInfo.hpp b/common/dataStructures/RepackInfo.hpp index e94e87246d..03b7666a01 100644 --- a/common/dataStructures/RepackInfo.hpp +++ b/common/dataStructures/RepackInfo.hpp @@ -58,6 +58,8 @@ struct RepackInfo { uint64_t failedBytesToRetrieve; uint64_t lastExpandedFseq; uint64_t userProvidedFiles; + uint64_t retrievedFiles; + uint64_t archivedFiles; bool isExpandFinished; // std::string tag; // uint64_t totalFiles; diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index 0a0bb02505..25759d7211 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -138,6 +138,8 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { ret.failedBytesToArchive = m_payload.failedtoarchivebytes(); ret.failedFilesToRetrieve = m_payload.failedtoretrievefiles(); ret.failedBytesToRetrieve = m_payload.failedtoretrievebytes(); + ret.archivedFiles = m_payload.archivedfiles(); + ret.retrievedFiles = m_payload.retrievedfiles(); ret.lastExpandedFseq = m_payload.lastexpandedfseq(); ret.userProvidedFiles = m_payload.userprovidedfiles(); ret.isExpandFinished = m_payload.is_expand_finished(); diff --git a/xroot_plugins/XrdCtaRepackLs.hpp b/xroot_plugins/XrdCtaRepackLs.hpp new file mode 100644 index 0000000000..010fdd29b2 --- /dev/null +++ b/xroot_plugins/XrdCtaRepackLs.hpp @@ -0,0 +1,115 @@ +/** + * The CERN Tape Archive (CTA) project + * Copyright © 2018 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <XrdSsiPbOStreamBuffer.hpp> +#include <catalogue/Catalogue.hpp> +#include <xrootd/private/XrdSsi/XrdSsiStream.hh> + + + +namespace cta { namespace xrd { + /*! + * Stream object which implements "repack ls" command. + */ + class RepackLsStream: public XrdSsiStream { + public: + + RepackLsStream(cta::Scheduler& scheduler, const cta::optional<std::string> vid): + XrdSsiStream(XrdSsiStream::isActive),m_scheduler(scheduler), m_vid(vid){ + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "RepackLsStream() constructor"); + if(!vid){ + m_repackList = m_scheduler.getRepacks(); + } else { + m_repackList.push_back(m_scheduler.getRepack(vid.value())); + } + } + + virtual ~RepackLsStream(){ + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG,LOG_SUFFIX,"~RepackLsStream() destructor"); + } + + virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo,int &dlen, bool &last) override { + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); + + XrdSsiPb::OStreamBuffer<Data> *streambuf; + + try { + + if(m_repackList.empty()) { + // Nothing more to send, close the stream + last = true; + return nullptr; + } + + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + + for(bool is_buffer_full = false; !m_repackList.empty() && !is_buffer_full; m_repackList.pop_front()){ + Data record; + auto &repackRequest = m_repackList.front(); + auto repackRequestItem = record.mutable_rels_item(); + repackRequestItem->set_vid(repackRequest.vid); + repackRequestItem->set_repack_buffer_url(repackRequest.repackBufferBaseURL); + repackRequestItem->set_user_provided_files(repackRequest.userProvidedFiles); + repackRequestItem->set_total_files_to_retrieve(repackRequest.totalFilesToRetrieve); + repackRequestItem->set_total_bytes_to_retrieve(repackRequest.totalBytesToRetrieve); + repackRequestItem->set_total_files_to_archive(repackRequest.totalFilesToArchive); + repackRequestItem->set_total_bytes_to_archive(repackRequest.totalBytesToArchive); + repackRequestItem->set_retrieved_files(repackRequest.retrievedFiles); + repackRequestItem->set_archived_files(repackRequest.archivedFiles); + repackRequestItem->set_failed_to_retrieve_files(repackRequest.failedFilesToRetrieve); + repackRequestItem->set_failed_to_retrieve_bytes(repackRequest.failedBytesToRetrieve); + repackRequestItem->set_failed_to_archive_files(repackRequest.failedFilesToArchive); + repackRequestItem->set_failed_to_archive_bytes(repackRequest.failedBytesToArchive); + repackRequestItem->set_status(toString(repackRequest.status)); + //Last expanded fSeq is in reality the next FSeq to Expand. So last one is next - 1 + repackRequestItem->set_last_expanded_fseq(repackRequest.lastExpandedFseq != 0 ? repackRequest.lastExpandedFseq - 1 : 0); + is_buffer_full = streambuf->Push(record); + } + + dlen = streambuf->Size(); + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); + + } catch(cta::exception::Exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(std::exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } catch(...) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; + } + return streambuf; + } + + private: + cta::Scheduler &m_scheduler; + const cta::optional<std::string> m_vid; + std::list<common::dataStructures::RepackInfo> m_repackList; + static constexpr const char * const LOG_SUFFIX = "RepackLsStream"; + }; + +}} \ No newline at end of file diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 4d4acb1e6d..ad212bf440 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -31,7 +31,7 @@ using XrdSsiPb::PbException; #include "XrdCtaTapePoolLs.hpp" #include "XrdSsiCtaRequestMessage.hpp" #include "XrdCtaTapeLs.hpp" - +#include "XrdCtaRepackLs.hpp" namespace cta { @@ -193,7 +193,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons processRepack_Rm(request.admincmd(), response); break; case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_LS): - processRepack_Ls(request.admincmd(), response); + processRepack_Ls(request.admincmd(), response, stream); break; case cmd_pair(AdminCmd::CMD_REPACK, AdminCmd::SUBCMD_ERR): processRepack_Err(request.admincmd(), response); @@ -1418,55 +1418,21 @@ void RequestMessage::processRepack_Rm(const cta::admin::AdminCmd &admincmd, cta: -void RequestMessage::processRepack_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response) +void RequestMessage::processRepack_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { using namespace cta::admin; + + auto vid = getOptional(OptionString::VID); - std::stringstream cmdlineOutput; - - auto vid = getOptional(OptionString::VID); - - std::list<cta::common::dataStructures::RepackInfo> list; - - if(!vid) { - list = m_scheduler.getRepacks(); - } else { - list.push_back(m_scheduler.getRepack(vid.value())); - } + // Create a XrdSsi stream object to return the results + stream = new RepackLsStream(m_scheduler, vid); - if(!list.empty()) - { - std::vector<std::vector<std::string>> responseTable; - std::vector<std::string> header = { - "vid","file buffer URL","UserProvidedFiles","FilesToRetrieve","BytesToRetrieve","FilesToArchive","BytesToArchive","FailedToRetrieve (files)","FailedToRetrieve (bytes)","FailedToArchive (files)","FailedToArchive (bytes)","LastExpandedFSeq","status"//,"name","host","time" - }; - if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header); - for(auto it = list.cbegin(); it != list.cend(); it++) - { - std::vector<std::string> currentRow; - currentRow.push_back(it->vid); - currentRow.push_back(utils::midEllipsis(it->repackBufferBaseURL,40));//std::to_string(static_cast<unsigned long long>(it->totalFiles))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->userProvidedFiles))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->totalFilesToRetrieve))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->totalBytesToRetrieve))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->totalFilesToArchive)));//std::to_string(static_cast<unsigned long long>(it->totalSize))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->totalBytesToArchive))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->failedFilesToRetrieve))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->failedBytesToRetrieve))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->failedFilesToArchive))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->failedBytesToArchive))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->lastExpandedFseq)));//std::to_string(static_cast<unsigned long long>(it->filesArchived))); - currentRow.push_back(common::dataStructures::toString(it->status)); - //currentRow.push_back("-");//it->creationLog.username); - //currentRow.push_back("-");//it->creationLog.host); - //currentRow.push_back("-");//timeToString(it->creationLog.time)); - responseTable.push_back(currentRow); - } - cmdlineOutput << formatResponse(responseTable); - } + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) { + response.set_show_header(HeaderType::REPACK_LS); + } - response.set_message_txt(cmdlineOutput.str()); - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index 43c134fa5b..94d4c7ac4e 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -101,7 +101,7 @@ private: void processMountPolicy_Ls (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); void processRepack_Add (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); void processRepack_Rm (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); - void processRepack_Ls (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); + //void processRepack_Ls (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); void processRepack_Err (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); void processRequesterMountRule_Add(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); void processRequesterMountRule_Ch (const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response); @@ -137,6 +137,7 @@ private: admincmdstream_t processListPendingRetrieves; admincmdstream_t processTapePool_Ls; admincmdstream_t processTape_Ls; + admincmdstream_t processRepack_Ls; /*! * Log an admin command -- GitLab