diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index e0fe876672938b4a8ce4789acf169fa8abef324b..39edc12bd478d979e5c488761ec7a4860765f44f 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -1,7 +1,7 @@ /*! * @project The CERN Tape Archive (CTA) - * @brief CTA Frontend Archive File Ls stream implementation - * @copyright Copyright 2017 CERN + * @brief CTA Frontend Failed Request Ls stream implementation + * @copyright Copyright 2019 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or @@ -27,20 +27,18 @@ namespace cta { namespace xrd { /*! - * Stream object which implements "af ls" command. + * Stream object which implements "failedrequest ls" command. */ class FailedRequestLsStream : public XrdSsiStream { public: FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t archiveQueueItor, - OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_archive, bool is_retrieve, - bool is_log_entries, bool is_summary, log::LogContext &lc) : + OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_log_entries, bool is_summary, + log::LogContext &lc) : XrdSsiStream(XrdSsiStream::isActive), m_scheduler(scheduler), m_archiveQueueItor(std::move(archiveQueueItor)), m_retrieveQueueItor(std::move(retrieveQueueItor)), - m_isArchive(is_archive), - m_isRetrieve(is_retrieve), m_isLogEntries(is_log_entries), m_isSummary(is_summary), m_lc(lc) @@ -82,79 +80,28 @@ public: streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); GetBuffSummary(streambuf); last = true; - } else if(m_isArchive) { + } else if(!m_archiveQueueItor.end()) { // List failed archive requests streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - pushRecord(streambuf, admin::RequestType::ARCHIVE_REQUEST); - - m_isArchive = false; - } else if(m_isRetrieve) { + for(bool is_buffer_full = false; !m_archiveQueueItor.end() && !is_buffer_full; ++m_archiveQueueItor) { + is_buffer_full = pushRecord(streambuf, m_archiveQueueItor.qid(), *m_archiveQueueItor); + } + } else if(!m_retrieveQueueItor.end()) { // List failed retrieve requests streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - pushRecord(streambuf, admin::RequestType::RETRIEVE_REQUEST); - -/* - Take a look at OStoreDB::getNextRetrieveJobsFailedBatch() - - 1. new method to get a list of Failed queues - 2. for each queue, get a list of jobs (without popping from the queue) - 3. have some maximum number of items that can be viewed (how should this be configured, in the config file, on the command line?) - 4. pop items off list of jobs, get details, add to stream, while !is_buffer_full - 5. when all queues are done and all jobs are done, set m_isRetrieve to false -*/ - - m_isRetrieve = false; + for(bool is_buffer_full = false; !m_retrieveQueueItor.end() && !is_buffer_full; ++m_retrieveQueueItor) { + is_buffer_full = pushRecord(streambuf, m_retrieveQueueItor.qid(), *m_retrieveQueueItor); + } } else { // Nothing more to send, close the stream last = true; return nullptr; } - dlen = streambuf->Size(); XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); return streambuf; -#if 0 - for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; ) - { - const cta::common::dataStructures::FailedRequest archiveFile = m_archiveFileItor.next(); - - for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) { - Data record; - - // Copy number - record.mutable_afls_item()->set_copy_nb(jt->first); - - // Archive file - auto af = record.mutable_afls_item()->mutable_af(); - af->set_archive_id(archiveFile.archiveFileID); - af->set_disk_instance(archiveFile.diskInstance); - af->set_disk_id(archiveFile.diskFileId); - af->set_size(archiveFile.fileSize); - af->mutable_cs()->set_type(archiveFile.checksumType); - af->mutable_cs()->set_value(archiveFile.checksumValue); - af->set_storage_class(archiveFile.storageClass); - af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner); - af->mutable_df()->set_group(archiveFile.diskFileInfo.group); - af->mutable_df()->set_path(archiveFile.diskFileInfo.path); - af->set_creation_time(archiveFile.creationTime); - - // Tape file - auto tf = record.mutable_afls_item()->mutable_tf(); - tf->set_vid(jt->second.vid); - tf->set_f_seq(jt->second.fSeq); - tf->set_block_id(jt->second.blockId); - - // is_buffer_full is set to true when we have one full block of data in the buffer, i.e. - // enough data to send to the client. The actual buffer size is double the block size, - // so we can keep writing a few additional records after is_buffer_full is true. These - // will be sent on the next iteration. If we exceed the hard limit of double the block - // size, Push() will throw an exception. - is_buffer_full = streambuf->Push(record); - } - } -#endif } catch(exception::Exception &ex) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); @@ -181,7 +128,10 @@ public: SchedulerDatabase::JobsFailedSummary archive_summary; SchedulerDatabase::JobsFailedSummary retrieve_summary; - if(m_isArchive) { + bool isArchive = !m_archiveQueueItor.end(); + bool isRetrieve = !m_retrieveQueueItor.end(); + + if(isArchive) { Data record; archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST); @@ -189,7 +139,7 @@ public: record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); streambuf->Push(record); } - if(m_isRetrieve) { + if(isRetrieve) { Data record; retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST); @@ -197,7 +147,7 @@ public: record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); streambuf->Push(record); } - if(m_isArchive && m_isRetrieve) { + if(isArchive && isRetrieve) { Data record; record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL); record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); @@ -213,41 +163,64 @@ public: * * @param[in] streambuf XRootD SSI stream object to push records to * @param[in] requestType The type of failed request (archive or retrieve) + * + * @retval true Stream buffer is full and ready to send + * @retval false Stream buffer is not full */ - void pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, admin::RequestType requestType) { - Data record; - - switch(requestType) { - case admin::RequestType::ARCHIVE_REQUEST: - record.mutable_frls_item()->set_tapepool("tapepool"); break; - case admin::RequestType::RETRIEVE_REQUEST: - record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break; - default: - throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType)); - } - record.mutable_frls_item()->set_request_type(requestType); - record.mutable_frls_item()->set_copy_nb(1); - record.mutable_frls_item()->mutable_requester()->set_username("u"); - record.mutable_frls_item()->mutable_requester()->set_groupname("g"); - record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/"); - //record.mutable_frls()->set_failurelogs(... - streambuf->Push(record); - } + template<typename QueueType> + bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item); private: - Scheduler &m_scheduler; //!< Reference to CTA Scheduler + /*! + * Map queue type to RequestType + * + * @return RequestType for the template specialisation + */ + template<typename QueueType> + admin::RequestType getRequestType(const QueueType &item); - OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator + Scheduler &m_scheduler; //!< Reference to CTA Scheduler + OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator OStoreDB::RetrieveQueueItor_t m_retrieveQueueItor; //!< Retrieve Queue Iterator - - bool m_isArchive; //!< List failed archive requests - bool m_isRetrieve; //!< List failed retrieve requests bool m_isLogEntries; //!< Show failure log messages (verbose) - bool m_isSummary; //!< Short summary of number of failures - + bool m_isSummary; //!< Show only summary of items in the failed queues log::LogContext &m_lc; //!< Reference to CTA Log Context static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages }; + + +template<typename QueueType> +bool FailedRequestLsStream::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item) +{ + Data record; + + record.mutable_frls_item()->set_request_type(getRequestType(item)); +#if 0 + switch(requestType) { + case admin::RequestType::ARCHIVE_REQUEST: + record.mutable_frls_item()->set_tapepool("tapepool"); break; + case admin::RequestType::RETRIEVE_REQUEST: + record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break; + default: + throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType)); + } + record.mutable_frls_item()->set_copy_nb(1); + record.mutable_frls_item()->mutable_requester()->set_username("u"); + record.mutable_frls_item()->mutable_requester()->set_groupname("g"); + record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/"); + //record.mutable_frls()->set_failurelogs(... +#endif + return streambuf->Push(record); +} + + + +template<> +admin::RequestType FailedRequestLsStream::getRequestType(const common::dataStructures::ArchiveJob &item) { return admin::RequestType::ARCHIVE_REQUEST; } + +template<> +admin::RequestType FailedRequestLsStream::getRequestType(const common::dataStructures::RetrieveJob &item) { return admin::RequestType::RETRIEVE_REQUEST; } + }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index a8b817ff1b7d223455e4dc7ff9b1d7967848d3d0..4a20b54d95ad53d6861909ac93fbdd69d45df858 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1044,17 +1044,13 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm throw cta::exception::UserError("--log and --summary are mutually exclusive"); } - bool is_archive = !has_flag(OptionBoolean::JUSTRETRIEVE); - bool is_retrieve = !has_flag(OptionBoolean::JUSTARCHIVE); - - std::string tapepool = is_archive ? "" : "INVALID_TAPEPOOL"; - std::string vid = is_retrieve ? "" : "INVALID_VID"; + std::string tapepool = has_flag(OptionBoolean::JUSTRETRIEVE) ? "" : "INVALID_TAPEPOOL"; + std::string vid = has_flag(OptionBoolean::JUSTARCHIVE) ? "" : "INVALID_VID"; // Create a XrdSsi stream object to return the results stream = new FailedRequestLsStream(m_scheduler, m_scheddb.getArchiveJobItor(tapepool), m_scheddb.getRetrieveJobItor(vid), - is_archive, is_retrieve, has_flag(OptionBoolean::SHOW_LOG_ENTRIES), - has_flag(OptionBoolean::SUMMARY), m_lc); + has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc); // Should the client display column headers? if(has_flag(OptionBoolean::SHOW_HEADER)) {