diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 0361133acbb9e2688f6ae9ae1b9667a5f66b329b..b2fb41a7180ba7e781f5f96eafcf3244ead56393 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -693,6 +693,24 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArch } } +//------------------------------------------------------------------------------ +// OStoreDB::getArchiveJobsFailedSummary +//------------------------------------------------------------------------------ +SchedulerDatabase::JobsFailedSummary OStoreDB::getArchiveJobsFailedSummary(log::LogContext &logContext) { + RootEntry re(m_objectStore); + re.fetchNoLock(); + + SchedulerDatabase::JobsFailedSummary ret; + auto queueList = re.dumpArchiveQueues(QueueType::FailedJobs); + for(auto &aj : queueList) { + ArchiveQueue aq(aj.address, m_objectStore); + auto summary = aq.getCandidateSummary(); + ret.totalFiles += summary.candidateFiles; + ret.totalBytes += summary.candidateBytes; + } + return ret; +} + //------------------------------------------------------------------------------ // OStoreDB::setArchiveJobBatchReported() //------------------------------------------------------------------------------ @@ -1115,6 +1133,24 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo } } +//------------------------------------------------------------------------------ +// OStoreDB::getRetrieveJobsFailedSummary +//------------------------------------------------------------------------------ +SchedulerDatabase::JobsFailedSummary OStoreDB::getRetrieveJobsFailedSummary(log::LogContext &logContext) { + RootEntry re(m_objectStore); + re.fetchNoLock(); + + SchedulerDatabase::JobsFailedSummary ret; + auto queueList = re.dumpRetrieveQueues(QueueType::FailedJobs); + for(auto &rj : queueList) { + RetrieveQueue rq(rj.address, m_objectStore); + auto summary = rq.getCandidateSummary(); + ret.totalFiles += summary.candidateFiles; + ret.totalBytes += summary.candidateBytes; + } + return ret; +} + //------------------------------------------------------------------------------ // OStoreDB::getNextRetrieveJobsFailedBatch() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 9909faa01969a1bea0c243a0cce1166f1910a71b..0e48c8fe6c62fafd7dd976c85f077a7adac76141 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -255,6 +255,8 @@ public: std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & logContext) override; + + JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) override; void setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*> & jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override; @@ -289,7 +291,9 @@ public: log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override; std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) override; - + + JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &logContext) override; + /* === Drive state handling ============================================== */ /** * Get states of all drives. diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index 5e394962827dcb23506315e13adc4c35e14586cd..b6020bfa77a68098c2665bc9c9098958a074b989 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -138,6 +138,10 @@ public: return m_OStoreDB.getNextArchiveJobsToReportBatch(filesRequested, lc); } + JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &lc) override { + return m_OStoreDB.getArchiveJobsFailedSummary(lc); + } + std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &lc) override { return m_OStoreDB.getNextRetrieveJobsToReportBatch(filesRequested, lc); } @@ -146,6 +150,10 @@ public: return m_OStoreDB.getNextRetrieveJobsFailedBatch(filesRequested, lc); } + JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &lc) override { + return m_OStoreDB.getRetrieveJobsFailedSummary(lc); + } + void setArchiveJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext& lc) override { m_OStoreDB.setArchiveJobBatchReported(jobsBatch, timingList, t, lc); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 768aaac4c4159d494ccde359d8b875cdca411aab..bff62821b9d8e969de131b20d8cd83ad7d32738c 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -1079,6 +1079,13 @@ std::list<std::unique_ptr<ArchiveJob> > Scheduler::getNextArchiveJobsToReportBat return ret; } +//------------------------------------------------------------------------------ +// getArchiveJobsFailedSummary +//------------------------------------------------------------------------------ +SchedulerDatabase::JobsFailedSummary Scheduler::getArchiveJobsFailedSummary(log::LogContext &logContext) { + return m_db.getArchiveJobsFailedSummary(logContext); +} + //------------------------------------------------------------------------------ // getNextRetrieveJobsToReportBatch //------------------------------------------------------------------------------ @@ -1114,6 +1121,13 @@ getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logCont return ret; } +//------------------------------------------------------------------------------ +// getRetrieveJobsFailedSummary +//------------------------------------------------------------------------------ +SchedulerDatabase::JobsFailedSummary Scheduler::getRetrieveJobsFailedSummary(log::LogContext &logContext) { + return m_db.getRetrieveJobsFailedSummary(logContext); +} + //------------------------------------------------------------------------------ // reportArchiveJobsBatch //------------------------------------------------------------------------------ diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 7f00f61e1c54d073433cb66334c16b9aae0516d9..3ff9f9ba6753c2a583862524be533f9741d75cec 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -74,7 +74,7 @@ class RetrieveJob; class Scheduler { public: - + /** * Constructor. */ @@ -224,7 +224,6 @@ public: const std::string &driveName, const std::string &vid, const uint64_t numberOfFiles, const uint64_t fileSize, const cta::common::dataStructures::TestSourceType testSourceType) const; - std::map<std::string, std::list<cta::common::dataStructures::ArchiveJob> > getPendingArchiveJobs(log::LogContext &lc) const; std::list<cta::common::dataStructures::ArchiveJob> getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext &lc) const; std::map<std::string, std::list<cta::common::dataStructures::RetrieveJob> > getPendingRetrieveJobs(log::LogContext &lc) const; @@ -319,7 +318,7 @@ public: */ std::list<common::dataStructures::QueueAndMountSummary> getQueuesAndMountSummaries(log::LogContext & lc); - /*============== Archive reporting support =================================*/ + /*======================== Archive reporting support =======================*/ /** * Batch job factory * @@ -335,7 +334,10 @@ public: void reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob>> & archiveJobsBatch, eos::DiskReporterFactory & reporterFactory, log::TimingList&, utils::Timer &, log::LogContext &); - /* ============================== Retrieve reporting support ============================== */ + /*======================= Failed archive jobs support ======================*/ + SchedulerDatabase::JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &lc); + + /*======================= Retrieve reporting support =======================*/ /*! * Batch job factory * @@ -364,9 +366,11 @@ public: */ std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext); - -public: - /*============== Administrator management ==================================*/ + + /*====================== Failed retrieve jobs support ======================*/ + SchedulerDatabase::JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &lc); + + /*======================== Administrator management ========================*/ void authorizeAdmin(const cta::common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc); private: diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 06802714827e4cd23a6d267aab3fdae89c80812c..7187b0905b4be6383b1868d0cb5ecbfa45739367 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -192,7 +192,16 @@ public: */ virtual std::list<std::unique_ptr<ArchiveJob>> getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & logContext) = 0; - + + /*======================= Failed archive jobs support ======================*/ + struct JobsFailedSummary { + JobsFailedSummary(uint64_t f = 0, uint64_t b = 0) : totalFiles(f), totalBytes(b) {} + uint64_t totalFiles; + uint64_t totalBytes; + }; + + virtual JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) = 0; + /** * Set a batch of jobs as reported (modeled on ArchiveMount::setJobBatchSuccessful(). * @param jobsBatch @@ -379,6 +388,8 @@ public: virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext) = 0; + virtual JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &logContext) = 0; + /*============ Label management: user side =================================*/ // TODO diff --git a/xroot_plugins/XrdCtaFailedRequestLs.hpp b/xroot_plugins/XrdCtaFailedRequestLs.hpp index f7ceca729137b881b74b25b471f3f9248ba79fe7..01c919e5b52bec976ad0ccd010768f9897ffae87 100644 --- a/xroot_plugins/XrdCtaFailedRequestLs.hpp +++ b/xroot_plugins/XrdCtaFailedRequestLs.hpp @@ -19,10 +19,8 @@ #pragma once #include <XrdSsiPbOStreamBuffer.hpp> -#include <objectstore/ArchiveQueue.hpp> -#include <objectstore/RetrieveQueue.hpp> #include <scheduler/Scheduler.hpp> -#include <scheduler/RetrieveJob.hpp> +//#include <scheduler/RetrieveJob.hpp> @@ -35,13 +33,14 @@ class FailedRequestLsStream : public XrdSsiStream { public: FailedRequestLsStream(Scheduler &scheduler, bool is_archive, bool is_retrieve, - bool is_log_entries, bool is_summary) : + bool is_log_entries, bool is_summary, cta::log::LogContext &lc) : XrdSsiStream(XrdSsiStream::isActive), m_scheduler(scheduler), m_isArchive(is_archive), m_isRetrieve(is_retrieve), m_isLogEntries(is_log_entries), - m_isSummary(is_summary) + m_isSummary(is_summary), + m_lc(lc) { XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor"); } @@ -88,6 +87,7 @@ public: GetBuffSummary(streambuf); dlen = streambuf->Size(); last = true; + XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); return streambuf; } #if 0 @@ -141,7 +141,10 @@ public: dlen = streambuf->Size(); XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); } catch(cta::exception::Exception &ex) { - throw std::runtime_error(ex.getMessage().str()); + std::ostringstream errMsg; + errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what(); + eInfo.Set(errMsg.str().c_str(), ECANCELED); + delete streambuf; } catch(std::exception &ex) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: " << ex.what(); @@ -156,38 +159,31 @@ public: return streambuf; } -#if 0 - // failed archive jobs - auto archive_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); - responseTable.push_back({ "archive", std::to_string(archive_summary.candidateFiles), std::to_string(archive_summary.candidateBytes) }); - - // failed retrieve jobs - auto retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); - responseTable.push_back({ "retrieve", std::to_string(retrieve_summary.candidateFiles), std::to_string(retrieve_summary.candidateBytes) }); -#endif void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) { - - cta::objectstore::ArchiveQueue::CandidateJobList archive_summary; - cta::objectstore::RetrieveQueue::CandidateJobList retrieve_summary; - - Data record; + SchedulerDatabase::JobsFailedSummary archive_summary; + SchedulerDatabase::JobsFailedSummary retrieve_summary; if(m_isArchive) { + Data record; + archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::ARCHIVE_REQUEST); - record.mutable_frls_summary()->set_total_files(archive_summary.candidateFiles); - record.mutable_frls_summary()->set_total_size(archive_summary.candidateBytes); + record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes); streambuf->Push(record); } if(m_isRetrieve) { + Data record; + retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc); record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::RETRIEVE_REQUEST); - record.mutable_frls_summary()->set_total_files(retrieve_summary.candidateFiles); - record.mutable_frls_summary()->set_total_size(retrieve_summary.candidateBytes); + record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes); streambuf->Push(record); } if(m_isArchive && m_isRetrieve) { + Data record; record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::TOTAL); - record.mutable_frls_summary()->set_total_files(archive_summary.candidateFiles + retrieve_summary.candidateFiles); - record.mutable_frls_summary()->set_total_size(archive_summary.candidateBytes + retrieve_summary.candidateBytes); + record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles); + record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes); streambuf->Push(record); } @@ -195,14 +191,16 @@ public: } private: - Scheduler &m_scheduler; //!< Reference to CTA Scheduler + cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler bool m_isArchive; //!< List failed archive requests bool m_isRetrieve; //!< List failed retrieve requests bool m_isLogEntries; //!< Show failure log messages (verbose) bool m_isSummary; //!< Short summary of number of failures - static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for log messages + cta::log::LogContext &m_lc; //!< Reference to CTA Log Context + + static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages }; }} // namespace cta::xrd diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp index 61b6d8f8028bfb533e4f5425f3e1ac386c98775a..1820f649737ccdcbc4f26a6b961e4743a5eac80e 100644 --- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp +++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp @@ -1049,7 +1049,7 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm // Create a XrdSsi stream object to return the results stream = new FailedRequestLsStream(m_scheduler, is_archive, is_retrieve, - has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY)); + has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc); // Should the client display column headers? if(has_flag(OptionBoolean::SHOW_HEADER)) {