diff --git a/xroot_plugins/XrdCtaArchiveFileLs.hpp b/xroot_plugins/XrdCtaArchiveFileLs.hpp index 67017598f87952f3a4ae78eb560b4d58844837e1..dccc2cdd8d5868e33e87af462bf16ccfef8e41b7 100644 --- a/xroot_plugins/XrdCtaArchiveFileLs.hpp +++ b/xroot_plugins/XrdCtaArchiveFileLs.hpp @@ -45,12 +45,12 @@ private: */ virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); - static constexpr const char* const LOG_SUFFIX = "ArchiveFileLsStream"; //!< Identifier for log messages - cta::catalogue::TapeFileSearchCriteria m_searchCriteria; //!< Search criteria catalogue::ArchiveFileItor m_archiveFileItor; //!< Iterator across files which have been archived bool m_isSummary; //!< Full listing or short summary? bool m_isSummaryDone; //!< Summary has been sent + + static constexpr const char* const LOG_SUFFIX = "ArchiveFileLsStream"; //!< Identifier for log messages }; diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index 3a36f26f3c467dd1b4a9bd8abf89647f9558d80b..99a13ea91023a80e66c586bde650f42294e641ea 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -18,9 +18,9 @@ #pragma once -#include <XrdSsiPbOStreamBuffer.hpp> -#include <scheduler/Scheduler.hpp> -#include <scheduler/OStoreDB/OStoreDB.hpp> +#include <xroot_plugins/XrdCtaStream.hpp> +#include <xroot_plugins/XrdSsiCtaRequestMessage.hpp> +//#include <scheduler/OStoreDB/OStoreDB.hpp> @@ -29,70 +29,42 @@ namespace cta { namespace xrd { /*! * Stream object which implements "failedrequest ls" command. */ -class FailedRequestLsStream : public XrdSsiStream +class FailedRequestLsStream : public XrdCtaStream { public: /*! * Constructor * + * @param[in] requestMsg RequestMessage containing command-line arguments + * @param[in] catalogue CTA Catalogue * @param[in] scheduler CTA Scheduler - * @param[in] archiveQueueItorPtr Pointer to the Archive Queue iterator - * @param[in] retrieveQueueItorPtr Pointer to the Retrieve Queue iterator - * @param[in] is_summary Show only a summary of failed jobs - * @param[in] is_log_entries Include log messages in output (verbose) + * @param[in] schedDb CTA ObjectStore * @param[in] lc CTA Log Context */ - FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr, - OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr, bool is_summary, bool is_log_entries, - log::LogContext &lc) : - XrdSsiStream(XrdSsiStream::isActive), - m_scheduler(scheduler), - m_archiveQueueItorPtr(archiveQueueItorPtr), - m_retrieveQueueItorPtr(retrieveQueueItorPtr), - m_isSummary(is_summary), - m_isLogEntries(is_log_entries), - m_lc(lc) - { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor"); - } + FailedRequestLsStream(const RequestMessage &requestMsg, cta::catalogue::Catalogue &catalogue, + cta::Scheduler &scheduler, OStoreDB &schedDb, log::LogContext &lc); +private: /*! - * Destructor + * Can we close the stream? */ - virtual ~FailedRequestLsStream() { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~FailedRequestLsStream() destructor"); + virtual bool isDone() const { + return m_isSummary ? m_isSummaryDone + : !isArchiveJobs() && !isRetrieveJobs(); } - /*! - * Synchronously obtain data from an active stream - * - * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an - * active stream in the constructor. - * - * @param[out] eInfo The object to receive any error description. - * @param[in,out] dlen input: the optimal amount of data wanted (this is a hint) - * output: the actual amount of data returned in the buffer. - * @param[in,out] last input: should be set to false. - * output: if true it indicates that no more data remains to be returned - * either for this call or on the next call. - * - * @return Pointer to the Buffer object that contains a pointer to the the data (see below). The - * buffer must be returned to the stream using Buffer::Recycle(). The next member is usable. - * @retval 0 No more data remains or an error occurred: - * last = true: No more data remains. - * last = false: A fatal error occurred, eRef has the reason. - */ - virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override; + bool isArchiveJobs() const { + return m_archiveQueueItorPtr && !m_archiveQueueItorPtr->end(); + } + + bool isRetrieveJobs() const { + return m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end(); + } -private: /*! - * Populate the failed queue summary - * - * @param[in] streambuf XRootD SSI stream object to push records to - * @param[in] isArchiveJobs Include summary of archive jobs in the output - * @param[in] isRetrieveJobs Include summary of retrieve jobs in the output + * Fill the buffer */ - void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs); + virtual int fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf); /*! * Add a record to the stream @@ -106,12 +78,18 @@ private: template<typename QueueType> bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const QueueType &item); - // Member variables + /*! + * Populate the failed queue summary + * + * @param[in] streambuf XRootD SSI stream object to push records to + */ + void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf); + - Scheduler &m_scheduler; //!< Reference to CTA Scheduler std::unique_ptr<OStoreDB::ArchiveQueueItor_t> m_archiveQueueItorPtr; //!< Archive Queue Iterator std::unique_ptr<OStoreDB::RetrieveQueueItor_t> m_retrieveQueueItorPtr; //!< Retrieve Queue Iterator bool m_isSummary; //!< Show only summary of items in the failed queues + bool m_isSummaryDone; //!< Summary has been sent bool m_isLogEntries; //!< Show failure log messages (verbose) log::LogContext &m_lc; //!< Reference to CTA Log Context @@ -119,6 +97,36 @@ private: }; +FailedRequestLsStream::FailedRequestLsStream(const RequestMessage &requestMsg, + cta::catalogue::Catalogue &catalogue, cta::Scheduler &scheduler, OStoreDB &schedDb, + log::LogContext &lc) : + XrdCtaStream(catalogue, scheduler), + m_isSummary(requestMsg.has_flag(admin::OptionBoolean::SUMMARY)), + m_isSummaryDone(false), + m_isLogEntries(requestMsg.has_flag(admin::OptionBoolean::SHOW_LOG_ENTRIES)), + m_lc(lc) +{ + using namespace cta::admin; + + if(m_isLogEntries && m_isSummary) { + throw cta::exception::UserError("--log and --summary are mutually exclusive"); + } + + auto tapepool = requestMsg.getOptional(OptionString::TAPE_POOL); + auto vid = requestMsg.getOptional(OptionString::VID); + bool justarchive = requestMsg.has_flag(OptionBoolean::JUSTARCHIVE) || tapepool; + bool justretrieve = requestMsg.has_flag(OptionBoolean::JUSTRETRIEVE) || vid; + + if(justarchive && justretrieve) { + throw cta::exception::UserError("--justarchive/--tapepool and --justretrieve/--vid options are mutually exclusive"); + } + + if(!justretrieve) + m_archiveQueueItorPtr.reset(schedDb.getArchiveJobItorPtr(tapepool ? *tapepool : "", objectstore::JobQueueType::FailedJobs)); + if(!justarchive) + m_retrieveQueueItorPtr.reset(schedDb.getRetrieveJobItorPtr(vid ? *vid : "", objectstore::JobQueueType::FailedJobs)); +} + /*! * pushRecord ArchiveJob specialisation @@ -142,7 +150,6 @@ pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const common::dataStructure } - /*! * pushRecord RetrieveJob specialisation */ @@ -167,73 +174,36 @@ pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const common::dataStructure } - -XrdSsiStream::Buffer* FailedRequestLsStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) -{ - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); - - XrdSsiPb::OStreamBuffer<Data> *streambuf; - - try { - bool isArchiveJobs = m_archiveQueueItorPtr && !m_archiveQueueItorPtr->end(); - bool isRetrieveJobs = m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end(); - - if(!(m_isSummary || isArchiveJobs || isRetrieveJobs)) { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - - // Initialise the stream buffer - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - if(m_isSummary) { - // Special handling for --summary option - GetBuffSummary(streambuf, isArchiveJobs, isRetrieveJobs); - last = true; - } else { - // List failed archive requests - if(isArchiveJobs) { - for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) { - is_buffer_full = pushRecord(streambuf, **m_archiveQueueItorPtr); - } +int FailedRequestLsStream::fillBuffer(XrdSsiPb::OStreamBuffer<Data> *streambuf) { + if(m_isSummary) { + // Special handling for -S (Summary) option + GetBuffSummary(streambuf); + } else { + // List failed archive requests + if(isArchiveJobs()) { + for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) { + is_buffer_full = pushRecord(streambuf, **m_archiveQueueItorPtr); } - // List failed retrieve requests - if(isRetrieveJobs) { - for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) { - is_buffer_full = pushRecord(streambuf, **m_retrieveQueueItorPtr); - } + } + // List failed retrieve requests + if(isRetrieveJobs()) { + for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) { + is_buffer_full = pushRecord(streambuf, **m_retrieveQueueItorPtr); } } - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - return streambuf; - } catch(exception::Exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - } catch(std::exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - } catch(...) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; - eInfo.Set(errMsg.str().c_str(), ECANCELED); } - delete streambuf; - return nullptr; + return streambuf->Size(); } void FailedRequestLsStream:: -GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs) +GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) { SchedulerDatabase::JobsFailedSummary archive_summary; SchedulerDatabase::JobsFailedSummary retrieve_summary; - if(isArchiveJobs) { + if(isArchiveJobs()) { Data record; archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST); @@ -241,7 +211,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); streambuf->Push(record); } - if(isRetrieveJobs) { + if(isRetrieveJobs()) { Data record; retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST); @@ -249,7 +219,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); streambuf->Push(record); } - if(isArchiveJobs && isRetrieveJobs) { + if(isArchiveJobs() && isRetrieveJobs()) { Data record; record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL); record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); @@ -257,7 +227,7 @@ GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, boo streambuf->Push(record); } - m_isSummary = false; + m_isSummaryDone = true; } }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 68d6eb6dab9aa845dd1fe40a6589600b5eadbb90..fdbaf658ed2dc2a12290c59d0e78aaa96ad53aad 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -704,18 +704,18 @@ void RequestMessage::processAdmin_Ls(const cta::admin::AdminCmd &admincmd, cta:: void RequestMessage::processArchiveFile_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { - using namespace cta::admin; + using namespace cta::admin; - // Create a XrdSsi stream object to return the results - stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler); + // Create a XrdSsi stream object to return the results + stream = new ArchiveFileLsStream(*this, m_catalogue, m_scheduler); - // Should the client display column headers? - if(has_flag(OptionBoolean::SHOW_HEADER)) { - response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY - : HeaderType::ARCHIVEFILE_LS); - } + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) { + response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? HeaderType::ARCHIVEFILE_LS_SUMMARY + : HeaderType::ARCHIVEFILE_LS); + } - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); } @@ -984,36 +984,17 @@ void RequestMessage::processDrive_Rm(const cta::admin::AdminCmd &admincmd, cta:: void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincmd, cta::xrd::Response &response, XrdSsiStream* &stream) { - using namespace cta::admin; - - if(has_flag(OptionBoolean::SHOW_LOG_ENTRIES) && has_flag(OptionBoolean::SUMMARY)) { - throw cta::exception::UserError("--log and --summary are mutually exclusive"); - } - - auto tapepool = getOptional(OptionString::TAPE_POOL); - auto vid = getOptional(OptionString::VID); - bool justarchive = has_flag(OptionBoolean::JUSTARCHIVE) || tapepool; - bool justretrieve = has_flag(OptionBoolean::JUSTRETRIEVE) || vid; - - if(justarchive && justretrieve) { - throw cta::exception::UserError("--justarchive/--tapepool and --justretrieve/--vid options are mutually exclusive"); - } - - OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr = justretrieve ? nullptr : - m_scheddb.getArchiveJobItorPtr(tapepool ? *tapepool : "", objectstore::JobQueueType::FailedJobs); - OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr = justarchive ? nullptr : - m_scheddb.getRetrieveJobItorPtr(vid ? *vid : "", objectstore::JobQueueType::FailedJobs); + using namespace cta::admin; - // Create a XrdSsi stream object to return the results - stream = new FailedRequestLsStream(m_scheduler, archiveQueueItorPtr, retrieveQueueItorPtr, has_flag(OptionBoolean::SUMMARY), has_flag(OptionBoolean::SHOW_LOG_ENTRIES), m_lc); + stream = new FailedRequestLsStream(*this, m_catalogue, m_scheduler, m_scheddb, m_lc); - // Should the client display column headers? - if(has_flag(OptionBoolean::SHOW_HEADER)) { - response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? - HeaderType::FAILEDREQUEST_LS_SUMMARY : HeaderType::FAILEDREQUEST_LS); - } + // Should the client display column headers? + if(has_flag(OptionBoolean::SHOW_HEADER)) { + response.set_show_header(has_flag(OptionBoolean::SUMMARY) ? + HeaderType::FAILEDREQUEST_LS_SUMMARY : HeaderType::FAILEDREQUEST_LS); + } - response.set_type(cta::xrd::Response::RSP_SUCCESS); + response.set_type(cta::xrd::Response::RSP_SUCCESS); }