Commit 643faf09 authored by Michael Davis's avatar Michael Davis
Browse files

[os-failedrequests] Uses QueueItor class to iterate across failed requests

parent 4060188b
/*!
* @project The CERN Tape Archive (CTA)
* @brief CTA Frontend Archive File Ls stream implementation
* @copyright Copyright 2017 CERN
* @brief CTA Frontend Failed Request Ls stream implementation
* @copyright Copyright 2019 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
......@@ -27,20 +27,18 @@
namespace cta { namespace xrd {
/*!
* Stream object which implements "af ls" command.
* Stream object which implements "failedrequest ls" command.
*/
class FailedRequestLsStream : public XrdSsiStream
{
public:
FailedRequestLsStream(Scheduler &scheduler, OStoreDB::ArchiveQueueItor_t archiveQueueItor,
OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_archive, bool is_retrieve,
bool is_log_entries, bool is_summary, log::LogContext &lc) :
OStoreDB::RetrieveQueueItor_t retrieveQueueItor, bool is_log_entries, bool is_summary,
log::LogContext &lc) :
XrdSsiStream(XrdSsiStream::isActive),
m_scheduler(scheduler),
m_archiveQueueItor(std::move(archiveQueueItor)),
m_retrieveQueueItor(std::move(retrieveQueueItor)),
m_isArchive(is_archive),
m_isRetrieve(is_retrieve),
m_isLogEntries(is_log_entries),
m_isSummary(is_summary),
m_lc(lc)
......@@ -82,79 +80,28 @@ public:
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
GetBuffSummary(streambuf);
last = true;
} else if(m_isArchive) {
} else if(!m_archiveQueueItor.end()) {
// List failed archive requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
pushRecord(streambuf, admin::RequestType::ARCHIVE_REQUEST);
m_isArchive = false;
} else if(m_isRetrieve) {
for(bool is_buffer_full = false; !m_archiveQueueItor.end() && !is_buffer_full; ++m_archiveQueueItor) {
is_buffer_full = pushRecord(streambuf, m_archiveQueueItor.qid(), *m_archiveQueueItor);
}
} else if(!m_retrieveQueueItor.end()) {
// List failed retrieve requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
pushRecord(streambuf, admin::RequestType::RETRIEVE_REQUEST);
/*
Take a look at OStoreDB::getNextRetrieveJobsFailedBatch()
1. new method to get a list of Failed queues
2. for each queue, get a list of jobs (without popping from the queue)
3. have some maximum number of items that can be viewed (how should this be configured, in the config file, on the command line?)
4. pop items off list of jobs, get details, add to stream, while !is_buffer_full
5. when all queues are done and all jobs are done, set m_isRetrieve to false
*/
m_isRetrieve = false;
for(bool is_buffer_full = false; !m_retrieveQueueItor.end() && !is_buffer_full; ++m_retrieveQueueItor) {
is_buffer_full = pushRecord(streambuf, m_retrieveQueueItor.qid(), *m_retrieveQueueItor);
}
} else {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
return streambuf;
#if 0
for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; )
{
const cta::common::dataStructures::FailedRequest archiveFile = m_archiveFileItor.next();
for(auto jt = archiveFile.tapeFiles.cbegin(); jt != archiveFile.tapeFiles.cend(); jt++) {
Data record;
// Copy number
record.mutable_afls_item()->set_copy_nb(jt->first);
// Archive file
auto af = record.mutable_afls_item()->mutable_af();
af->set_archive_id(archiveFile.archiveFileID);
af->set_disk_instance(archiveFile.diskInstance);
af->set_disk_id(archiveFile.diskFileId);
af->set_size(archiveFile.fileSize);
af->mutable_cs()->set_type(archiveFile.checksumType);
af->mutable_cs()->set_value(archiveFile.checksumValue);
af->set_storage_class(archiveFile.storageClass);
af->mutable_df()->set_owner(archiveFile.diskFileInfo.owner);
af->mutable_df()->set_group(archiveFile.diskFileInfo.group);
af->mutable_df()->set_path(archiveFile.diskFileInfo.path);
af->set_creation_time(archiveFile.creationTime);
// Tape file
auto tf = record.mutable_afls_item()->mutable_tf();
tf->set_vid(jt->second.vid);
tf->set_f_seq(jt->second.fSeq);
tf->set_block_id(jt->second.blockId);
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
// enough data to send to the client. The actual buffer size is double the block size,
// so we can keep writing a few additional records after is_buffer_full is true. These
// will be sent on the next iteration. If we exceed the hard limit of double the block
// size, Push() will throw an exception.
is_buffer_full = streambuf->Push(record);
}
}
#endif
} catch(exception::Exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
......@@ -181,7 +128,10 @@ public:
SchedulerDatabase::JobsFailedSummary archive_summary;
SchedulerDatabase::JobsFailedSummary retrieve_summary;
if(m_isArchive) {
bool isArchive = !m_archiveQueueItor.end();
bool isRetrieve = !m_retrieveQueueItor.end();
if(isArchive) {
Data record;
archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST);
......@@ -189,7 +139,7 @@ public:
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
streambuf->Push(record);
}
if(m_isRetrieve) {
if(isRetrieve) {
Data record;
retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST);
......@@ -197,7 +147,7 @@ public:
record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
streambuf->Push(record);
}
if(m_isArchive && m_isRetrieve) {
if(isArchive && isRetrieve) {
Data record;
record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
......@@ -213,41 +163,64 @@ public:
*
* @param[in] streambuf XRootD SSI stream object to push records to
* @param[in] requestType The type of failed request (archive or retrieve)
*
* @retval true Stream buffer is full and ready to send
* @retval false Stream buffer is not full
*/
void pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, admin::RequestType requestType) {
Data record;
switch(requestType) {
case admin::RequestType::ARCHIVE_REQUEST:
record.mutable_frls_item()->set_tapepool("tapepool"); break;
case admin::RequestType::RETRIEVE_REQUEST:
record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break;
default:
throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType));
}
record.mutable_frls_item()->set_request_type(requestType);
record.mutable_frls_item()->set_copy_nb(1);
record.mutable_frls_item()->mutable_requester()->set_username("u");
record.mutable_frls_item()->mutable_requester()->set_groupname("g");
record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/");
//record.mutable_frls()->set_failurelogs(...
streambuf->Push(record);
}
template<typename QueueType>
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item);
private:
Scheduler &m_scheduler; //!< Reference to CTA Scheduler
/*!
* Map queue type to RequestType
*
* @return RequestType for the template specialisation
*/
template<typename QueueType>
admin::RequestType getRequestType(const QueueType &item);
OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator
Scheduler &m_scheduler; //!< Reference to CTA Scheduler
OStoreDB::ArchiveQueueItor_t m_archiveQueueItor; //!< Archive Queue Iterator
OStoreDB::RetrieveQueueItor_t m_retrieveQueueItor; //!< Retrieve Queue Iterator
bool m_isArchive; //!< List failed archive requests
bool m_isRetrieve; //!< List failed retrieve requests
bool m_isLogEntries; //!< Show failure log messages (verbose)
bool m_isSummary; //!< Short summary of number of failures
bool m_isSummary; //!< Show only summary of items in the failed queues
log::LogContext &m_lc; //!< Reference to CTA Log Context
static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages
};
template<typename QueueType>
bool FailedRequestLsStream::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, const std::string &qid, const QueueType &item)
{
Data record;
record.mutable_frls_item()->set_request_type(getRequestType(item));
#if 0
switch(requestType) {
case admin::RequestType::ARCHIVE_REQUEST:
record.mutable_frls_item()->set_tapepool("tapepool"); break;
case admin::RequestType::RETRIEVE_REQUEST:
record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break;
default:
throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType));
}
record.mutable_frls_item()->set_copy_nb(1);
record.mutable_frls_item()->mutable_requester()->set_username("u");
record.mutable_frls_item()->mutable_requester()->set_groupname("g");
record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/");
//record.mutable_frls()->set_failurelogs(...
#endif
return streambuf->Push(record);
}
template<>
admin::RequestType FailedRequestLsStream::getRequestType(const common::dataStructures::ArchiveJob &item) { return admin::RequestType::ARCHIVE_REQUEST; }
template<>
admin::RequestType FailedRequestLsStream::getRequestType(const common::dataStructures::RetrieveJob &item) { return admin::RequestType::RETRIEVE_REQUEST; }
}} // namespace cta::xrd
......@@ -1044,17 +1044,13 @@ void RequestMessage::processFailedRequest_Ls(const cta::admin::AdminCmd &admincm
throw cta::exception::UserError("--log and --summary are mutually exclusive");
}
bool is_archive = !has_flag(OptionBoolean::JUSTRETRIEVE);
bool is_retrieve = !has_flag(OptionBoolean::JUSTARCHIVE);
std::string tapepool = is_archive ? "" : "INVALID_TAPEPOOL";
std::string vid = is_retrieve ? "" : "INVALID_VID";
std::string tapepool = has_flag(OptionBoolean::JUSTRETRIEVE) ? "" : "INVALID_TAPEPOOL";
std::string vid = has_flag(OptionBoolean::JUSTARCHIVE) ? "" : "INVALID_VID";
// Create a XrdSsi stream object to return the results
stream = new FailedRequestLsStream(m_scheduler,
m_scheddb.getArchiveJobItor(tapepool), m_scheddb.getRetrieveJobItor(vid),
is_archive, is_retrieve, has_flag(OptionBoolean::SHOW_LOG_ENTRIES),
has_flag(OptionBoolean::SUMMARY), m_lc);
has_flag(OptionBoolean::SHOW_LOG_ENTRIES), has_flag(OptionBoolean::SUMMARY), m_lc);
// Should the client display column headers?
if(has_flag(OptionBoolean::SHOW_HEADER)) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment