diff --git a/cmdline/CMakeLists.txt b/cmdline/CMakeLists.txt index 3a9fc2e3af96eb0f8da5749fa0ae9052426e3633..9098c080de5c08551c43c311c821febcf6f42739 100644 --- a/cmdline/CMakeLists.txt +++ b/cmdline/CMakeLists.txt @@ -32,7 +32,12 @@ include_directories(${CMAKE_BINARY_DIR}/eos_cta ${PROTOBUF3_INCLUDE_DIRS}) # # cta-admin <admin_command> is the SSI version of "cta <admin_command>" # -add_executable(cta-admin CtaAdminCmd.cpp CtaAdminCmdParse.cpp CtaAdminTextFormatter.cpp) +add_executable(cta-admin + CtaAdminCmd.cpp + CtaAdminCmdParse.cpp + CtaAdminTextFormatter.cpp + ../common/dataStructures/DriveStatus.cpp + ../common/dataStructures/MountType.cpp) target_link_libraries(cta-admin XrdSsiPbEosCta XrdSsi-4 XrdSsiLib XrdUtils) set_property (TARGET cta-admin APPEND PROPERTY INSTALL_RPATH ${PROTOBUF3_RPATH}) diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index 592507b405ad3ee9f9a390d5ace5381dde3f4838..f259e89e97edacadbf484bf8566bd2c6dd5bdacc 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -75,6 +75,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kAflsItem: std::cout << Log::DumpProtobuf(&record.afls_item()); break; case Data::kAflsSummary: std::cout << Log::DumpProtobuf(&record.afls_summary()); break; case Data::kArlsItem: std::cout << Log::DumpProtobuf(&record.arls_item()); break; + case Data::kDrlsItem: std::cout << Log::DumpProtobuf(&record.drls_item()); break; case Data::kFrlsItem: std::cout << Log::DumpProtobuf(&record.frls_item()); break; case Data::kFrlsSummary: std::cout << Log::DumpProtobuf(&record.frls_summary()); break; case Data::kLpaItem: std::cout << Log::DumpProtobuf(&record.lpa_item()); break; @@ -94,6 +95,7 @@ void IStreamBuffer<cta::xrd::Data>::DataCallback(cta::xrd::Data record) const case Data::kAflsItem: formattedText.print(record.afls_item()); break; case Data::kAflsSummary: formattedText.print(record.afls_summary()); break; case Data::kArlsItem: formattedText.print(record.arls_item()); break; + case Data::kDrlsItem: formattedText.print(record.drls_item()); break; case Data::kFrlsItem: formattedText.print(record.frls_item()); break; case Data::kFrlsSummary: formattedText.print(record.frls_summary()); break; case Data::kLpaItem: formattedText.print(record.lpa_item()); break; @@ -236,19 +238,20 @@ void CtaAdminCmd::send() const std::cout << response.message_txt(); // Print streaming response header if(!isJson()) switch(response.show_header()) { - case HeaderType::ADMIN_LS: formattedText.printAdLsHeader(); break; - case HeaderType::ARCHIVEFILE_LS: formattedText.printAfLsHeader(); break; - case HeaderType::ARCHIVEFILE_LS_SUMMARY: formattedText.printAfLsSummaryHeader(); break; - case HeaderType::ARCHIVEROUTE_LS: formattedText.printArLsHeader(); break; - case HeaderType::FAILEDREQUEST_LS: formattedText.printFrLsHeader(); break; - case HeaderType::FAILEDREQUEST_LS_SUMMARY: formattedText.printFrLsSummaryHeader(); break; - case HeaderType::LISTPENDINGARCHIVES: formattedText.printLpaHeader(); break; - case HeaderType::LISTPENDINGARCHIVES_SUMMARY: formattedText.printLpaSummaryHeader(); break; - case HeaderType::LISTPENDINGRETRIEVES: formattedText.printLprHeader(); break; - case HeaderType::LISTPENDINGRETRIEVES_SUMMARY: formattedText.printLprSummaryHeader(); break; - case HeaderType::TAPEPOOL_LS: formattedText.printTapePoolLsHeader(); break; - case HeaderType::TAPE_LS: formattedText.printTapeLsHeader(); break; + case HeaderType::ADMIN_LS: formattedText.printAdminLsHeader(); break; + case HeaderType::ARCHIVEFILE_LS: formattedText.printArchiveFileLsHeader(); break; + case HeaderType::ARCHIVEFILE_LS_SUMMARY: formattedText.printArchiveFileLsSummaryHeader(); break; + case HeaderType::ARCHIVEROUTE_LS: formattedText.printArchiveRouteLsHeader(); break; + case HeaderType::DRIVE_LS: formattedText.printDriveLsHeader(); break; + case HeaderType::FAILEDREQUEST_LS: formattedText.printFailedRequestLsHeader(); break; + case HeaderType::FAILEDREQUEST_LS_SUMMARY: formattedText.printFailedRequestLsSummaryHeader(); break; + case HeaderType::LISTPENDINGARCHIVES: formattedText.printListPendingArchivesHeader(); break; + case HeaderType::LISTPENDINGARCHIVES_SUMMARY: formattedText.printListPendingArchivesSummaryHeader(); break; + case HeaderType::LISTPENDINGRETRIEVES: formattedText.printListPendingRetrievesHeader(); break; + case HeaderType::LISTPENDINGRETRIEVES_SUMMARY: formattedText.printListPendingRetrievesSummaryHeader(); break; case HeaderType::REPACK_LS: formattedText.printRepackLsHeader(); break; + case HeaderType::TAPE_LS: formattedText.printTapeLsHeader(); break; + case HeaderType::TAPEPOOL_LS: formattedText.printTapePoolLsHeader(); break; case HeaderType::NONE: default: break; } diff --git a/cmdline/CtaAdminTextFormatter.cpp b/cmdline/CtaAdminTextFormatter.cpp index 03021b3e5a92dbc995a192611aece34113a49326..311c210c07c8a25d545808d9ad0ad6d926f50ae4 100644 --- a/cmdline/CtaAdminTextFormatter.cpp +++ b/cmdline/CtaAdminTextFormatter.cpp @@ -20,9 +20,11 @@ #include <iostream> #include <iomanip> #include <cmdline/CtaAdminTextFormatter.hpp> +#include <common/dataStructures/DriveStatus.hpp> +#include <common/dataStructures/MountType.hpp> - -namespace cta { namespace admin { +namespace cta { +namespace admin { /** ** Generic utility methods @@ -109,7 +111,7 @@ void TextFormatter::flush() { ** Output for specific commands **/ -void TextFormatter::printAdLsHeader() { +void TextFormatter::printAdminLsHeader() { push_back("HEADER"); push_back( "user", @@ -123,7 +125,7 @@ void TextFormatter::printAdLsHeader() { ); } -void TextFormatter::print(const cta::admin::AdminLsItem &adls_item) { +void TextFormatter::print(const AdminLsItem &adls_item) { push_back( adls_item.user(), adls_item.creation_log().username(), @@ -136,7 +138,7 @@ void TextFormatter::print(const cta::admin::AdminLsItem &adls_item) { ); } -void TextFormatter::printAfLsHeader() { +void TextFormatter::printArchiveFileLsHeader() { push_back("HEADER"); push_back( "archive id", @@ -159,7 +161,7 @@ void TextFormatter::printAfLsHeader() { ); } -void TextFormatter::print(const cta::admin::ArchiveFileLsItem &afls_item) { +void TextFormatter::print(const ArchiveFileLsItem &afls_item) { push_back( afls_item.af().archive_id(), afls_item.copy_nb(), @@ -181,7 +183,23 @@ void TextFormatter::print(const cta::admin::ArchiveFileLsItem &afls_item) { ); } -void TextFormatter::printArLsHeader() { +void TextFormatter::printArchiveFileLsSummaryHeader() { + push_back("HEADER"); + push_back( + "total files", + "total size" + ); +} + +void TextFormatter::print(const ArchiveFileLsSummary &afls_summary) +{ + push_back( + afls_summary.total_files(), + dataSizeToStr(afls_summary.total_size()) + ); +} + +void TextFormatter::printArchiveRouteLsHeader() { push_back("HEADER"); push_back( "instance", @@ -198,7 +216,7 @@ void TextFormatter::printArLsHeader() { ); } -void TextFormatter::print(const cta::admin::ArchiveRouteLsItem &arls_item) { +void TextFormatter::print(const ArchiveRouteLsItem &arls_item) { push_back( arls_item.instance(), arls_item.storage_class(), @@ -214,23 +232,87 @@ void TextFormatter::print(const cta::admin::ArchiveRouteLsItem &arls_item) { ); } -void TextFormatter::printAfLsSummaryHeader() { + +void TextFormatter::printDriveLsHeader() { push_back("HEADER"); push_back( - "total files", - "total size" + "library", + "drive", + "host", + "desired", + "request", + "status", + "since", + "vid", + "tapepool", + "files", + "data", + "MB/s", + "session", + "priority", + "activity", + "age" ); } -void TextFormatter::print(const cta::admin::ArchiveFileLsSummary &afls_summary) +void TextFormatter::print(const DriveLsItem &drls_item) { + using namespace cta::common::dataStructures; + + const int DRIVE_TIMEOUT = 600; // Time after which a drive will be marked as STALE + + std::string mountType; + std::string driveStatus; + std::string driveStatusSince; + std::string filesTransferredInSession; + std::string bytesTransferredInSession; + std::string latestBandwidth; + std::string sessionId; + std::string timeSinceLastUpdate; + + mountType = toString(static_cast<MountType>(drls_item.mount_type())); + driveStatus = toString(static_cast<DriveStatus>(drls_item.drive_status())); + + if(drls_item.drive_status() != DriveStatus::Unknown) { + driveStatusSince = std::to_string(drls_item.drive_status_since()); + } + + if(drls_item.drive_status() == DriveStatus::Transferring) { + filesTransferredInSession = std::to_string(drls_item.files_transferred_in_session()); + bytesTransferredInSession = dataSizeToStr(drls_item.bytes_transferred_in_session()); + latestBandwidth = std::to_string(drls_item.latest_bandwidth()); + } + + if(drls_item.drive_status() != DriveStatus::Up && + drls_item.drive_status() != DriveStatus::Down && + drls_item.drive_status() != DriveStatus::Unknown) { + sessionId = std::to_string(drls_item.session_id()); + } + + timeSinceLastUpdate = std::to_string(drls_item.time_since_last_update()) + + (drls_item.time_since_last_update() > DRIVE_TIMEOUT ? " [STALE]" : ""); + push_back( - afls_summary.total_files(), - dataSizeToStr(afls_summary.total_size()) + drls_item.logical_library(), + drls_item.drive_name(), + drls_item.host(), + drls_item.desired_drive_state() == DriveLsItem::UP ? "Up" : "Down", + mountType, + driveStatus, + driveStatusSince, + drls_item.vid(), + drls_item.tapepool(), + filesTransferredInSession, + bytesTransferredInSession, + latestBandwidth, + sessionId, + drls_item.current_priority(), + drls_item.current_activity(), + timeSinceLastUpdate ); } -void TextFormatter::printFrLsHeader() { +void TextFormatter::printFailedRequestLsHeader() { push_back("HEADER"); push_back( "request type", @@ -242,7 +324,7 @@ void TextFormatter::printFrLsHeader() { ); } -void TextFormatter::print(const cta::admin::FailedRequestLsItem &frls_item) { +void TextFormatter::print(const FailedRequestLsItem &frls_item) { std::string request_type; std::string tapepool_vid; @@ -272,7 +354,7 @@ void TextFormatter::print(const cta::admin::FailedRequestLsItem &frls_item) { // displayed in the text output, only in JSON. } -void TextFormatter::printFrLsSummaryHeader() { +void TextFormatter::printFailedRequestLsSummaryHeader() { push_back("HEADER"); push_back( "request type", @@ -281,10 +363,10 @@ void TextFormatter::printFrLsSummaryHeader() { ); } -void TextFormatter::print(const cta::admin::FailedRequestLsSummary &frls_summary) { +void TextFormatter::print(const FailedRequestLsSummary &frls_summary) { std::string request_type = - frls_summary.request_type() == cta::admin::RequestType::ARCHIVE_REQUEST ? "archive" : - frls_summary.request_type() == cta::admin::RequestType::RETRIEVE_REQUEST ? "retrieve" : "total"; + frls_summary.request_type() == RequestType::ARCHIVE_REQUEST ? "archive" : + frls_summary.request_type() == RequestType::RETRIEVE_REQUEST ? "retrieve" : "total"; push_back( request_type, @@ -293,7 +375,7 @@ void TextFormatter::print(const cta::admin::FailedRequestLsSummary &frls_summary ); } -void TextFormatter::printLpaHeader() { +void TextFormatter::printListPendingArchivesHeader() { push_back("HEADER"); push_back( "tapepool", @@ -311,7 +393,7 @@ void TextFormatter::printLpaHeader() { ); } -void TextFormatter::print(const cta::admin::ListPendingArchivesItem &lpa_item) { +void TextFormatter::print(const ListPendingArchivesItem &lpa_item) { push_back( lpa_item.tapepool(), lpa_item.af().archive_id(), @@ -328,7 +410,7 @@ void TextFormatter::print(const cta::admin::ListPendingArchivesItem &lpa_item) { ); } -void TextFormatter::printLpaSummaryHeader() { +void TextFormatter::printListPendingArchivesSummaryHeader() { push_back("HEADER"); push_back( "tapepool", @@ -337,7 +419,7 @@ void TextFormatter::printLpaSummaryHeader() { ); } -void TextFormatter::print(const cta::admin::ListPendingArchivesSummary &lpa_summary) { +void TextFormatter::print(const ListPendingArchivesSummary &lpa_summary) { push_back( lpa_summary.tapepool(), lpa_summary.total_files(), @@ -345,7 +427,7 @@ void TextFormatter::print(const cta::admin::ListPendingArchivesSummary &lpa_summ ); } -void TextFormatter::printLprHeader() { +void TextFormatter::printListPendingRetrievesHeader() { push_back("HEADER"); push_back( "vid", @@ -360,7 +442,7 @@ void TextFormatter::printLprHeader() { ); } -void TextFormatter::print(const cta::admin::ListPendingRetrievesItem &lpr_item) { +void TextFormatter::print(const ListPendingRetrievesItem &lpr_item) { push_back( lpr_item.tf().vid(), lpr_item.af().archive_id(), @@ -374,7 +456,7 @@ void TextFormatter::print(const cta::admin::ListPendingRetrievesItem &lpr_item) ); } -void TextFormatter::printLprSummaryHeader() { +void TextFormatter::printListPendingRetrievesSummaryHeader() { push_back("HEADER"); push_back( "vid", @@ -383,7 +465,7 @@ void TextFormatter::printLprSummaryHeader() { ); } -void TextFormatter::print(const cta::admin::ListPendingRetrievesSummary &lpr_summary) { +void TextFormatter::print(const ListPendingRetrievesSummary &lpr_summary) { push_back( lpr_summary.vid(), lpr_summary.total_files(), @@ -391,6 +473,47 @@ void TextFormatter::print(const cta::admin::ListPendingRetrievesSummary &lpr_sum ); } +void TextFormatter::printRepackLsHeader() { + push_back("HEADER"); + push_back( + "vid", + "repackBufferURL", + "userProvidedFiles", + "totalFilesToRetrieve", + "totalBytesToRetrieve", + "totalFilesToArchive", + "totalBytesToArchive", + "retrievedFiles", + "archivedFiles", + "failedToRetrieveFiles", + "failedToRetrieveBytes", + "failedToArchiveFiles", + "failedToArchiveBytes", + "lastExpandedFSeq", + "status" + ); +} + +void TextFormatter::print(const RepackLsItem &rels_item) { + push_back( + rels_item.vid(), + rels_item.repack_buffer_url(), + rels_item.user_provided_files(), + rels_item.total_files_to_retrieve(), + dataSizeToStr(rels_item.total_bytes_to_retrieve()), + rels_item.total_files_to_archive(), + dataSizeToStr(rels_item.total_bytes_to_archive()), + rels_item.retrieved_files(), + rels_item.archived_files(), + rels_item.failed_to_retrieve_files(), + dataSizeToStr(rels_item.failed_to_retrieve_bytes()), + rels_item.failed_to_archive_files(), + dataSizeToStr(rels_item.failed_to_retrieve_bytes()), + rels_item.last_expanded_fseq(), + rels_item.status() + ); +} + void TextFormatter::printTapeLsHeader() { push_back("HEADER"); push_back( @@ -421,7 +544,7 @@ void TextFormatter::printTapeLsHeader() { ); } -void TextFormatter::print(const cta::admin::TapeLsItem &tals_item) { +void TextFormatter::print(const TapeLsItem &tals_item) { push_back( tals_item.vid(), tals_item.media_type(), @@ -450,47 +573,6 @@ void TextFormatter::print(const cta::admin::TapeLsItem &tals_item) { ); } -void TextFormatter::printRepackLsHeader() { - push_back("HEADER"); - push_back( - "vid", - "repackBufferURL", - "userProvidedFiles", - "totalFilesToRetrieve", - "totalBytesToRetrieve", - "totalFilesToArchive", - "totalBytesToArchive", - "retrievedFiles", - "archivedFiles", - "failedToRetrieveFiles", - "failedToRetrieveBytes", - "failedToArchiveFiles", - "failedToArchiveBytes", - "lastExpandedFSeq", - "status" - ); -} - -void TextFormatter::print(const cta::admin::RepackLsItem &rels_item) { - push_back( - rels_item.vid(), - rels_item.repack_buffer_url(), - rels_item.user_provided_files(), - rels_item.total_files_to_retrieve(), - dataSizeToStr(rels_item.total_bytes_to_retrieve()), - rels_item.total_files_to_archive(), - dataSizeToStr(rels_item.total_bytes_to_archive()), - rels_item.retrieved_files(), - rels_item.archived_files(), - rels_item.failed_to_retrieve_files(), - dataSizeToStr(rels_item.failed_to_retrieve_bytes()), - rels_item.failed_to_archive_files(), - dataSizeToStr(rels_item.failed_to_retrieve_bytes()), - rels_item.last_expanded_fseq(), - rels_item.status() - ); -} - void TextFormatter::printTapePoolLsHeader() { push_back("HEADER"); push_back( @@ -515,7 +597,7 @@ void TextFormatter::printTapePoolLsHeader() { ); } -void TextFormatter::print(const cta::admin::TapePoolLsItem &tpls_item) +void TextFormatter::print(const TapePoolLsItem &tpls_item) { uint64_t avail = tpls_item.capacity_bytes() > tpls_item.data_bytes() ? tpls_item.capacity_bytes()-tpls_item.data_bytes() : 0; diff --git a/cmdline/CtaAdminTextFormatter.hpp b/cmdline/CtaAdminTextFormatter.hpp index 29eedde121aa15c5e25482c68f1fed3e5102f92c..60f6720c3eead167d6dd06ac9a3b9bacdbcae82a 100644 --- a/cmdline/CtaAdminTextFormatter.hpp +++ b/cmdline/CtaAdminTextFormatter.hpp @@ -44,34 +44,36 @@ public: } // Output headers - void printAdLsHeader(); - void printAfLsHeader(); - void printAfLsSummaryHeader(); - void printArLsHeader(); - void printFrLsHeader(); - void printFrLsSummaryHeader(); - void printLpaHeader(); - void printLpaSummaryHeader(); - void printLprHeader(); - void printLprSummaryHeader(); - void printTapePoolLsHeader(); - void printTapeLsHeader(); + void printAdminLsHeader(); + void printArchiveFileLsHeader(); + void printArchiveFileLsSummaryHeader(); + void printArchiveRouteLsHeader(); + void printDriveLsHeader(); + void printFailedRequestLsHeader(); + void printFailedRequestLsSummaryHeader(); + void printListPendingArchivesHeader(); + void printListPendingArchivesSummaryHeader(); + void printListPendingRetrievesHeader(); + void printListPendingRetrievesSummaryHeader(); void printRepackLsHeader(); + void printTapeLsHeader(); + void printTapePoolLsHeader(); // Output records void print(const AdminLsItem &adls_item); void print(const ArchiveFileLsItem &afls_item); void print(const ArchiveFileLsSummary &afls_summary); void print(const ArchiveRouteLsItem &afls_item); + void print(const DriveLsItem &drls_item); void print(const FailedRequestLsItem &frls_item); void print(const FailedRequestLsSummary &frls_summary); void print(const ListPendingArchivesItem &lpa_item); void print(const ListPendingArchivesSummary &lpa_summary); void print(const ListPendingRetrievesItem &lpr_item); void print(const ListPendingRetrievesSummary &lpr_summary); - void print(const TapePoolLsItem &tpls_item); - void print(const TapeLsItem &tals_item); void print(const RepackLsItem &rels_item); + void print(const TapeLsItem &tals_item); + void print(const TapePoolLsItem &tpls_item); private: //! Add a line to the buffer diff --git a/xroot_plugins/XrdCtaAdminLs.hpp b/xroot_plugins/XrdCtaAdminLs.hpp index 1ad3dd61e57aa5b652b25c58b0aa9c39fcca3d32..4ef232b49ad44fc26b43b00eebdabfcfac812c7c 100644 --- a/xroot_plugins/XrdCtaAdminLs.hpp +++ b/xroot_plugins/XrdCtaAdminLs.hpp @@ -70,7 +70,6 @@ int AdminLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { for(bool is_buffer_full = false; !m_adminList.empty() && !is_buffer_full; m_adminList.pop_front()) { Data record; - // TapePool auto &ad = m_adminList.front(); auto ad_item = record.mutable_adls_item(); diff --git a/xroot_plugins/XrdCtaArchiveRouteLs.hpp b/xroot_plugins/XrdCtaArchiveRouteLs.hpp index 9eb9acbb2148a1d8b27b132907ccc8e4cea45fa7..3ae5e1e3ffa6096d7521dee72d5fc22c025eb6da 100644 --- a/xroot_plugins/XrdCtaArchiveRouteLs.hpp +++ b/xroot_plugins/XrdCtaArchiveRouteLs.hpp @@ -70,7 +70,6 @@ int ArchiveRouteLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { for(bool is_buffer_full = false; !m_archiveRouteList.empty() && !is_buffer_full; m_archiveRouteList.pop_front()) { Data record; - // TapePool auto &ar = m_archiveRouteList.front(); auto ar_item = record.mutable_arls_item(); diff --git a/xroot_plugins/XrdCtaDriveLs.hpp b/xroot_plugins/XrdCtaDriveLs.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d62385e77339d4157e6a44acf320707bf8c5eeeb --- /dev/null +++ b/xroot_plugins/XrdCtaDriveLs.hpp @@ -0,0 +1,150 @@ +/*! + * @project The CERN Tape Archive (CTA) + * @brief CTA Admin Ls stream implementation + * @copyright Copyright 2019 CERN + * @license This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <xroot_plugins/XrdCtaStream.hpp> +#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> + + +namespace cta { namespace xrd { + +/*! + * Stream object which implements "tapepool ls" command + */ +class DriveLsStream: public XrdCtaStream{ +public: + /*! + * Constructor + * + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue + * @param[in] scheduler CTA Scheduler + */ + DriveLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, + const cta::common::dataStructures::SecurityIdentity &clientID, log::LogContext &lc); + +private: + /*! + * Can we close the stream? + */ + virtual bool isDone() const { + return m_driveList.empty(); + } + + /*! + * Fill the buffer + */ + virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); + + std::list<cta::common::dataStructures::DriveState> m_driveList; //!< List of drives from the scheduler + + static constexpr const char* const LOG_SUFFIX = "DriveLsStream"; //!< Identifier for log messages +}; + + +DriveLsStream::DriveLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, + cta::Scheduler &scheduler, const cta::common::dataStructures::SecurityIdentity &clientID, + log::LogContext &lc) : + XrdCtaStream(catalogue, scheduler), + m_driveList(scheduler.getDriveStates(clientID, lc)) +{ + using namespace cta::admin; + + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "DriveLsStream() constructor"); + + auto driveRegexOpt = requestMsg.getOptional(OptionString::DRIVE); + + // Dump all drives unless we specified a drive + if(driveRegexOpt) { + std::string driveRegexStr = '^' + driveRegexOpt.value() + '$'; + utils::Regex driveRegex(driveRegexStr.c_str()); + + // Remove non-matching drives from the list + for(auto dr_it = m_driveList.begin(); dr_it != m_driveList.end(); ) { + if(driveRegex.has_match(dr_it->driveName)) { + ++dr_it; + } else { + auto erase_it = dr_it; + ++dr_it; + m_driveList.erase(erase_it); + } + } + + if(m_driveList.empty()) { + throw exception::UserError(std::string("No such drive: ") + driveRegexOpt.value()); + } + } + + // Sort drives in the result set into lexicographic order + typedef decltype(*m_driveList.begin()) dStateVal_t; + m_driveList.sort([](const dStateVal_t &a, const dStateVal_t &b){ return a.driveName < b.driveName; }); +} + +int DriveLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { + using namespace cta::admin; + + for(bool is_buffer_full = false; !m_driveList.empty() && !is_buffer_full; m_driveList.pop_front()) { + Data record; + + auto &dr = m_driveList.front(); + auto dr_item = record.mutable_drls_item(); + + dr_item->set_logical_library(dr.logicalLibrary); + dr_item->set_drive_name(dr.driveName); + dr_item->set_host(dr.host); + dr_item->set_desired_drive_state(dr.desiredDriveState.up ? DriveLsItem::UP : DriveLsItem::DOWN); + dr_item->set_mount_type(static_cast<uint32_t>(dr.mountType)); + dr_item->set_drive_status(dr.driveStatus); + dr_item->set_vid(dr.currentVid); + dr_item->set_tapepool(dr.currentTapePool); + dr_item->set_files_transferred_in_session(dr.filesTransferredInSession); + dr_item->set_bytes_transferred_in_session(dr.bytesTransferredInSession); + dr_item->set_latest_bandwidth(dr.latestBandwidth); + dr_item->set_session_id(dr.sessionId); + dr_item->set_time_since_last_update(time(nullptr)-dr.lastUpdateTime); + dr_item->set_current_priority(dr.currentPriority); + dr_item->set_current_activity(dr.currentActivityAndWeight ? dr.currentActivityAndWeight.value().activity : ""); + + // set the time spent in the current state + uint64_t drive_time = time(nullptr); + + switch(dr.driveStatus) { + using namespace cta::common::dataStructures; + + case DriveStatus::Probing: drive_time -= dr.probeStartTime; break; + case DriveStatus::Up: drive_time -= dr.downOrUpStartTime; break; + case DriveStatus::Down: drive_time -= dr.downOrUpStartTime; break; + case DriveStatus::Starting: drive_time -= dr.startStartTime; break; + case DriveStatus::Mounting: drive_time -= dr.mountStartTime; break; + case DriveStatus::Transferring: drive_time -= dr.transferStartTime; break; + case DriveStatus::CleaningUp: drive_time -= dr.cleanupStartTime; break; + case DriveStatus::Unloading: drive_time -= dr.unloadStartTime; break; + case DriveStatus::Unmounting: drive_time -= dr.unmountStartTime; break; + case DriveStatus::DrainingToDisk: drive_time -= dr.drainingStartTime; break; + case DriveStatus::Shutdown: drive_time -= dr.shutdownTime; break; + case DriveStatus::Unknown: break; + } + dr_item->set_drive_status_since(drive_time); + + is_buffer_full = streambuf->Push(record); + } + return streambuf->Size(); +} + +}} // namespace cta::xrd diff --git a/xroot_plugins/XrdCtaTapePoolLs.hpp b/xroot_plugins/XrdCtaTapePoolLs.hpp index 4cd527fd2fb2afc0ce61cf26c7122c2cbbb81d1b..f429df10ea137630f82b7acadb82f1adbb27085b 100644 --- a/xroot_plugins/XrdCtaTapePoolLs.hpp +++ b/xroot_plugins/XrdCtaTapePoolLs.hpp @@ -70,7 +70,6 @@ int TapePoolLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { for(bool is_buffer_full = false; !m_tapePoolList.empty() && !is_buffer_full; m_tapePoolList.pop_front()) { Data record; - // TapePool auto &tp = m_tapePoolList.front(); auto tp_item = record.mutable_tpls_item(); diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 626fbdd4bc9e70fb7f1b0af333c47bb303903c4e..30cf387e347b79d86d59e7db2d4aa336fb30c295 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -25,16 +25,16 @@ using XrdSsiPb::PbException; #include <cmdline/CtaAdminCmdParse.hpp> +#include "XrdSsiCtaRequestMessage.hpp" #include "XrdCtaAdminLs.hpp" #include "XrdCtaArchiveFileLs.hpp" #include "XrdCtaArchiveRouteLs.hpp" +#include "XrdCtaDriveLs.hpp" #include "XrdCtaFailedRequestLs.hpp" #include "XrdCtaListPendingQueue.hpp" -#include "XrdCtaTapePoolLs.hpp" -#include "XrdSsiCtaRequestMessage.hpp" -#include "XrdCtaTapeLs.hpp" #include "XrdCtaRepackLs.hpp" - +#include "XrdCtaTapeLs.hpp" +#include "XrdCtaTapePoolLs.hpp" namespace cta { namespace xrd { @@ -138,7 +138,7 @@ void RequestMessage::process(const cta::xrd::Request &request, cta::xrd::Respons processDrive_Down(response); break; case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_LS): - processDrive_Ls(response); + processDrive_Ls(response, stream); break; case cmd_pair(AdminCmd::CMD_DRIVE, AdminCmd::SUBCMD_RM): processDrive_Rm(response); @@ -801,112 +801,15 @@ void RequestMessage::processDrive_Down(cta::xrd::Response &response) -void RequestMessage::processDrive_Ls(cta::xrd::Response &response) +void RequestMessage::processDrive_Ls(cta::xrd::Response &response, XrdSsiStream* &stream) { - using namespace cta::admin; - - const int DRIVE_TIMEOUT = 600; - - std::stringstream cmdlineOutput; - - // Dump all drives unless we specified a drive - bool hasRegex = false; - bool driveFound = false; - - auto driveRegexOpt = getOptional(OptionString::DRIVE, &hasRegex); - std::string driveRegexStr = hasRegex ? '^' + driveRegexOpt.value() + '$' : "."; - utils::Regex driveRegex(driveRegexStr.c_str()); - - auto driveStates = m_scheduler.getDriveStates(m_cliIdentity, m_lc); - if (!driveStates.empty()) - { - std::vector<std::vector<std::string>> responseTable; - std::vector<std::string> headers = { - "library","drive","host","desired","request","status","since","vid","tapepool","files", - "MBytes","MB/s","session","priority","activity","age" - }; - responseTable.push_back(headers); - - typedef decltype(*driveStates.begin()) dStateVal_t; - driveStates.sort([](const dStateVal_t &a, const dStateVal_t &b){ return a.driveName < b.driveName; }); - - for (auto ds: driveStates) - { - if(!driveRegex.has_match(ds.driveName)) continue; - driveFound = true; - - auto timeSinceLastUpdate_s = time(nullptr) - ds.lastUpdateTime; - - std::vector<std::string> currentRow; - currentRow.push_back(ds.logicalLibrary); - currentRow.push_back(ds.driveName); - currentRow.push_back(ds.host); - currentRow.push_back(ds.desiredDriveState.up ? "Up" : "Down"); - currentRow.push_back(cta::common::dataStructures::toString(ds.mountType)); - currentRow.push_back(cta::common::dataStructures::toString(ds.driveStatus)); - - // print the time spent in the current state - unsigned long long drive_time = time(nullptr); - - switch(ds.driveStatus) { - using namespace cta::common::dataStructures; - - case DriveStatus::Probing: drive_time -= ds.probeStartTime; break; - case DriveStatus::Up: - case DriveStatus::Down: drive_time -= ds.downOrUpStartTime; break; - case DriveStatus::Starting: drive_time -= ds.startStartTime; break; - case DriveStatus::Mounting: drive_time -= ds.mountStartTime; break; - case DriveStatus::Transferring: drive_time -= ds.transferStartTime; break; - case DriveStatus::CleaningUp: drive_time -= ds.cleanupStartTime; break; - case DriveStatus::Unloading: drive_time -= ds.unloadStartTime; break; - case DriveStatus::Unmounting: drive_time -= ds.unmountStartTime; break; - case DriveStatus::DrainingToDisk: drive_time -= ds.drainingStartTime; break; - case DriveStatus::Shutdown: drive_time -= ds.shutdownTime; break; - case DriveStatus::Unknown: break; - } - currentRow.push_back(ds.driveStatus == cta::common::dataStructures::DriveStatus::Unknown ? "-" : - std::to_string(drive_time)); - - currentRow.push_back(ds.currentVid == "" ? "-" : ds.currentVid); - currentRow.push_back(ds.currentTapePool == "" ? "-" : ds.currentTapePool); - - switch (ds.driveStatus) { - case cta::common::dataStructures::DriveStatus::Transferring: - currentRow.push_back(std::to_string(static_cast<unsigned long long>(ds.filesTransferredInSession))); - currentRow.push_back(bytesToMbString(ds.bytesTransferredInSession)); - currentRow.push_back(bytesToMbString(ds.latestBandwidth)); - break; - default: - currentRow.push_back("-"); - currentRow.push_back("-"); - currentRow.push_back("-"); - } - switch(ds.driveStatus) { - case cta::common::dataStructures::DriveStatus::Up: - case cta::common::dataStructures::DriveStatus::Down: - case cta::common::dataStructures::DriveStatus::Unknown: - currentRow.push_back("-"); - break; - default: - currentRow.push_back(std::to_string(static_cast<unsigned long long>(ds.sessionId))); - } - currentRow.push_back(std::to_string(ds.currentPriority)); - currentRow.push_back(ds.currentActivityAndWeight?ds.currentActivityAndWeight.value().activity: "-"); - currentRow.push_back(std::to_string(timeSinceLastUpdate_s) + - (timeSinceLastUpdate_s > DRIVE_TIMEOUT ? " [STALE]" : "")); - responseTable.push_back(currentRow); - } - - if (hasRegex && !driveFound) { - throw cta::exception::UserError(std::string("No such drive: ") + driveRegexOpt.value()); - } + using namespace cta::admin; - m_option_bool[OptionBoolean::SHOW_HEADER] = true; - cmdlineOutput<< formatResponse(responseTable); - } + // Create a XrdSsi stream object to return the results + stream = new DriveLsStream(*this, m_catalogue, m_scheduler, m_cliIdentity, m_lc); - response.set_message_txt(cmdlineOutput.str()); - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_show_header(HeaderType::DRIVE_LS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.hpp b/xroot_plugins/XrdSsiCtaRequestMessage.hpp index 8515f9aba4f0c7a4495989f9a2eb0df24d3249ef..b681664322ff625bbdc8f37933dd642259bce8f6 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.hpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.hpp @@ -158,7 +158,6 @@ private: void processArchiveRoute_Rm (cta::xrd::Response &response); void processDrive_Up (cta::xrd::Response &response); void processDrive_Down (cta::xrd::Response &response); - void processDrive_Ls (cta::xrd::Response &response); void processDrive_Rm (cta::xrd::Response &response); void processGroupMountRule_Add (cta::xrd::Response &response); void processGroupMountRule_Ch (cta::xrd::Response &response); @@ -205,6 +204,7 @@ private: admincmdstream_t processAdmin_Ls; admincmdstream_t processArchiveFile_Ls; admincmdstream_t processArchiveRoute_Ls; + admincmdstream_t processDrive_Ls; admincmdstream_t processFailedRequest_Ls; admincmdstream_t processListPendingArchives; admincmdstream_t processListPendingRetrieves; diff --git a/xrootd-ssi-protobuf-interface b/xrootd-ssi-protobuf-interface index 17f1f77acc76a15bd9f494cfd65bdab68ee8ac19..86937b02d89bdce5bffe792865257ae66b8f9374 160000 --- a/xrootd-ssi-protobuf-interface +++ b/xrootd-ssi-protobuf-interface @@ -1 +1 @@ -Subproject commit 17f1f77acc76a15bd9f494cfd65bdab68ee8ac19 +Subproject commit 86937b02d89bdce5bffe792865257ae66b8f9374