From 1486f5701691ebe4e329c4c1f9379db7d1022c33 Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Wed, 6 Feb 2019 15:49:00 +0100 Subject: [PATCH] [os-failedrequests] Changes QueueItors to unique_ptr Previously these were references, but sometimes we want only an archive queue itor or only a retrieve queue itor. Now we can set the unused itors to nullptr. --- scheduler/OStoreDB/OStoreDB.cpp | 15 ++ scheduler/OStoreDB/OStoreDB.hpp | 6 + xroot_plugins/XrdCtaFailedRequestLs.hpp | 221 ++++++++++++---------- xroot_plugins/XrdSsiCtaRequestMessage.cpp | 14 +- 4 files changed, 155 insertions(+), 101 deletions(-) diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index d1724b3209..55bd1a3e3b 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -654,6 +654,13 @@ OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tape { return ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName); } +//------------------------------------------------------------------------------ +// OStoreDB::getArchiveJobItorPtr() +//------------------------------------------------------------------------------ +OStoreDB::ArchiveQueueItor_t* OStoreDB::getArchiveJobItorPtr(const std::string &tapePoolName, QueueType queueType) const +{ + return new ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName); +} //------------------------------------------------------------------------------ // OStoreDB::getNextArchiveJobsToReportBatch() @@ -1106,6 +1113,14 @@ OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vi return RetrieveQueueItor_t(m_objectStore, queueType, vid); } +//------------------------------------------------------------------------------ +// OStoreDB::getRetrieveJobItorPtr() +//------------------------------------------------------------------------------ +OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string &vid, QueueType queueType) const +{ + return new RetrieveQueueItor_t(m_objectStore, queueType, vid); +} + //------------------------------------------------------------------------------ // OStoreDB::getNextRetrieveJobsToReportBatch() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 707b2c607b..b060a5166d 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -254,6 +254,9 @@ public: ArchiveQueueItor_t getArchiveJobItor(const std::string &tapePoolName, objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const; + ArchiveQueueItor_t* getArchiveJobItorPtr(const std::string &tapePoolName, + objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const; + std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & logContext) override; @@ -288,6 +291,9 @@ public: RetrieveQueueItor_t getRetrieveJobItor(const std::string &vid, objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const; + RetrieveQueueItor_t* getRetrieveJobItorPtr(const std::string &vid, + objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const; + std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext) override; void setRetrieveJobBatchReported(std::list<cta::SchedulerDatabase::RetrieveJob*> & jobsBatch, diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index 39edc12bd4..ac29bc1b04 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -32,20 +32,33 @@ namespace cta { namespace xrd { class FailedRequestLsStream : public XrdSsiStream { public: - FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t archiveQueueItor, - OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_log_entries, bool is_summary, + /*! + * Constructor + * + * @param[in] scheduler CTA Scheduler + * @param[in] archiveQueueItorPtr Pointer to the Archive Queue iterator + * @param[in] retrieveQueueItorPtr Pointer to the Retrieve Queue iterator + * @param[in] is_summary Show only a summary of failed jobs + * @param[in] is_log_entries Include log messages in output (verbose) + * @param[in] lc CTA Log Context + */ + FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr, + OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr, bool is_summary, bool is_log_entries, log::LogContext &lc) : XrdSsiStream(XrdSsiStream::isActive), m_scheduler(scheduler), - m_archiveQueueItor(std::move(archiveQueueItor)), - m_retrieveQueueItor(std::move(retrieveQueueItor)), - m_isLogEntries(is_log_entries), + m_archiveQueueItorPtr(archiveQueueItorPtr), + m_retrieveQueueItorPtr(retrieveQueueItorPtr), m_isSummary(is_summary), + m_isLogEntries(is_log_entries), m_lc(lc) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor"); } + /*! + * Destructor + */ virtual ~FailedRequestLsStream() { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~FailedRequestLsStream() destructor"); } @@ -69,94 +82,17 @@ public: * last = true: No more data remains. * last = false: A fatal error occurred, eRef has the reason. */ - virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override { - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); - - XrdSsiPb::OStreamBuffer<Data> *streambuf; - - try { - if(m_isSummary) { - // Special handling for --summary option - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - GetBuffSummary(streambuf); - last = true; - } else if(!m_archiveQueueItor.end()) { - // List failed archive requests - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - for(bool is_buffer_full = false; !m_archiveQueueItor.end() && !is_buffer_full; ++m_archiveQueueItor) { - is_buffer_full = pushRecord(streambuf, m_archiveQueueItor.qid(), *m_archiveQueueItor); - } - } else if(!m_retrieveQueueItor.end()) { - // List failed retrieve requests - streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); - - for(bool is_buffer_full = false; !m_retrieveQueueItor.end() && !is_buffer_full; ++m_retrieveQueueItor) { - is_buffer_full = pushRecord(streambuf, m_retrieveQueueItor.qid(), *m_retrieveQueueItor); - } - } else { - // Nothing more to send, close the stream - last = true; - return nullptr; - } - dlen = streambuf->Size(); - XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); - return streambuf; - } catch(exception::Exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - } catch(std::exception &ex) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: " << ex.what(); - eInfo.Set(errMsg.str().c_str(), ECANCELED); - } catch(...) { - std::ostringstream errMsg; - errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; - eInfo.Set(errMsg.str().c_str(), ECANCELED); - } - delete streambuf; - return nullptr; - } + virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override; +private: /*! * Populate the failed queue summary * - * @param[in] streambuf XRootD SSI stream object to push records to + * @param[in] streambuf XRootD SSI stream object to push records to + * @param[in] isArchiveJobs Include summary of archive jobs in the output + * @param[in] isRetrieveJobs Include summary of retrieve jobs in the output */ - void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) { - SchedulerDatabase::JobsFailedSummary archive_summary; - SchedulerDatabase::JobsFailedSummary retrieve_summary; - - bool isArchive = !m_archiveQueueItor.end(); - bool isRetrieve = !m_retrieveQueueItor.end(); - - if(isArchive) { - Data record; - archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); - record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST); - record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles); - record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); - streambuf->Push(record); - } - if(isRetrieve) { - Data record; - retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); - record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST); - record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles); - record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); - streambuf->Push(record); - } - if(isArchive && isRetrieve) { - Data record; - record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL); - record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); - record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes); - streambuf->Push(record); - } - - m_isSummary = false; - } + void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs); /*! * Add a record to the stream @@ -170,7 +106,6 @@ public: template<typename QueueType> bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item); -private: /*! * Map queue type to RequestType * @@ -179,18 +114,112 @@ private: template<typename QueueType> admin::RequestType getRequestType(const QueueType &item); - Scheduler &m_scheduler; //!< Reference to CTA Scheduler - OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator - OStoreDB::RetrieveQueueItor_t m_retrieveQueueItor; //!< Retrieve Queue Iterator - bool m_isLogEntries; //!< Show failure log messages (verbose) - bool m_isSummary; //!< Show only summary of items in the failed queues - log::LogContext &m_lc; //!< Reference to CTA Log Context + Scheduler &m_scheduler; //!< Reference to CTA Scheduler + std::unique_ptr<OStoreDB::ArchiveQueueItor_t> m_archiveQueueItorPtr; //!< Archive Queue Iterator + std::unique_ptr<OStoreDB::RetrieveQueueItor_t> m_retrieveQueueItorPtr; //!< Retrieve Queue Iterator + bool m_isSummary; //!< Show only summary of items in the failed queues + bool m_isLogEntries; //!< Show failure log messages (verbose) + log::LogContext &m_lc; //!< Reference to CTA Log Context - static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages + static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages }; +XrdSsiStream::Buffer* FailedRequestLsStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) +{ + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); + + XrdSsiPb::OStreamBuffer<Data> *streambuf; + + try { + bool isArchiveJobs = m_archiveQueueItorPtr && !m_archiveQueueItorPtr->end(); + bool isRetrieveJobs = m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end(); + + if(!(m_isSummary || isArchiveJobs || isRetrieveJobs)) { + // Nothing more to send, close the stream + last = true; + return nullptr; + } + + // Initialise the stream buffer + streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen); + + if(m_isSummary) { + // Special handling for --summary option + GetBuffSummary(streambuf, isArchiveJobs, isRetrieveJobs); + last = true; + } else { + // List failed archive requests + if(isArchiveJobs) { + for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) { + is_buffer_full = pushRecord(streambuf, m_archiveQueueItorPtr->qid(), **m_archiveQueueItorPtr); + } + } + // List failed retrieve requests + if(isRetrieveJobs) { + for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) { + is_buffer_full = pushRecord(streambuf, m_retrieveQueueItorPtr->qid(), **m_retrieveQueueItorPtr); + } + } + } + dlen = streambuf->Size(); + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); + return streambuf; + } catch(exception::Exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + } catch(std::exception &ex) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + } catch(...) { + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; + eInfo.Set(errMsg.str().c_str(), ECANCELED); + } + delete streambuf; + return nullptr; +} + + + +void FailedRequestLsStream::GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, + bool isArchiveJobs, bool isRetrieveJobs) +{ + SchedulerDatabase::JobsFailedSummary archive_summary; + SchedulerDatabase::JobsFailedSummary retrieve_summary; + + if(isArchiveJobs) { + Data record; + archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); + record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST); + record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); + streambuf->Push(record); + } + if(isRetrieveJobs) { + Data record; + retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); + record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST); + record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); + streambuf->Push(record); + } + if(isArchiveJobs && isRetrieveJobs) { + Data record; + record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL); + record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes); + streambuf->Push(record); + } + + m_isSummary = false; +} + + + template<typename QueueType> bool FailedRequestLsStream::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item) { diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 4a20b54d95..c7adbed8d4 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1044,13 +1044,17 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm throw cta::exception::UserError("--log and --summary are mutually exclusive"); } - std::string tapepool = has_flag(OptionBoolean::JUSTRETRIEVE) ? "" : "INVALID_TAPEPOOL"; - std::string vid = has_flag(OptionBoolean::JUSTARCHIVE) ? "" : "INVALID_VID"; + // These could be added as command options to allow filtering of results to a single queue + std::string tapepool = ""; + std::string vid = ""; + + OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr = + has_flag(OptionBoolean::JUSTRETRIEVE) ? nullptr : m_scheddb.getArchiveJobItorPtr(tapepool); + OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr = + has_flag(OptionBoolean::JUSTARCHIVE) ? nullptr : m_scheddb.getRetrieveJobItorPtr(vid); // Create a XrdSsi stream object to return the results - stream = new FailedRequestLsStream(m_scheduler, - m_scheddb.getArchiveJobItor(tapepool), m_scheddb.getRetrieveJobItor(vid), - has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc); + stream = new FailedRequestLsStream(m_scheduler, archiveQueueItorPtr, retrieveQueueItorPtr, has_flag(OptionBoolean::SUMMARY), has_flag(OptionBoolean::SHOW_LOG_ENTRIES), m_lc); // Should the client display column headers? if(has_flag(OptionBoolean::SHOW_HEADER)) { -- GitLab