Commit 1b5c82aa authored by Michael Davis's avatar Michael Davis
Browse files

[cta-admin] Changes "tapepool ls" command to streaming version

parent af875230
......@@ -222,7 +222,7 @@ RmcdClientCmd::RmcdClientCmd(int argc, char *const *const argv) :
// Obtain a Service Provider
std::string resource("/rmc_test");
m_rmc_test_service_ptr = make_unique<XrdSsiPbServiceType>(m_endpoint, resource, config);
m_rmc_test_service_ptr = std::make_unique<XrdSsiPbServiceType>(m_endpoint, resource, config);
}
void RmcdClientCmd::processMount(int argc, char *const *const argv) {
......
......@@ -26,8 +26,6 @@
#include "CtaAdminCmd.hpp"
// Define XRootD SSI Alert message callback
namespace XrdSsiPb {
......@@ -68,6 +66,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const
case Data::kLpaSummary: std::cout << Log::DumpProtobuf(&record.lpa_summary()); break;
case Data::kLprItem: std::cout << Log::DumpProtobuf(&record.lpr_item()); break;
case Data::kLprSummary: std::cout << Log::DumpProtobuf(&record.lpr_summary()); break;
case Data::kTplsItem: std::cout << Log::DumpProtobuf(&record.tpls_item()); break;
default:
throw std::runtime_error("Received invalid stream data from CTA Frontend.");
}
......@@ -80,6 +79,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const
case Data::kLpaSummary: CtaAdminCmd::print(record.lpa_summary()); break;
case Data::kLprItem: CtaAdminCmd::print(record.lpr_item()); break;
case Data::kLprSummary: CtaAdminCmd::print(record.lpr_summary()); break;
case Data::kTplsItem: CtaAdminCmd::print(record.tpls_item()); break;
default:
throw std::runtime_error("Received invalid stream data from CTA Frontend.");
}
......@@ -219,6 +219,7 @@ void CtaAdminCmd::send() const
case HeaderType::LISTPENDINGARCHIVES_SUMMARY: printLpaSummaryHeader(); break;
case HeaderType::LISTPENDINGRETRIEVES: printLprHeader(); break;
case HeaderType::LISTPENDINGRETRIEVES_SUMMARY: printLprSummaryHeader(); break;
case HeaderType::TAPEPOOL_LS: printTpLsHeader(); break;
case HeaderType::NONE:
default: break;
}
......@@ -539,6 +540,57 @@ void CtaAdminCmd::print(const cta::admin::ListPendingRetrievesSummary &lpr_summa
<< std::endl;
}
void CtaAdminCmd::printTpLsHeader()
{
std::cout << TEXT_RED
<< std::setfill(' ') << std::setw(18) << std::right << "name" << ' '
<< std::setfill(' ') << std::setw(10) << std::right << "vo" << ' '
<< std::setfill(' ') << std::setw(7) << std::right << "#tapes" << ' '
<< std::setfill(' ') << std::setw(9) << std::right << "#partial" << ' '
<< std::setfill(' ') << std::setw(12) << std::right << "#phys files" << ' '
<< std::setfill(' ') << std::setw(5) << std::right << "size" << ' '
<< std::setfill(' ') << std::setw(5) << std::right << "used" << ' '
<< std::setfill(' ') << std::setw(6) << std::right << "avail" << ' '
<< std::setfill(' ') << std::setw(6) << std::right << "use%" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "encrypt" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "c.user" << ' '
<< std::setfill(' ') << std::setw(25) << std::right << "c.host" << ' '
<< std::setfill(' ') << std::setw(24) << std::right << "c.time" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "m.user" << ' '
<< std::setfill(' ') << std::setw(25) << std::right << "m.host" << ' '
<< std::setfill(' ') << std::setw(24) << std::right << "m.time" << ' '
<< "comment" << ' '
<< TEXT_NORMAL << std::endl;
}
void CtaAdminCmd::print(const cta::admin::TapePoolLsItem &tpls_item)
{
std::string encrypt_str = tpls_item.encrypt() ? "true" : "false";
uint64_t avail = tpls_item.capacity_bytes() > tpls_item.data_bytes() ?
tpls_item.capacity_bytes()-tpls_item.data_bytes() : 0;
double use_percent = tpls_item.capacity_bytes() > 0 ?
(static_cast<double>(tpls_item.data_bytes())/static_cast<double>(tpls_item.capacity_bytes()))*100.0 : 0.0;
std::cout << std::setfill(' ') << std::setw(18) << std::right << tpls_item.name() << ' '
<< std::setfill(' ') << std::setw(10) << std::right << tpls_item.vo() << ' '
<< std::setfill(' ') << std::setw(7) << std::right << tpls_item.num_tapes() << ' '
<< std::setfill(' ') << std::setw(9) << std::right << tpls_item.num_partial_tapes() << ' '
<< std::setfill(' ') << std::setw(12) << std::right << tpls_item.num_physical_files() << ' '
<< std::setfill(' ') << std::setw(4) << std::right << tpls_item.capacity_bytes() / 1000000000 << "G "
<< std::setfill(' ') << std::setw(4) << std::right << tpls_item.data_bytes() / 1000000000 << "G "
<< std::setfill(' ') << std::setw(5) << std::right << avail / 1000000000 << "G "
<< std::setfill(' ') << std::setw(5) << std::right << std::fixed << std::setprecision(1) << use_percent << "% "
<< std::setfill(' ') << std::setw(8) << std::right << encrypt_str << ' '
<< std::setfill(' ') << std::setw(8) << std::right << tpls_item.created().username() << ' '
<< std::setfill(' ') << std::setw(25) << std::right << tpls_item.created().host() << ' '
<< std::setfill(' ') << std::setw(24) << std::right << timeToString(tpls_item.created().time()) << ' '
<< std::setfill(' ') << std::setw(8) << std::right << tpls_item.modified().username() << ' '
<< std::setfill(' ') << std::setw(25) << std::right << tpls_item.modified().host() << ' '
<< std::setfill(' ') << std::setw(24) << std::right << timeToString(tpls_item.modified().time()) << ' '
<< tpls_item.comment()
<< std::endl;
}
}} // namespace cta::admin
......
......@@ -39,6 +39,13 @@ public:
is_first_record = false;
return c;
}
// Static method to convert time to string
static std::string timeToString(const time_t &time)
{
std::string timeString(ctime(&time));
timeString.resize(timeString.size()-1); //remove newline
return timeString;
}
// Output headers
static void printAfLsHeader();
......@@ -47,6 +54,7 @@ public:
static void printLpaSummaryHeader();
static void printLprHeader();
static void printLprSummaryHeader();
static void printTpLsHeader();
// Output records
static void print(const ArchiveFileLsItem &afls_item);
......@@ -55,6 +63,7 @@ public:
static void print(const ListPendingArchivesSummary &lpa_summary);
static void print(const ListPendingRetrievesItem &lpr_item);
static void print(const ListPendingRetrievesSummary &lpr_summary);
static void print(const TapePoolLsItem &tpls_item);
private:
//! Parse the options for a specific command/subcommand
......
/*!
* @project The CERN Tape Archive (CTA)
* @brief CTA Frontend Tape Pool Ls stream implementation
* @copyright Copyright 2018 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <XrdSsiPbOStreamBuffer.hpp>
#include <catalogue/Catalogue.hpp>
namespace cta { namespace xrd {
/*!
* Stream object which implements "tapepool ls" command.
*/
class TapePoolLsStream : public XrdSsiStream
{
public:
TapePoolLsStream(cta::catalogue::Catalogue &catalogue) :
XrdSsiStream(XrdSsiStream::isActive),
m_tapePoolList(catalogue.getTapePools())
{
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "TapePoolLsStream() constructor");
}
virtual ~TapePoolLsStream() {
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~TapePoolLsStream() destructor");
}
/*!
* Synchronously obtain data from an active stream
*
* Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an
* active stream in the constructor.
*
* @param[out] eInfo The object to receive any error description.
* @param[in,out] dlen input: the optimal amount of data wanted (this is a hint)
* output: the actual amount of data returned in the buffer.
* @param[in,out] last input: should be set to false.
* output: if true it indicates that no more data remains to be returned
* either for this call or on the next call.
*
* @return Pointer to the Buffer object that contains a pointer to the the data (see below). The
* buffer must be returned to the stream using Buffer::Recycle(). The next member is usable.
* @retval 0 No more data remains or an error occurred:
* last = true: No more data remains.
* last = false: A fatal error occurred, eRef has the reason.
*/
virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override {
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
XrdSsiPb::OStreamBuffer<Data> *streambuf;
try {
if(m_tapePoolList.empty()) {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
for(bool is_buffer_full = false; !m_tapePoolList.empty() && !is_buffer_full; m_tapePoolList.pop_front())
{
Data record;
// TapePool
auto &tp = m_tapePoolList.front();
auto tp_item = record.mutable_tpls_item();
tp_item->set_name(tp.name);
tp_item->set_vo(tp.vo);
tp_item->set_num_tapes(tp.nbTapes);
tp_item->set_num_partial_tapes(tp.nbPartialTapes);
tp_item->set_num_physical_files(tp.nbPhysicalFiles);
tp_item->set_capacity_bytes(tp.capacityBytes);
tp_item->set_data_bytes(tp.dataBytes);
tp_item->set_encrypt(tp.encryption);
tp_item->mutable_created()->set_username(tp.creationLog.username);
tp_item->mutable_created()->set_host(tp.creationLog.host);
tp_item->mutable_created()->set_time(tp.creationLog.time);
tp_item->mutable_modified()->set_username(tp.lastModificationLog.username);
tp_item->mutable_modified()->set_host(tp.lastModificationLog.host);
tp_item->mutable_modified()->set_time(tp.lastModificationLog.time);
tp_item->set_comment(tp.comment);
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
// enough data to send to the client. The actual buffer size is double the block size,
// so we can keep writing a few additional records after is_buffer_full is true. These
// will be sent on the next iteration. If we exceed the hard limit of double the block
// size, Push() will throw an exception.
is_buffer_full = streambuf->Push(record);
}
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
} catch(cta::exception::Exception &ex) {
throw std::runtime_error(ex.getMessage().str());
} catch(std::exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
} catch(...) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
}
return streambuf;
}
private:
std::list<cta::catalogue::TapePool> m_tapePoolList; //!< List of tape pools from the catalogue
static constexpr const char* const LOG_SUFFIX = "TapePoolLsStream"; //!< Identifier for log messages
};
}} // namespace cta::xrd
......@@ -27,6 +27,7 @@ using XrdSsiPb::PbException;
#include <cmdline/CtaAdminCmdParse.hpp>
#include "XrdCtaArchiveFileLs.hpp"
#include "XrdCtaListPendingQueue.hpp"
#include "XrdCtaTapePoolLs.hpp"
#include "XrdSsiCtaRequestMessage.hpp"
......@@ -250,7 +251,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons
processTapePool_Rm(request.admincmd(), response);
break;
case cmd_pair(AdminCmd::CMD_TAPEPOOL, AdminCmd::SUBCMD_LS):
processTapePool_Ls(request.admincmd(), response);
processTapePool_Ls(request.admincmd(), response, stream);
break;
case cmd_pair(AdminCmd::CMD_TEST, AdminCmd::SUBCMD_READ):
processTest_Read(request.admincmd(), response);
......@@ -1974,49 +1975,16 @@ void RequestMessage::processTapePool_Rm(const cta::admin::AdminCmd &admincmd, ct
void RequestMessage::processTapePool_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response)
void RequestMessage::processTapePool_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream)
{
using namespace cta::admin;
std::stringstream cmdlineOutput;
const std::list<cta::catalogue::TapePool> tp_list= m_catalogue.getTapePools();
if(!tp_list.empty())
{
const std::vector<std::string> header = {
"name","vo","# tapes","# partial","# phys files","size","used","avail","use%","encrypt",
"c.user","c.host","c.time","m.user","m.host","m.time","comment"
};
std::vector<std::vector<std::string>> responseTable;
if(has_flag(OptionBoolean::SHOW_HEADER)) responseTable.push_back(header);
for(auto &tp : tp_list) {
std::vector<std::string> currentRow;
uint64_t avail = tp.capacityBytes > tp.dataBytes ? tp.capacityBytes - tp.dataBytes : 0;
double use_d = (static_cast<double>(tp.dataBytes) / static_cast<double>(tp.capacityBytes)) * 100.0;
std::ostringstream use;
use << std::fixed << std::setprecision(1) <<(use_d < 0.0 ? 0.0 : use_d) << '%';
currentRow.push_back(tp.name);
currentRow.push_back(tp.vo);
currentRow.push_back(std::to_string(tp.nbTapes));
currentRow.push_back(std::to_string(tp.nbPartialTapes));
currentRow.push_back(std::to_string(tp.nbPhysicalFiles));
currentRow.push_back(std::to_string(tp.capacityBytes/1000000000) + "G");
currentRow.push_back(std::to_string(tp.dataBytes/1000000000) + "G");
currentRow.push_back(std::to_string(avail/1000000000) + "G");
currentRow.push_back(use.str());
currentRow.push_back(tp.encryption ? "true" : "false");
addLogInfoToResponseRow(currentRow, tp.creationLog, tp.lastModificationLog);
currentRow.push_back(tp.comment);
// Create a XrdSsi stream object to return the results
stream = new TapePoolLsStream(m_catalogue);
responseTable.push_back(currentRow);
}
cmdlineOutput << formatResponse(responseTable);
}
// Should the client display column headers?
if(has_flag(OptionBoolean::SHOW_HEADER)) response.set_show_header(HeaderType::TAPEPOOL_LS);
response.set_message_txt(cmdlineOutput.str());
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
......
......@@ -126,7 +126,6 @@ private:
admincmd_t processTapePool_Add;
admincmd_t processTapePool_Ch;
admincmd_t processTapePool_Rm;
admincmd_t processTapePool_Ls;
admincmd_t processTest_Read;
admincmd_t processTest_Write;
admincmd_t processTest_WriteAuto;
......@@ -148,6 +147,7 @@ private:
admincmdstream_t processArchiveFile_Ls;
admincmdstream_t processListPendingArchives;
admincmdstream_t processListPendingRetrieves;
admincmdstream_t processTapePool_Ls;
/*!
* Log an admin command
......
Subproject commit 7490d7393303a43673a8116f1d1e94bfff35c670
Subproject commit 976a179b7440930083f089464d53bf2598e891be
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment