Commit 5499849a authored by Cedric Caffy's avatar Cedric Caffy
Browse files

[cta-admin] Cleaned cta-admin deprecated methods and remove fields for...

[cta-admin] Cleaned cta-admin deprecated methods and remove fields for cta-admin --json output of the commands that were modified by CTA v4.0

- cta-admin --json sq showqueues does not show maxdrives anymore but read/write max drives
- cta-admin --json showqueues does not display the next mounts, empty tapes, readonly tapes and disabled tapes anymore
- cta-admin --json mountpolicy ls does not display the old maxDrivesAllowed parameter anymore
- cta-admin --json tapefile ls does not display supersededByVid and supersededByFseq anymore
parent 3a1584af
......@@ -34,8 +34,8 @@ struct QueueAndMountSummary {
MountType mountType=MountType::NoMount;
std::string tapePool;
std::string vo;
uint64_t readMaxDrives;
uint64_t writeMaxDrives;
uint64_t readMaxDrives=0;
uint64_t writeMaxDrives=0;
std::string vid;
std::string logicalLibrary;
uint64_t filesQueued=0;
......
/*!
* @project The CERN Tape Archive (CTA)
* @brief CTA Frontend Archive File Ls stream implementation
* @copyright Copyright 2019 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <xroot_plugins/XrdCtaStream.hpp>
#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp>
#include <common/checksum/ChecksumBlobSerDeser.hpp>
namespace cta { namespace xrd {
/*!
* Stream object which implements "archivefile ls" command.
*/
class ArchiveFileLsStream : public XrdCtaStream
{
public:
ArchiveFileLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler);
private:
/*!
* Can we close the stream?
*/
virtual bool isDone() const {
return m_isSummary ? m_isSummaryDone : !m_archiveFileItor.hasMore();
}
/*!
* Fill the buffer
*/
virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf);
cta::catalogue::TapeFileSearchCriteria m_searchCriteria; //!< Search criteria
catalogue::ArchiveFileItor m_archiveFileItor; //!< Iterator across files which have been archived
bool m_isSummary; //!< Full listing or short summary?
bool m_isSummaryDone; //!< Summary has been sent
static constexpr const char* const LOG_SUFFIX = "ArchiveFileLsStream"; //!< Identifier for log messages
};
ArchiveFileLsStream::ArchiveFileLsStream(const RequestMessage &requestMsg,
cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler) :
XrdCtaStream(catalogue, scheduler),
m_isSummary(requestMsg.has_flag(admin::OptionBoolean::SUMMARY)),
m_isSummaryDone(false)
{
using namespace cta::admin;
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ArchiveFileLsStream() constructor");
if(!requestMsg.has_flag(OptionBoolean::ALL))
{
bool has_any = false; // set to true if at least one optional option is set
// Get the search criteria from the optional options
m_searchCriteria.archiveFileId = requestMsg.getOptional(OptionUInt64::ARCHIVE_FILE_ID, &has_any);
m_searchCriteria.tapeFileCopyNb = requestMsg.getOptional(OptionUInt64::COPY_NUMBER, &has_any);
m_searchCriteria.diskFileId = requestMsg.getOptional(OptionString::DISKID, &has_any);
m_searchCriteria.vid = requestMsg.getOptional(OptionString::VID, &has_any);
m_searchCriteria.tapePool = requestMsg.getOptional(OptionString::TAPE_POOL, &has_any);
m_searchCriteria.diskFileOwnerUid = requestMsg.getOptional(OptionUInt64::OWNER_UID, &has_any);
m_searchCriteria.diskFileGid = requestMsg.getOptional(OptionUInt64::GID, &has_any);
m_searchCriteria.storageClass = requestMsg.getOptional(OptionString::STORAGE_CLASS, &has_any);
m_searchCriteria.diskInstance = requestMsg.getOptional(OptionString::INSTANCE, &has_any);
if(!has_any) {
throw cta::exception::UserError("Must specify at least one search option, or --all");
}
}
if(!m_isSummary) {
m_archiveFileItor = m_catalogue.getArchiveFilesItor(m_searchCriteria);
}
}
int ArchiveFileLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
// Special handling for -S (Summary) option
if(m_isSummary) {
common::dataStructures::ArchiveFileSummary summary = m_catalogue.getTapeFileSummary(m_searchCriteria);
Data record;
record.mutable_afls_summary()->set_total_files(summary.totalFiles);
record.mutable_afls_summary()->set_total_size(summary.totalBytes);
streambuf->Push(record);
m_isSummaryDone = true;
return streambuf->Size();
}
for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; )
{
const cta::common::dataStructures::ArchiveFile archiveFile = m_archiveFileItor.next();
for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) {
Data record;
// Archive file
auto af = record.mutable_afls_item()->mutable_af();
af->set_archive_id(archiveFile.archiveFileID);
af->set_disk_instance(archiveFile.diskInstance);
af->set_disk_id(archiveFile.diskFileId);
af->set_size(archiveFile.fileSize);
checksum::ChecksumBlobToProtobuf(archiveFile.checksumBlob, *(af->mutable_csb()));
af->set_storage_class(archiveFile.storageClass);
af->mutable_df()->mutable_owner_id()->set_uid(archiveFile.diskFileInfo.owner_uid);
af->mutable_df()->mutable_owner_id()->set_gid(archiveFile.diskFileInfo.gid);
af->mutable_df()->set_path(archiveFile.diskFileInfo.path);
af->set_creation_time(archiveFile.creationTime);
// Tape file
auto tf = record.mutable_afls_item()->mutable_tf();
tf->set_vid(jt->vid);
tf->set_f_seq(jt->fSeq);
tf->set_block_id(jt->blockId);
record.mutable_afls_item()->set_copy_nb(jt->copyNb);
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
// enough data to send to the client. The actual buffer size is double the block size,
// so we can keep writing a few additional records after is_buffer_full is true. These
// will be sent on the next iteration. If we exceed the hard limit of double the block
// size, Push() will throw an exception.
is_buffer_full = streambuf->Push(record);
}
}
return streambuf->Size();
}
}} // namespace cta::xrd
/*!
* @project The CERN Tape Archive (CTA)
* @brief CTA Frontend List Pending Archive/List Pending Retrieves stream implementation
* @copyright Copyright 2019 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <xroot_plugins/XrdCtaStream.hpp>
#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp>
#include <common/checksum/ChecksumBlobSerDeser.hpp>
namespace cta { namespace xrd {
/*!
* Stream object which implements "lpa" and "lpr" commands
*/
template<typename QueueItor_t>
class ListPendingQueueStream : public XrdCtaStream
{
public:
ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
cta::Scheduler &scheduler, OStoreDB &schedDb);
private:
/*!
* Can we close the stream?
*/
virtual bool isDone() const {
return m_queueItor.end();
}
/*!
* Fill the buffer
*/
virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf);
private:
bool m_isExtended; //!< Summary or extended listing?
optional<std::string> m_filter; //!< Tapepool or Vid to filter results
QueueItor_t m_queueItor; //!< Archive/Retrieve Queue Iterator
typedef decltype(*m_queueItor) data_t; //!< Infer data type from template type
uint64_t fileSize(const data_t &job); //!< Obtain file size from queue item
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, //!< Convert data to protobufs and put on stream
const std::string &tape_id, const data_t &job);
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, //!< Convert summary to protobufs and put on stream
const std::string &tape_id, const uint64_t &total_files,
const uint64_t &total_size);
static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages
};
template<>
ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::
ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
cta::Scheduler &scheduler, OStoreDB &schedDb) :
XrdCtaStream(catalogue, scheduler),
m_isExtended(requestMsg.has_flag(admin::OptionBoolean::EXTENDED)),
m_filter(requestMsg.getOptional(admin::OptionString::TAPE_POOL)),
m_queueItor(schedDb.getArchiveJobItor(m_filter ? m_filter.value() : ""))
{
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingQueueStream<Archive>() constructor");
}
template<>
ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::
ListPendingQueueStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue,
cta::Scheduler &scheduler, OStoreDB &schedDb) :
XrdCtaStream(catalogue, scheduler),
m_isExtended(requestMsg.has_flag(admin::OptionBoolean::EXTENDED)),
m_filter(requestMsg.getOptional(admin::OptionString::VID)),
m_queueItor(schedDb.getRetrieveJobItor(m_filter ? m_filter.value() : ""))
{
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "ListPendingQueueStream<Retrieve>() constructor");
}
template <typename T>
int ListPendingQueueStream<T>::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf)
{
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e. enough data
// to send to the client. The actual buffer size is double the block size, so we can keep writing a
// few additional records after is_buffer_full is true. These will be sent on the next iteration. If
// we exceed the hard limit of double the block size, Push() will throw an exception.
if(m_isExtended) {
// Detailed listing of all queued files
for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor) {
is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), *m_queueItor);
}
} else {
// Summary by tapepool or vid
for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; )
{
uint64_t total_files = 0;
uint64_t total_size = 0;
auto qid = m_queueItor.qid();
for(m_queueItor.beginq(); !m_queueItor.endq(); ++m_queueItor) {
++total_files;
total_size += fileSize(*m_queueItor);
}
is_buffer_full = pushRecord(streambuf, qid, total_files, total_size);
}
}
return streambuf->Size();
}
// Template specialisations for Archive Queue types
template<>
uint64_t ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &job) {
return job.request.fileSize;
}
template<>
bool ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
{
Data record;
// Tapepool
record.mutable_lpa_item()->set_tapepool(tapepool);
// Copy number
record.mutable_lpa_item()->set_copy_nb(job.copyNumber);
// Archive file
auto af = record.mutable_lpa_item()->mutable_af();
af->set_archive_id(job.archiveFileID);
af->set_disk_instance(job.instanceName);
af->set_disk_id(job.request.diskFileID);
af->set_size(job.request.fileSize);
af->set_storage_class(job.request.storageClass);
af->set_creation_time(job.request.creationLog.time);
af->mutable_df()->mutable_owner_id()->set_uid(job.request.diskFileInfo.owner_uid);
af->mutable_df()->mutable_owner_id()->set_gid(job.request.diskFileInfo.gid);
af->mutable_df()->set_path(job.request.diskFileInfo.path);
// Checksum array
checksum::ChecksumBlobToProtobuf(job.request.checksumBlob, *(af->mutable_csb()));
return streambuf->Push(record);
}
template<>
bool ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &tapepool, const uint64_t &total_files, const uint64_t &total_size)
{
Data record;
// Tapepool
record.mutable_lpa_summary()->set_tapepool(tapepool);
// Summary statistics
record.mutable_lpa_summary()->set_total_files(total_files);
record.mutable_lpa_summary()->set_total_size(total_size);
return streambuf->Push(record);
}
// Template specialisations for Retrieve Queue types
template<>
uint64_t ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::fileSize(const data_t &job) {
return job.fileSize;
}
template<>
bool ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &vid, const common::dataStructures::RetrieveJob &job)
{
bool is_buffer_full = false;
// Write one record per tape copy
for(auto tape_it = job.tapeCopies.begin(); tape_it != job.tapeCopies.end(); ++tape_it)
{
// If we are filtering by vid, skip the ones we are not interested in.
//
// Do we actually need to do this or are we guaranteed that the vid field is the same for all
// objects in the same queue?
if(!vid.empty() && vid != tape_it->first) continue;
Data record;
// Copy number
record.mutable_lpr_item()->set_copy_nb(tape_it->second.first);
// Archive file
auto af = record.mutable_lpr_item()->mutable_af();
af->set_archive_id(job.request.archiveFileID);
af->set_size(job.fileSize);
af->set_creation_time(job.request.creationLog.time);
af->mutable_df()->mutable_owner_id()->set_uid(job.request.diskFileInfo.owner_uid);
af->mutable_df()->mutable_owner_id()->set_gid(job.request.diskFileInfo.gid);
af->mutable_df()->set_path(job.request.diskFileInfo.path);
af->set_disk_instance(job.diskInstance);
af->set_disk_id(job.diskFileId);
af->set_storage_class(job.storageClass);
// Tape file
auto tf = record.mutable_lpr_item()->mutable_tf();
tf->set_vid(tape_it->first);
tf->set_f_seq(tape_it->second.second.fSeq);
tf->set_block_id(tape_it->second.second.blockId);
// Checksum array
checksum::ChecksumBlobToProtobuf(tape_it->second.second.checksumBlob, *(af->mutable_csb()));
is_buffer_full = streambuf->Push(record);
}
return is_buffer_full;
}
template<>
bool ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &vid, const uint64_t &total_files, const uint64_t &total_size)
{
Data record;
// Tapepool
record.mutable_lpr_summary()->set_vid(vid);
// Summary statistics
record.mutable_lpr_summary()->set_total_files(total_files);
record.mutable_lpr_summary()->set_total_size(total_size);
return streambuf->Push(record);
}
}} // namespace cta::xrd
......@@ -81,14 +81,10 @@ int ShowQueuesStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
case common::dataStructures::MountType::ArchiveForUser:
sq_item->set_priority(sq.mountPolicy.archivePriority);
sq_item->set_min_age(sq.mountPolicy.archiveMinRequestAge);
//TODO: max drives per VO
sq_item->set_max_drives(0);
break;
case common::dataStructures::MountType::Retrieve:
sq_item->set_priority(sq.mountPolicy.retrievePriority);
sq_item->set_min_age(sq.mountPolicy.retrieveMinRequestAge);
//TODO: max drives per VO
sq_item->set_max_drives(0);
break;
default:
break;
......
......@@ -27,7 +27,6 @@ using XrdSsiPb::PbException;
#include "XrdCtaDriveLs.hpp"
#include "XrdCtaFailedRequestLs.hpp"
#include "XrdCtaGroupMountRuleLs.hpp"
#include "XrdCtaListPendingQueue.hpp"
#include "XrdCtaLogicalLibraryLs.hpp"
#include "XrdCtaMountPolicyLs.hpp"
#include "XrdCtaMediaTypeLs.hpp"
......@@ -100,11 +99,6 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons
case cmd_pair(AdminCmd::CMD_ADMIN, AdminCmd::SUBCMD_LS):
processAdmin_Ls(response, stream);
break;
#if 0
case cmd_pair(AdminCmd::CMD_ARCHIVEFILE, AdminCmd::SUBCMD_LS):
processArchiveFile_Ls(response, stream);
break;
#endif
case cmd_pair(AdminCmd::CMD_ARCHIVEROUTE, AdminCmd::SUBCMD_ADD):
processArchiveRoute_Add(response);
break;
......@@ -123,9 +117,9 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons
case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_DOWN):
processDrive_Down(response);
break;
case cmd_pair(AdminCmd::CMD_DRIVE,AdminCmd::SUBCMD_CH):
processDrive_Ch(response);
break;
case cmd_pair(AdminCmd::CMD_DRIVE,AdminCmd::SUBCMD_CH):
processDrive_Ch(response);
break;
case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS):
processDrive_Ls(response, stream);
break;
......@@ -150,12 +144,6 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons
case cmd_pair(AdminCmd::CMD_GROUPMOUNTRULE, AdminCmd::SUBCMD_LS):
processGroupMountRule_Ls(response, stream);
break;
case cmd_pair(AdminCmd::CMD_LISTPENDINGARCHIVES, AdminCmd::SUBCMD_NONE):
processListPendingArchives(response, stream);
break;
case cmd_pair(AdminCmd::CMD_LISTPENDINGRETRIEVES, AdminCmd::SUBCMD_NONE):
processListPendingRetrieves(response, stream);
break;
case cmd_pair(AdminCmd::CMD_LOGICALLIBRARY, AdminCmd::SUBCMD_ADD):
processLogicalLibrary_Add(response);
break;
......@@ -871,26 +859,6 @@ void RequestMessage::processAdmin_Ls(cta::xrd::Response &response, XrdSsiStream*
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
#if 0
void RequestMessage::processArchiveFile_Ls(cta::xrd::Response &response, XrdSsiStream* &stream)
{
using namespace cta::admin;
// Create a XrdSsi stream object to return the results
stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler);
// Set correct column headers
response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY
: HeaderType::ARCHIVEFILE_LS);
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
#endif
void RequestMessage::processArchiveRoute_Add(cta::xrd::Response &response)
{
using namespace cta::admin;
......@@ -1158,40 +1126,6 @@ void RequestMessage::processGroupMountRule_Ls(cta::xrd::Response &response, XrdS
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
void RequestMessage::processListPendingArchives(cta::xrd::Response &response, XrdSsiStream* &stream)
{
using namespace cta::admin;
// Create a XrdSsi stream object to return the results
stream = new ListPendingQueueStream<OStoreDB::ArchiveQueueItor_t>(*this, m_catalogue, m_scheduler, m_scheddb);
// Display the correct column headers
response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGARCHIVES
: HeaderType::LISTPENDINGARCHIVES_SUMMARY);
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
void RequestMessage::processListPendingRetrieves(cta::xrd::Response &response, XrdSsiStream* &stream)
{
using namespace cta::admin;
// Create a XrdSsi stream object to return the results
stream = new ListPendingQueueStream<OStoreDB::RetrieveQueueItor_t>(*this, m_catalogue, m_scheduler, m_scheddb);
// Display correct column headers
response.set_show_header(has_flag(OptionBoolean::EXTENDED) ? HeaderType::LISTPENDINGRETRIEVES
: HeaderType::LISTPENDINGRETRIEVES_SUMMARY);
response.set_type(cta::xrd::Response::RSP_SUCCESS);
}
void RequestMessage::processLogicalLibrary_Add(cta::xrd::Response &response)
{
using namespace cta::admin;
......
......@@ -223,8 +223,6 @@ private:
admincmdstream_t processDrive_Ls;
admincmdstream_t processFailedRequest_Ls;
admincmdstream_t processGroupMountRule_Ls;
admincmdstream_t processListPendingArchives;
admincmdstream_t processListPendingRetrieves;
admincmdstream_t processLogicalLibrary_Ls;
admincmdstream_t processMediaType_Ls;
admincmdstream_t processMountPolicy_Ls;
......
Subproject commit 737c710bde9e1e0bd5a54fa69159dcd525b62ef7
Subproject commit ef1e4d7a679b8f60e48a6aa0f25e09c5851c2abc
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment