From 5ca0812a173402be153b96a51f57592966de834e Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Thu, 6 Jun 2019 13:11:12 +0200 Subject: [PATCH] Makes TapePoolLsStream class a subclass of XrdCtaStream --- xroot_plugins/XrdCtaArchiveFileLs.hpp | 2 +- xroot_plugins/XrdCtaFailedRequestLs.hpp | 10 +- xroot_plugins/XrdCtaTapeLs.hpp | 8 +- xroot_plugins/XrdCtaTapePoolLs.hpp | 163 ++++++++-------------- xroot_plugins/XrdSsiCtaRequestMessage.cpp | 12 +- 5 files changed, 78 insertions(+), 117 deletions(-) diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp index dccc2cdd8d..addab744d9 100644 --- a/xroot_plugins/XrdCtaArchiveFileLs.hpp +++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp @@ -25,7 +25,7 @@ namespace cta { namespace xrd { /*! - * Stream object which implements "af ls" command. + * Stream object which implements "archivefile ls" command. */ class ArchiveFileLsStream : public XrdCtaStream { diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index d9b2a6063d..20d0228ee4 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -33,11 +33,11 @@ public: /*! * Constructor * - * @param[in] requestMsg RequestMessage containing command-line arguments - * @param[in] catalogue CTA Catalogue - * @param[in] scheduler CTA Scheduler - * @param[in] schedDb CTA ObjectStore - * @param[in] lc CTA Log Context + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler + * @param[in] schedDb CTA ObjectStore + * @param[in] lc CTA Log Context */ FailedRequestLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, OStoreDB &schedDb, log::LogContext &lc); diff --git a/xroot_plugins/XrdCtaTapeLs.hpp b/xroot_plugins/XrdCtaTapeLs.hpp index a60b201b2d..1fd378df91 100644 --- a/xroot_plugins/XrdCtaTapeLs.hpp +++ b/xroot_plugins/XrdCtaTapeLs.hpp @@ -25,16 +25,16 @@ namespace cta { namespace xrd { /*! - * Stream object which implements "ta ls" command. + * Stream object which implements "tape ls" command */ class TapeLsStream: public XrdCtaStream{ public: /*! * Constructor * - * @param[in] requestMsg RequestMessage containing command-line arguments - * @param[in] catalogue CTA Catalogue - * @param[in] scheduler CTA Scheduler + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler */ TapeLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler); diff --git a/xroot_plugins/XrdCtaTapePoolLs.hpp b/xroot_plugins/XrdCtaTapePoolLs.hpp index 595d9ae96d..4cd527fd2f 100644 --- a/xroot_plugins/XrdCtaTapePoolLs.hpp +++ b/xroot_plugins/XrdCtaTapePoolLs.hpp @@ -1,7 +1,7 @@ /*! * @project The CERN Tape Archive (CTA) - * @brief CTA Frontend Tape Pool Ls stream implementation - * @copyright Copyright 2018 CERN + * @brief CTA TapePool Ls stream implementation + * @copyright Copyright 2019 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or @@ -18,121 +18,82 @@ #pragma once -#include <XrdSsiPbOStreamBuffer.hpp> -#include <catalogue/Catalogue.hpp> - +#include <xroot_plugins/XrdCtaStream.hpp> +#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> namespace cta { namespace xrd { /*! - * Stream object which implements "tapepool ls" command. + * Stream object which implements "tapepool ls" command */ -class TapePoolLsStream : public XrdSsiStream -{ +class TapePoolLsStream: public XrdCtaStream{ public: - TapePoolLsStream(cta::catalogue::Catalogue &catalogue) : - XrdSsiStream(XrdSsiStream::isActive), - m_tapePoolList(catalogue.getTapePools()) - { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "TapePoolLsStream() constructor"); - } + /*! + * Constructor + * + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler + */ + TapePoolLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler); - virtual ~TapePoolLsStream() { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~TapePoolLsStream() destructor"); +private: + /*! + * Can we close the stream? + */ + virtual bool isDone() const { + return m_tapePoolList.empty(); } /*! - * Synchronously obtain data from an active stream - * - * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an - * active stream in the constructor. - * - * @param[out] eInfo The object to receive any error description. - * @param[in,out] dlen input: the optimal amount of data wanted (this is a hint) - * output: the actual amount of data returned in the buffer. - * @param[in,out] last input: should be set to false. - * output: if true it indicates that no more data remains to be returned - * either for this call or on the next call. - * - * @return Pointer to the Buffer object that contains a pointer to the the data (see below). The - * buffer must be returned to the stream using Buffer::Recycle(). The next member is usable. - * @retval 0 No more data remains or an error occurred: - * last = true: No more data remains. - * last = false: A fatal error occurred, eRef has the reason. + * Fill the buffer */ - virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); - - XrdSsiPb::OStreamBuffer<Data> *streambuf; - - try { - if(m_tapePoolList.empty()) { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - for(bool is_buffer_full = false; !m_tapePoolList.empty() && !is_buffer_full; m_tapePoolList.pop_front()) - { - Data record; - - // TapePool - auto &tp = m_tapePoolList.front(); - auto tp_item = record.mutable_tpls_item(); - - tp_item->set_name(tp.name); - tp_item->set_vo(tp.vo); - tp_item->set_num_tapes(tp.nbTapes); - tp_item->set_num_partial_tapes(tp.nbPartialTapes); - tp_item->set_num_physical_files(tp.nbPhysicalFiles); - tp_item->set_capacity_bytes(tp.capacityBytes); - tp_item->set_data_bytes(tp.dataBytes); - tp_item->set_encrypt(tp.encryption); - tp_item->set_supply(tp.supply ? tp.supply.value() : ""); - tp_item->mutable_created()->set_username(tp.creationLog.username); - tp_item->mutable_created()->set_host(tp.creationLog.host); - tp_item->mutable_created()->set_time(tp.creationLog.time); - tp_item->mutable_modified()->set_username(tp.lastModificationLog.username); - tp_item->mutable_modified()->set_host(tp.lastModificationLog.host); - tp_item->mutable_modified()->set_time(tp.lastModificationLog.time); - tp_item->set_comment(tp.comment); - - // is_buffer_full is set to true when we have one full block of data in the buffer, i.e. - // enough data to send to the client. The actual buffer size is double the block size, - // so we can keep writing a few additional records after is_buffer_full is true. These - // will be sent on the next iteration. If we exceed the hard limit of double the block - // size, Push() will throw an exception. - is_buffer_full = streambuf->Push(record); - } - - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - } catch(cta::exception::Exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } catch(std::exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } catch(...) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } - return streambuf; - } + virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); -private: std::list<cta::catalogue::TapePool> m_tapePoolList; //!< List of tape pools from the catalogue static constexpr const char* const LOG_SUFFIX = "TapePoolLsStream"; //!< Identifier for log messages }; + +TapePoolLsStream::TapePoolLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler) : + XrdCtaStream(catalogue, scheduler), + m_tapePoolList(catalogue.getTapePools()) +{ + using namespace cta::admin; + + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "TapePoolLsStream() constructor"); +} + +int TapePoolLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { + for(bool is_buffer_full = false; !m_tapePoolList.empty() && !is_buffer_full; m_tapePoolList.pop_front()) { + Data record; + + // TapePool + auto &tp = m_tapePoolList.front(); + auto tp_item = record.mutable_tpls_item(); + + tp_item->set_name(tp.name); + tp_item->set_vo(tp.vo); + tp_item->set_num_tapes(tp.nbTapes); + tp_item->set_num_partial_tapes(tp.nbPartialTapes); + tp_item->set_num_physical_files(tp.nbPhysicalFiles); + tp_item->set_capacity_bytes(tp.capacityBytes); + tp_item->set_data_bytes(tp.dataBytes); + tp_item->set_encrypt(tp.encryption); + tp_item->set_supply(tp.supply ? tp.supply.value() : ""); + tp_item->mutable_created()->set_username(tp.creationLog.username); + tp_item->mutable_created()->set_host(tp.creationLog.host); + tp_item->mutable_created()->set_time(tp.creationLog.time); + tp_item->mutable_modified()->set_username(tp.lastModificationLog.username); + tp_item->mutable_modified()->set_host(tp.lastModificationLog.host); + tp_item->mutable_modified()->set_time(tp.lastModificationLog.time); + tp_item->set_comment(tp.comment); + + is_buffer_full = streambuf->Push(record); + } + return streambuf->Size(); +} + }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 242079947e..fa2729517d 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1871,15 +1871,15 @@ void RequestMessage::processTapePool_Rm(const cta::admin::AdminCmd &admincmd, ct void RequestMessage::processTapePool_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { - using namespace cta::admin; + using namespace cta::admin; - // Create a XrdSsi stream object to return the results - stream = new TapePoolLsStream(m_catalogue); + // Create a XrdSsi stream object to return the results + stream = new TapePoolLsStream(*this, m_catalogue, m_scheduler); - // Should the client display column headers? - if(has_flag(OptionBoolean::SHOW_HEADER)) response.set_show_header(HeaderType::TAPEPOOL_LS); + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) response.set_show_header(HeaderType::TAPEPOOL_LS); - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } -- GitLab