From 6e951d8f277e34dd3a11c8740427d15426ee8a84 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Thu, 6 Jun 2019 11:48:55 +0200 Subject: [PATCH] Makes TapeLsStream class a subclass of XrdCtaStream --- xroot_plugins/XrdCtaTapeLs.hpp | 228 ++++++++++++---------- xroot_plugins/XrdSsiCtaRequestMessage.cpp | 170 +--------------- 2 files changed, 129 insertions(+), 269 deletions(-) diff --git a/xroot_plugins/XrdCtaTapeLs.hpp b/xroot_plugins/XrdCtaTapeLs.hpp index e56c0c6cdb..a60b201b2d 100644 --- a/xroot_plugins/XrdCtaTapeLs.hpp +++ b/xroot_plugins/XrdCtaTapeLs.hpp @@ -1,26 +1,25 @@ -/** - * The CERN Tape Archive (CTA) project - * Copyright © 2018 CERN - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. +/*! + * @project The CERN Tape Archive (CTA) + * @brief CTA Tape Ls stream implementation + * @copyright Copyright 2019 CERN + * @license This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#pragma once -#include <XrdSsiPbOStreamBuffer.hpp> -#include <catalogue/Catalogue.hpp> -#include <xrootd/private/XrdSsi/XrdSsiStream.hh> +#pragma once +#include <xroot_plugins/XrdCtaStream.hpp> +#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> namespace cta { namespace xrd { @@ -28,102 +27,115 @@ namespace cta { namespace xrd { /*! * Stream object which implements "ta ls" command. */ -class TapeLsStream: public XrdSsiStream{ +class TapeLsStream: public XrdCtaStream{ public: - TapeLsStream(cta::catalogue::Catalogue& catalogue, const cta::catalogue::TapeSearchCriteria& searchCriteria): - XrdSsiStream(XrdSsiStream::isActive),m_catalogue(catalogue),m_searchCriteria(searchCriteria),m_tapeList(catalogue.getTapes(searchCriteria)) - { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "TapeLsStream() constructor"); + /*! + * Constructor + * + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler + */ + TapeLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler); + +private: + /*! + * Can we close the stream? + */ + virtual bool isDone() const { + return m_tapeList.empty(); } + + /*! + * Fill the buffer + */ + virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); - virtual ~TapeLsStream(){ - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG,LOG_SUFFIX,"~TapeLsStream() destructor"); + std::list<common::dataStructures::Tape> m_tapeList; + + static constexpr const char * const LOG_SUFFIX = "TapeLsStream"; +}; + + +TapeLsStream::TapeLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler) : + XrdCtaStream(catalogue, scheduler) +{ + using namespace cta::admin; + + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "TapeLsStream() constructor"); + + cta::catalogue::TapeSearchCriteria searchCriteria; + + if(!requestMsg.has_flag(OptionBoolean::ALL)) { + bool has_any = false; // set to true if at least one optional option is set + + // Get the search criteria from the optional options + + searchCriteria.disabled = requestMsg.getOptional(OptionBoolean::DISABLED, &has_any); + searchCriteria.full = requestMsg.getOptional(OptionBoolean::FULL, &has_any); + searchCriteria.capacityInBytes = requestMsg.getOptional(OptionUInt64::CAPACITY, &has_any); + searchCriteria.logicalLibrary = requestMsg.getOptional(OptionString::LOGICAL_LIBRARY, &has_any); + searchCriteria.tapePool = requestMsg.getOptional(OptionString::TAPE_POOL, &has_any); + searchCriteria.vo = requestMsg.getOptional(OptionString::VO, &has_any); + searchCriteria.vid = requestMsg.getOptional(OptionString::VID, &has_any); + searchCriteria.mediaType = requestMsg.getOptional(OptionString::MEDIA_TYPE, &has_any); + searchCriteria.vendor = requestMsg.getOptional(OptionString::VENDOR, &has_any); + + if(!has_any) { + throw cta::exception::UserError("Must specify at least one search option, or --all"); + } } - - virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); - XrdSsiPb::OStreamBuffer<Data> *streambuf; + m_tapeList = m_catalogue.getTapes(searchCriteria); +} + + +int TapeLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { + for(bool is_buffer_full = false; !m_tapeList.empty() && !is_buffer_full; m_tapeList.pop_front()) { + Data record; + auto &tape = m_tapeList.front(); + auto tape_item = record.mutable_tals_item(); - try { - if(m_tapeList.empty()) { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - for(bool is_buffer_full = false; !m_tapeList.empty() && !is_buffer_full; m_tapeList.pop_front()){ - Data record; - auto &tape = m_tapeList.front(); - auto tape_item = record.mutable_tals_item(); - - tape_item->set_vid(tape.vid); - tape_item->set_media_type(tape.mediaType); - tape_item->set_vendor(tape.vendor); - tape_item->set_logical_library(tape.logicalLibraryName); - tape_item->set_tapepool(tape.tapePoolName); - tape_item->set_vo(tape.vo); - tape_item->set_encryption_key((bool)tape.encryptionKey ? tape.encryptionKey.value() : "-"); - tape_item->set_capacity(tape.capacityInBytes); - tape_item->set_occupancy(tape.dataOnTapeInBytes); - tape_item->set_last_fseq(tape.lastFSeq); - tape_item->set_full(tape.full); - tape_item->set_disabled(tape.disabled); - if(tape.labelLog) { - ::cta::common::TapeLog * labelLog = tape_item->mutable_label_log(); - labelLog->set_drive(tape.labelLog.value().drive); - labelLog->set_time(tape.labelLog.value().time); - } - if(tape.lastWriteLog){ - ::cta::common::TapeLog * lastWriteLog = tape_item->mutable_last_written_log(); - lastWriteLog->set_drive(tape.lastWriteLog.value().drive); - lastWriteLog->set_time(tape.lastWriteLog.value().time); - } - if(tape.lastReadLog){ - ::cta::common::TapeLog * lastReadLog = tape_item->mutable_last_read_log(); - lastReadLog->set_drive(tape.lastReadLog.value().drive); - lastReadLog->set_time(tape.lastReadLog.value().time); - } - ::cta::common::EntryLog * creationLog = tape_item->mutable_creation_log(); - creationLog->set_username(tape.creationLog.username); - creationLog->set_host(tape.creationLog.host); - creationLog->set_time(tape.creationLog.time); - ::cta::common::EntryLog * lastModificationLog = tape_item->mutable_last_modification_log(); - lastModificationLog->set_username(tape.lastModificationLog.username); - lastModificationLog->set_host(tape.lastModificationLog.host); - lastModificationLog->set_time(tape.lastModificationLog.time); - - is_buffer_full = streambuf->Push(record); - } - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - } catch(cta::exception::Exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } catch(std::exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; - } catch(...) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; - eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; + tape_item->set_vid(tape.vid); + tape_item->set_media_type(tape.mediaType); + tape_item->set_vendor(tape.vendor); + tape_item->set_logical_library(tape.logicalLibraryName); + tape_item->set_tapepool(tape.tapePoolName); + tape_item->set_vo(tape.vo); + tape_item->set_encryption_key((bool)tape.encryptionKey ? tape.encryptionKey.value() : "-"); + tape_item->set_capacity(tape.capacityInBytes); + tape_item->set_occupancy(tape.dataOnTapeInBytes); + tape_item->set_last_fseq(tape.lastFSeq); + tape_item->set_full(tape.full); + tape_item->set_disabled(tape.disabled); + + if(tape.labelLog) { + ::cta::common::TapeLog * labelLog = tape_item->mutable_label_log(); + labelLog->set_drive(tape.labelLog.value().drive); + labelLog->set_time(tape.labelLog.value().time); + } + if(tape.lastWriteLog){ + ::cta::common::TapeLog * lastWriteLog = tape_item->mutable_last_written_log(); + lastWriteLog->set_drive(tape.lastWriteLog.value().drive); + lastWriteLog->set_time(tape.lastWriteLog.value().time); } - return streambuf; + if(tape.lastReadLog){ + ::cta::common::TapeLog * lastReadLog = tape_item->mutable_last_read_log(); + lastReadLog->set_drive(tape.lastReadLog.value().drive); + lastReadLog->set_time(tape.lastReadLog.value().time); + } + ::cta::common::EntryLog * creationLog = tape_item->mutable_creation_log(); + creationLog->set_username(tape.creationLog.username); + creationLog->set_host(tape.creationLog.host); + creationLog->set_time(tape.creationLog.time); + ::cta::common::EntryLog * lastModificationLog = tape_item->mutable_last_modification_log(); + lastModificationLog->set_username(tape.lastModificationLog.username); + lastModificationLog->set_host(tape.lastModificationLog.host); + lastModificationLog->set_time(tape.lastModificationLog.time); + + is_buffer_full = streambuf->Push(record); } - -private: - cta::catalogue::Catalogue &m_catalogue; - const cta::catalogue::TapeSearchCriteria m_searchCriteria; - std::list<common::dataStructures::Tape> m_tapeList; - static constexpr const char * const LOG_SUFFIX = "TapeLsStream"; -}; - -}} + return streambuf->Size(); +} +}} diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index dd353ef4ac..242079947e 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -238,8 +238,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons processTape_Reclaim(request.admincmd(), response); break; case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LS): - //processTape_Ls(request.admincmd(), response); - processTape_Ls(request.admincmd(),response, stream); + processTape_Ls(request.admincmd(),response, stream); break; case cmd_pair(AdminCmd::CMD_TAPE, AdminCmd::SUBCMD_LABEL): processTape_Label(request.admincmd(), response); @@ -1776,172 +1775,21 @@ void RequestMessage::processTape_Reclaim(const cta::admin::AdminCmd &admincmd, c response.set_type(cta::xrd::Response::RSP_SUCCESS); } + void RequestMessage::processTape_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { using namespace cta::admin; - - cta::catalogue::TapeSearchCriteria searchCriteria; - - if(!has_flag(OptionBoolean::ALL)) - { - bool has_any = false; // set to true if at least one optional option is set - - // Get the search criteria from the optional options - - searchCriteria.disabled = getOptional(OptionBoolean::DISABLED, &has_any); - searchCriteria.full = getOptional(OptionBoolean::FULL, &has_any); - searchCriteria.capacityInBytes = getOptional(OptionUInt64::CAPACITY, &has_any); - searchCriteria.logicalLibrary = getOptional(OptionString::LOGICAL_LIBRARY, &has_any); - searchCriteria.tapePool = getOptional(OptionString::TAPE_POOL, &has_any); - searchCriteria.vo = getOptional(OptionString::VO, &has_any); - searchCriteria.vid = getOptional(OptionString::VID, &has_any); - searchCriteria.mediaType = getOptional(OptionString::MEDIA_TYPE, &has_any); - searchCriteria.vendor = getOptional(OptionString::VENDOR, &has_any); - - if(!has_any) { - throw cta::exception::UserError("Must specify at least one search option, or --all"); - } - } - // Create a XrdSsi stream object to return the results - stream = new TapeLsStream(m_catalogue, searchCriteria); + // Create a XrdSsi stream object to return the results + stream = new TapeLsStream(*this, m_catalogue, m_scheduler); - // Should the client display column headers? - if(has_flag(OptionBoolean::SHOW_HEADER)) { - response.set_show_header(HeaderType::TAPE_LS); - } + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) { + response.set_show_header(HeaderType::TAPE_LS); + } - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } -/*void RequestMessage::processTape_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response) -{ - using namespace cta::admin; - - std::stringstream cmdlineOutput; - - cta::catalogue::TapeSearchCriteria searchCriteria; - - if(!has_flag(OptionBoolean::ALL)) - { - bool has_any = false; // set to true if at least one optional option is set - - // Get the search criteria from the optional options - - searchCriteria.disabled = getOptional(OptionBoolean::DISABLED, &has_any); - searchCriteria.full = getOptional(OptionBoolean::FULL, &has_any); - searchCriteria.capacityInBytes = getOptional(OptionUInt64::CAPACITY, &has_any); - searchCriteria.logicalLibrary = getOptional(OptionString::LOGICAL_LIBRARY, &has_any); - searchCriteria.tapePool = getOptional(OptionString::TAPE_POOL, &has_any); - searchCriteria.vo = getOptional(OptionString::VO, &has_any); - searchCriteria.vid = getOptional(OptionString::VID, &has_any); - searchCriteria.mediaType = getOptional(OptionString::MEDIA_TYPE, &has_any); - searchCriteria.vendor = getOptional(OptionString::VENDOR, &has_any); - - if(!has_any) { - throw cta::exception::UserError("Must specify at least one search option, or --all"); - } - } - - std::list<cta::common::dataStructures::Tape> list= m_catalogue.getTapes(searchCriteria); - - TapeLsItem tapeList; - if(!list.empty()) - { - for(auto it = list.cbegin(); it != list.cend(); it++) { - ::cta::common::Tape * tapeToAdd = tapeList.add_tapes(); - tapeToAdd->set_vid(it->vid); - tapeToAdd->set_media_type(it->mediaType); - tapeToAdd->set_vendor(it->vendor); - tapeToAdd->set_logical_library(it->logicalLibraryName); - tapeToAdd->set_tapepool(it->tapePoolName); - tapeToAdd->set_vo(it->vo); - tapeToAdd->set_encryption_key((bool)it->encryptionKey ? it->encryptionKey.value() : "-"); - tapeToAdd->set_capacity(it->capacityInBytes); - tapeToAdd->set_occupancy(it->dataOnTapeInBytes); - tapeToAdd->set_last_fseq(it->lastFSeq); - tapeToAdd->set_full(it->full); - tapeToAdd->set_disabled(it->disabled); - if(it->labelLog) { - ::cta::common::TapeLog * labelLog = tapeToAdd->mutable_label_log(); - labelLog->set_drive(it->labelLog.value().drive); - labelLog->set_time(it->labelLog.value().time); - } - if(it->lastWriteLog){ - ::cta::common::TapeLog * lastWriteLog = tapeToAdd->mutable_last_written_log(); - lastWriteLog->set_drive(it->lastWriteLog.value().drive); - lastWriteLog->set_time(it->lastWriteLog.value().time); - } - if(it->lastReadLog){ - ::cta::common::TapeLog * lastReadLog = tapeToAdd->mutable_last_read_log(); - lastReadLog->set_drive(it->lastReadLog.value().drive); - lastReadLog->set_time(it->lastReadLog.value().time); - } - ::cta::common::EntryLog * creationLog = tapeToAdd->mutable_creation_log(); - creationLog->set_username(it->creationLog.username); - creationLog->set_host(it->creationLog.host); - creationLog->set_time(it->creationLog.time); - ::cta::common::EntryLog * lastModificationLog = tapeToAdd->mutable_last_modification_log(); - lastModificationLog->set_username(it->lastModificationLog.username); - lastModificationLog->set_host(it->lastModificationLog.host); - lastModificationLog->set_time(it->lastModificationLog.time); - } - std::vector<std::vector<std::string>> responseTable; - std::vector<std::string> header = { - "vid","media type","vendor","logical library","tapepool","vo","encryption key","capacity","occupancy", - "last fseq","full","disabled","label drive","label time","last w drive","last w time", - "last r drive","last r time","c.user","c.host","c.time","m.user","m.host","m.time","comment" - }; - if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header); - for(auto it = list.cbegin(); it != list.cend(); it++) { - std::vector<std::string> currentRow; - currentRow.push_back(it->vid); - currentRow.push_back(it->mediaType); - currentRow.push_back(it->vendor); - currentRow.push_back(it->logicalLibraryName); - currentRow.push_back(it->tapePoolName); - currentRow.push_back(it->vo); - currentRow.push_back((bool)it->encryptionKey ? it->encryptionKey.value() : "-"); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->capacityInBytes))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->dataOnTapeInBytes))); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->lastFSeq))); - if(it->full) currentRow.push_back("true"); else currentRow.push_back("false"); - if(it->disabled) currentRow.push_back("true"); else currentRow.push_back("false"); - - if(it->labelLog) { - currentRow.push_back(it->labelLog.value().drive); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->labelLog.value().time))); - } else { - currentRow.push_back("-"); - currentRow.push_back("-"); - } - - if(it->lastWriteLog) { - currentRow.push_back(it->lastWriteLog.value().drive); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->lastWriteLog.value().time))); - } else { - currentRow.push_back("-"); - currentRow.push_back("-"); - } - - if(it->lastReadLog) { - currentRow.push_back(it->lastReadLog.value().drive); - currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->lastReadLog.value().time))); - } else { - currentRow.push_back("-"); - currentRow.push_back("-"); - } - - addLogInfoToResponseRow(currentRow, it->creationLog, it->lastModificationLog); - currentRow.push_back(it->comment); - responseTable.push_back(currentRow); - } - cmdlineOutput << formatResponse(responseTable); - } - - response.set_message_txt(cmdlineOutput.str()); - response.set_type(cta::xrd::Response::RSP_SUCCESS); -}*/ - void RequestMessage::processTape_Label(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response) -- GitLab