Commit 6b8afb6e authored by Michael Davis's avatar Michael Davis
Browse files

[os-failedrequests] Changes QueueItors to unique_ptr

Previously these were references, but sometimes we want only an archive
queue itor or only a retrieve queue itor. Now we can set the unused
itors to nullptr.
parent 643faf09
......@@ -654,6 +654,13 @@ OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tape
{
return ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName);
}
//------------------------------------------------------------------------------
// OStoreDB::getArchiveJobItorPtr()
//------------------------------------------------------------------------------
OStoreDB::ArchiveQueueItor_t* OStoreDB::getArchiveJobItorPtr(const std::string &tapePoolName, QueueType queueType) const
{
return new ArchiveQueueItor_t(m_objectStore, queueType, tapePoolName);
}
//------------------------------------------------------------------------------
// OStoreDB::getNextArchiveJobsToReportBatch()
......@@ -1106,6 +1113,14 @@ OStoreDB::RetrieveQueueItor_t OStoreDB::getRetrieveJobItor(const std::string &vi
return RetrieveQueueItor_t(m_objectStore, queueType, vid);
}
//------------------------------------------------------------------------------
// OStoreDB::getRetrieveJobItorPtr()
//------------------------------------------------------------------------------
OStoreDB::RetrieveQueueItor_t* OStoreDB::getRetrieveJobItorPtr(const std::string &vid, QueueType queueType) const
{
return new RetrieveQueueItor_t(m_objectStore, queueType, vid);
}
//------------------------------------------------------------------------------
// OStoreDB::getNextRetrieveJobsToReportBatch()
//------------------------------------------------------------------------------
......
......@@ -254,6 +254,9 @@ public:
ArchiveQueueItor_t getArchiveJobItor(const std::string &tapePoolName,
objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const;
ArchiveQueueItor_t* getArchiveJobItorPtr(const std::string &tapePoolName,
objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const;
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested,
log::LogContext & logContext) override;
......@@ -288,6 +291,9 @@ public:
RetrieveQueueItor_t getRetrieveJobItor(const std::string &vid,
objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const;
RetrieveQueueItor_t* getRetrieveJobItorPtr(const std::string &vid,
objectstore::QueueType queueType = objectstore::QueueType::JobsToTransfer) const;
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext) override;
void setRetrieveJobBatchReported(std::list<cta::SchedulerDatabase::RetrieveJob*> & jobsBatch,
......
......@@ -32,20 +32,33 @@ namespace cta { namespace xrd {
class FailedRequestLsStream : public XrdSsiStream
{
public:
FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t archiveQueueItor,
OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_log_entries, bool is_summary,
/*!
* Constructor
*
* @param[in] scheduler CTA Scheduler
* @param[in] archiveQueueItorPtr Pointer to the Archive Queue iterator
* @param[in] retrieveQueueItorPtr Pointer to the Retrieve Queue iterator
* @param[in] is_summary Show only a summary of failed jobs
* @param[in] is_log_entries Include log messages in output (verbose)
* @param[in] lc CTA Log Context
*/
FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr,
OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr, bool is_summary, bool is_log_entries,
log::LogContext &lc) :
XrdSsiStream(XrdSsiStream::isActive),
m_scheduler(scheduler),
m_archiveQueueItor(std::move(archiveQueueItor)),
m_retrieveQueueItor(std::move(retrieveQueueItor)),
m_isLogEntries(is_log_entries),
m_archiveQueueItorPtr(archiveQueueItorPtr),
m_retrieveQueueItorPtr(retrieveQueueItorPtr),
m_isSummary(is_summary),
m_isLogEntries(is_log_entries),
m_lc(lc)
{
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "FailedRequestLsStream() constructor");
}
/*!
* Destructor
*/
virtual ~FailedRequestLsStream() {
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "~FailedRequestLsStream() destructor");
}
......@@ -69,94 +82,17 @@ public:
* last = true: No more data remains.
* last = false: A fatal error occurred, eRef has the reason.
*/
virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override {
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
XrdSsiPb::OStreamBuffer<Data> *streambuf;
try {
if(m_isSummary) {
// Special handling for --summary option
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
GetBuffSummary(streambuf);
last = true;
} else if(!m_archiveQueueItor.end()) {
// List failed archive requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
for(bool is_buffer_full = false; !m_archiveQueueItor.end() && !is_buffer_full; ++m_archiveQueueItor) {
is_buffer_full = pushRecord(streambuf, m_archiveQueueItor.qid(), *m_archiveQueueItor);
}
} else if(!m_retrieveQueueItor.end()) {
// List failed retrieve requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
for(bool is_buffer_full = false; !m_retrieveQueueItor.end() && !is_buffer_full; ++m_retrieveQueueItor) {
is_buffer_full = pushRecord(streambuf, m_retrieveQueueItor.qid(), *m_retrieveQueueItor);
}
} else {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
return streambuf;
} catch(exception::Exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
} catch(std::exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
} catch(...) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
eInfo.Set(errMsg.str().c_str(), ECANCELED);
}
delete streambuf;
return nullptr;
}
virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) override;
private:
/*!
* Populate the failed queue summary
*
* @param[in] streambuf XRootD SSI stream object to push records to
* @param[in] streambuf XRootD SSI stream object to push records to
* @param[in] isArchiveJobs Include summary of archive jobs in the output
* @param[in] isRetrieveJobs Include summary of retrieve jobs in the output
*/
void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
SchedulerDatabase::JobsFailedSummary archive_summary;
SchedulerDatabase::JobsFailedSummary retrieve_summary;
bool isArchive = !m_archiveQueueItor.end();
bool isRetrieve = !m_retrieveQueueItor.end();
if(isArchive) {
Data record;
archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
streambuf->Push(record);
}
if(isRetrieve) {
Data record;
retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST);
record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
streambuf->Push(record);
}
if(isArchive && isRetrieve) {
Data record;
record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes);
streambuf->Push(record);
}
m_isSummary = false;
}
void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf, bool isArchiveJobs, bool isRetrieveJobs);
/*!
* Add a record to the stream
......@@ -170,7 +106,6 @@ public:
template<typename QueueType>
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item);
private:
/*!
* Map queue type to RequestType
*
......@@ -179,18 +114,112 @@ private:
template<typename QueueType>
admin::RequestType getRequestType(const QueueType &item);
Scheduler &m_scheduler; //!< Reference to CTA Scheduler
OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator
OStoreDB::RetrieveQueueItor_t m_retrieveQueueItor; //!< Retrieve Queue Iterator
bool m_isLogEntries; //!< Show failure log messages (verbose)
bool m_isSummary; //!< Show only summary of items in the failed queues
log::LogContext &m_lc; //!< Reference to CTA Log Context
Scheduler &m_scheduler; //!< Reference to CTA Scheduler
std::unique_ptr<OStoreDB::ArchiveQueueItor_t> m_archiveQueueItorPtr; //!< Archive Queue Iterator
std::unique_ptr<OStoreDB::RetrieveQueueItor_t> m_retrieveQueueItorPtr; //!< Retrieve Queue Iterator
bool m_isSummary; //!< Show only summary of items in the failed queues
bool m_isLogEntries; //!< Show failure log messages (verbose)
log::LogContext &m_lc; //!< Reference to CTA Log Context
static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages
static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages
};
XrdSsiStream::Buffer* FailedRequestLsStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last)
{
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)");
XrdSsiPb::OStreamBuffer<Data> *streambuf;
try {
bool isArchiveJobs = m_archiveQueueItorPtr && !m_archiveQueueItorPtr->end();
bool isRetrieveJobs = m_retrieveQueueItorPtr && !m_retrieveQueueItorPtr->end();
if(!(m_isSummary || isArchiveJobs || isRetrieveJobs)) {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
// Initialise the stream buffer
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
if(m_isSummary) {
// Special handling for --summary option
GetBuffSummary(streambuf, isArchiveJobs, isRetrieveJobs);
last = true;
} else {
// List failed archive requests
if(isArchiveJobs) {
for(bool is_buffer_full = false; !m_archiveQueueItorPtr->end() && !is_buffer_full; ++*m_archiveQueueItorPtr) {
is_buffer_full = pushRecord(streambuf, m_archiveQueueItorPtr->qid(), **m_archiveQueueItorPtr);
}
}
// List failed retrieve requests
if(isRetrieveJobs) {
for(bool is_buffer_full = false; !m_retrieveQueueItorPtr->end() && !is_buffer_full; ++*m_retrieveQueueItorPtr) {
is_buffer_full = pushRecord(streambuf, m_retrieveQueueItorPtr->qid(), **m_retrieveQueueItorPtr);
}
}
}
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
return streambuf;
} catch(exception::Exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
} catch(std::exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
} catch(...) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
eInfo.Set(errMsg.str().c_str(), ECANCELED);
}
delete streambuf;
return nullptr;
}
void FailedRequestLsStream::GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf,
bool isArchiveJobs, bool isRetrieveJobs)
{
SchedulerDatabase::JobsFailedSummary archive_summary;
SchedulerDatabase::JobsFailedSummary retrieve_summary;
if(isArchiveJobs) {
Data record;
archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
streambuf->Push(record);
}
if(isRetrieveJobs) {
Data record;
retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST);
record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
streambuf->Push(record);
}
if(isArchiveJobs && isRetrieveJobs) {
Data record;
record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes);
streambuf->Push(record);
}
m_isSummary = false;
}
template<typename QueueType>
bool FailedRequestLsStream::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item)
{
......
......@@ -1044,13 +1044,17 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm
throw cta::exception::UserError("--log and --summary are mutually exclusive");
}
std::string tapepool = has_flag(OptionBoolean::JUSTRETRIEVE) ? "" : "INVALID_TAPEPOOL";
std::string vid = has_flag(OptionBoolean::JUSTARCHIVE) ? "" : "INVALID_VID";
// These could be added as command options to allow filtering of results to a single queue
std::string tapepool = "";
std::string vid = "";
OStoreDB::ArchiveQueueItor_t *archiveQueueItorPtr =
has_flag(OptionBoolean::JUSTRETRIEVE) ? nullptr : m_scheddb.getArchiveJobItorPtr(tapepool);
OStoreDB::RetrieveQueueItor_t *retrieveQueueItorPtr =
has_flag(OptionBoolean::JUSTARCHIVE) ? nullptr : m_scheddb.getRetrieveJobItorPtr(vid);
// Create a XrdSsi stream object to return the results
stream = new FailedRequestLsStream(m_scheduler,
m_scheddb.getArchiveJobItor(tapepool), m_scheddb.getRetrieveJobItor(vid),
has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc);
stream = new FailedRequestLsStream(m_scheduler, archiveQueueItorPtr, retrieveQueueItorPtr, has_flag(OptionBoolean::SUMMARY), has_flag(OptionBoolean::SHOW_LOG_ENTRIES), m_lc);
// Should the client display column headers?
if(has_flag(OptionBoolean::SHOW_HEADER)) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment