From 9f1752542cb8a3d8f052d719f30d96638d6407ff Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Mon, 4 Feb 2019 15:50:48 +0100 Subject: [PATCH] [os-failedrequests] Implements "cta-admin fr ls" --- cmdline/CtaAdminCmd.cpp | 34 +++++-- xroot_plugins/XrdCtaFailedRequestLs.hpp | 115 ++++++++++++++++-------- 2 files changed, 105 insertions(+), 44 deletions(-) diff --git a/cmdline/CtaAdminCmd.cpp b/cmdline/CtaAdminCmd.cpp index b2da936286..fccaf4bbe2 100644 --- a/cmdline/CtaAdminCmd.cpp +++ b/cmdline/CtaAdminCmd.cpp @@ -451,18 +451,40 @@ void CtaAdminCmd::print(const cta::admin::ArchiveFileLsSummary &afls_summary) void CtaAdminCmd::printFrLsHeader() { std::cout << TEXT_RED - << std::setfill(' ') << std::setw(11) << std::right << "request type" << ' ' - << std::setfill(' ') << std::setw(7) << std::right << "copy no" << ' ' - << std::setfill(' ') << std::setw(7) << std::right << "vid" << ' ' - << std::setfill(' ') << std::setw(8) << std::right << "requester" << ' ' - << std::setfill(' ') << std::setw(8) << std::right << "group" << ' ' + << std::setfill(' ') << std::setw(12) << std::right << "request type" << ' ' + << std::setfill(' ') << std::setw(8) << std::right << "copy no" << ' ' + << std::setfill(' ') << std::setw(13) << std::right << "tapepool/vid" << ' ' + << std::setfill(' ') << std::setw(10) << std::right << "requester" << ' ' + << std::setfill(' ') << std::setw(6) << std::right << "group" << ' ' << "path" << TEXT_NORMAL << std::endl; } void CtaAdminCmd::print(const cta::admin::FailedRequestLsItem &frls_item) { - throw std::runtime_error("Not implemented."); + std::string request_type; + std::string tapepool_vid; + + switch(frls_item.request_type()) { + case admin::RequestType::ARCHIVE_REQUEST: + request_type = "archive"; + tapepool_vid = frls_item.tapepool(); + break; + case admin::RequestType::RETRIEVE_REQUEST: + request_type = "retrieve"; + tapepool_vid = frls_item.tf().vid(); + break; + default: + throw std::runtime_error("Unrecognised request type: " + std::to_string(frls_item.request_type())); + } + + std::cout << std::setfill(' ') << std::setw(11) << std::right << request_type << ' ' + << std::setfill(' ') << std::setw(8) << std::right << frls_item.copy_nb() << ' ' + << std::setfill(' ') << std::setw(14) << std::right << tapepool_vid << ' ' + << std::setfill(' ') << std::setw(10) << std::right << frls_item.requester().username() << ' ' + << std::setfill(' ') << std::setw(6) << std::right << frls_item.requester().groupname() << ' ' + << frls_item.af().df().path() + << std::endl; } void CtaAdminCmd::printFrLsSummaryHeader() diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index ecf48ef620..43732580fc 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -33,7 +33,7 @@ class FailedRequestLsStream : public XrdSsiStream { public: FailedRequestLsStream(Scheduler &scheduler, bool is_archive, bool is_retrieve, - bool is_log_entries, bool is_summary, cta::log::LogContext &lc) : + bool is_log_entries, bool is_summary, log::LogContext &lc) : XrdSsiStream(XrdSsiStream::isActive), m_scheduler(scheduler), m_isArchive(is_archive), @@ -74,30 +74,42 @@ public: XrdSsiPb::OStreamBuffer<Data> *streambuf; try { - if(!m_isSummary && true) { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - // Special handling for -S option if(m_isSummary) { - GetBuffSummary(streambuf); - dlen = streambuf->Size(); - last = true; - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - return streambuf; + // Special handling for --summary option + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + GetBuffSummary(streambuf); + last = true; + } else if(m_isArchive) { + // List failed archive requests + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + + //auto aqList = m_scheduler.getArchiveJobsFailedQueues(m_lc); + + pushRecord(streambuf, admin::RequestType::ARCHIVE_REQUEST); + + } else if(m_isRetrieve) { + // List failed retrieve requests + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + +/* + Take a look at OStoreDB::getNextRetrieveJobsFailedBatch() + + 1. new method to get a list of Failed queues + 2. for each queue, get a list of jobs (without popping from the queue) + 3. have some maximum number of items that can be viewed (how should this be configured, in the config file, on the command line?) + 4. pop items off list of jobs, get details, add to stream, while !is_buffer_full + 5. when all queues are done and all jobs are done, set m_isRetrieve to false +*/ + + } else { + // Nothing more to send, close the stream + last = true; + return nullptr; } -#if 0 - m_scheduler.listQueueItems(m_cliIdentity.username, "failed queue", m_lc); - auto archiveJobFailedList = m_scheduler.getNextArchiveJobsFailedBatch(10,m_lc); - cmdlineOutput << "Failed archive jobs: " << archiveJobFailedList.size() << std::endl; - auto retrieveJobFailedList = m_scheduler.getRetrieveJobsFailedSummary(m_lc); - cmdlineOutput << "Failed retrieve jobs: " << retrieveJobFailedList.size() << std::endl; -#endif + dlen = streambuf->Size(); + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); + return streambuf; #if 0 for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; ) { @@ -138,27 +150,28 @@ public: } } #endif - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - } catch(cta::exception::Exception &ex) { + } catch(exception::Exception &ex) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; } catch(std::exception &ex) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: " << ex.what(); eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; } catch(...) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; eInfo.Set(errMsg.str().c_str(), ECANCELED); - delete streambuf; } - return streambuf; + delete streambuf; + return nullptr; } + /*! + * Populate the failed queue summary + * + * @param[in] streambuf XRootD SSI stream object to push records to + */ void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) { SchedulerDatabase::JobsFailedSummary archive_summary; SchedulerDatabase::JobsFailedSummary retrieve_summary; @@ -166,7 +179,7 @@ public: if(m_isArchive) { Data record; archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); - record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::ARCHIVE_REQUEST); + record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST); record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles); record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); streambuf->Push(record); @@ -174,14 +187,14 @@ public: if(m_isRetrieve) { Data record; retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); - record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::RETRIEVE_REQUEST); + record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST); record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles); record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); streambuf->Push(record); } if(m_isArchive && m_isRetrieve) { Data record; - record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::TOTAL); + record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL); record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes); streambuf->Push(record); @@ -190,15 +203,41 @@ public: m_isSummary = false; } + /*! + * Add a record to the stream + * + * @param[in] streambuf XRootD SSI stream object to push records to + * @param[in] requestType The type of failed request (archive or retrieve) + */ + void pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, admin::RequestType requestType) { + Data record; + + switch(requestType) { + case admin::RequestType::ARCHIVE_REQUEST: + record.mutable_frls_item()->set_tapepool("tapepool"); break; + case admin::RequestType::RETRIEVE_REQUEST: + record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break; + default: + throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType)); + } + record.mutable_frls_item()->set_request_type(requestType); + record.mutable_frls_item()->set_copy_nb(1); + record.mutable_frls_item()->mutable_requester()->set_username("u"); + record.mutable_frls_item()->mutable_requester()->set_groupname("g"); + record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/"); + //record.mutable_frls()->set_failurelogs(... + streambuf->Push(record); + } + private: - cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler + Scheduler &m_scheduler; //!< Reference to CTA Scheduler - bool m_isArchive; //!< List failed archive requests - bool m_isRetrieve; //!< List failed retrieve requests - bool m_isLogEntries; //!< Show failure log messages (verbose) - bool m_isSummary; //!< Short summary of number of failures + bool m_isArchive; //!< List failed archive requests + bool m_isRetrieve; //!< List failed retrieve requests + bool m_isLogEntries; //!< Show failure log messages (verbose) + bool m_isSummary; //!< Short summary of number of failures - cta::log::LogContext &m_lc; //!< Reference to CTA Log Context + log::LogContext &m_lc; //!< Reference to CTA Log Context static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages }; -- GitLab