Skip to content
Snippets Groups Projects
Commit 9f175254 authored by Michael Davis's avatar Michael Davis
Browse files

[os-failedrequests] Implements "cta-admin fr ls"

parent 12ff9ab6
No related branches found
No related tags found
No related merge requests found
......@@ -451,18 +451,40 @@ void CtaAdminCmd::print(const cta::admin::ArchiveFileLsSummary &afls_summary)
void CtaAdminCmd::printFrLsHeader()
{
std::cout << TEXT_RED
<< std::setfill(' ') << std::setw(11) << std::right << "request type" << ' '
<< std::setfill(' ') << std::setw(7) << std::right << "copy no" << ' '
<< std::setfill(' ') << std::setw(7) << std::right << "vid" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "requester" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "group" << ' '
<< std::setfill(' ') << std::setw(12) << std::right << "request type" << ' '
<< std::setfill(' ') << std::setw(8) << std::right << "copy no" << ' '
<< std::setfill(' ') << std::setw(13) << std::right << "tapepool/vid" << ' '
<< std::setfill(' ') << std::setw(10) << std::right << "requester" << ' '
<< std::setfill(' ') << std::setw(6) << std::right << "group" << ' '
<< "path"
<< TEXT_NORMAL << std::endl;
}
void CtaAdminCmd::print(const cta::admin::FailedRequestLsItem &frls_item)
{
throw std::runtime_error("Not implemented.");
std::string request_type;
std::string tapepool_vid;
switch(frls_item.request_type()) {
case admin::RequestType::ARCHIVE_REQUEST:
request_type = "archive";
tapepool_vid = frls_item.tapepool();
break;
case admin::RequestType::RETRIEVE_REQUEST:
request_type = "retrieve";
tapepool_vid = frls_item.tf().vid();
break;
default:
throw std::runtime_error("Unrecognised request type: " + std::to_string(frls_item.request_type()));
}
std::cout << std::setfill(' ') << std::setw(11) << std::right << request_type << ' '
<< std::setfill(' ') << std::setw(8) << std::right << frls_item.copy_nb() << ' '
<< std::setfill(' ') << std::setw(14) << std::right << tapepool_vid << ' '
<< std::setfill(' ') << std::setw(10) << std::right << frls_item.requester().username() << ' '
<< std::setfill(' ') << std::setw(6) << std::right << frls_item.requester().groupname() << ' '
<< frls_item.af().df().path()
<< std::endl;
}
void CtaAdminCmd::printFrLsSummaryHeader()
......
......@@ -33,7 +33,7 @@ class FailedRequestLsStream : public XrdSsiStream
{
public:
FailedRequestLsStream(Scheduler &scheduler, bool is_archive, bool is_retrieve,
bool is_log_entries, bool is_summary, cta::log::LogContext &lc) :
bool is_log_entries, bool is_summary, log::LogContext &lc) :
XrdSsiStream(XrdSsiStream::isActive),
m_scheduler(scheduler),
m_isArchive(is_archive),
......@@ -74,30 +74,42 @@ public:
XrdSsiPb::OStreamBuffer<Data> *streambuf;
try {
if(!m_isSummary && true) {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
// Special handling for -S option
if(m_isSummary) {
GetBuffSummary(streambuf);
dlen = streambuf->Size();
last = true;
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
return streambuf;
// Special handling for --summary option
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
GetBuffSummary(streambuf);
last = true;
} else if(m_isArchive) {
// List failed archive requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
//auto aqList = m_scheduler.getArchiveJobsFailedQueues(m_lc);
pushRecord(streambuf, admin::RequestType::ARCHIVE_REQUEST);
} else if(m_isRetrieve) {
// List failed retrieve requests
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
/*
Take a look at OStoreDB::getNextRetrieveJobsFailedBatch()
1. new method to get a list of Failed queues
2. for each queue, get a list of jobs (without popping from the queue)
3. have some maximum number of items that can be viewed (how should this be configured, in the config file, on the command line?)
4. pop items off list of jobs, get details, add to stream, while !is_buffer_full
5. when all queues are done and all jobs are done, set m_isRetrieve to false
*/
} else {
// Nothing more to send, close the stream
last = true;
return nullptr;
}
#if 0
m_scheduler.listQueueItems(m_cliIdentity.username, "failed queue", m_lc);
auto archiveJobFailedList = m_scheduler.getNextArchiveJobsFailedBatch(10,m_lc);
cmdlineOutput << "Failed archive jobs: " << archiveJobFailedList.size() << std::endl;
auto retrieveJobFailedList = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
cmdlineOutput << "Failed retrieve jobs: " << retrieveJobFailedList.size() << std::endl;
#endif
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
return streambuf;
#if 0
for(bool is_buffer_full = false; m_archiveFileItor.hasMore() && !is_buffer_full; )
{
......@@ -138,27 +150,28 @@ public:
}
}
#endif
dlen = streambuf->Size();
XrdSsiPb::Log::Msg(XrdSsiPb::Log::DEBUG, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data.");
} catch(cta::exception::Exception &ex) {
} catch(exception::Exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught CTA exception: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
} catch(std::exception &ex) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: " << ex.what();
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
} catch(...) {
std::ostringstream errMsg;
errMsg << __FUNCTION__ << " failed: Caught an unknown exception";
eInfo.Set(errMsg.str().c_str(), ECANCELED);
delete streambuf;
}
return streambuf;
delete streambuf;
return nullptr;
}
/*!
* Populate the failed queue summary
*
* @param[in] streambuf XRootD SSI stream object to push records to
*/
void GetBuffSummary(XrdSsiPb::OStreamBuffer<Data> *streambuf) {
SchedulerDatabase::JobsFailedSummary archive_summary;
SchedulerDatabase::JobsFailedSummary retrieve_summary;
......@@ -166,7 +179,7 @@ public:
if(m_isArchive) {
Data record;
archive_summary = m_scheduler.getArchiveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::ARCHIVE_REQUEST);
record.mutable_frls_summary()->set_request_type(admin::RequestType::ARCHIVE_REQUEST);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes);
streambuf->Push(record);
......@@ -174,14 +187,14 @@ public:
if(m_isRetrieve) {
Data record;
retrieve_summary = m_scheduler.getRetrieveJobsFailedSummary(m_lc);
record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::RETRIEVE_REQUEST);
record.mutable_frls_summary()->set_request_type(admin::RequestType::RETRIEVE_REQUEST);
record.mutable_frls_summary()->set_total_files(retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(retrieve_summary.totalBytes);
streambuf->Push(record);
}
if(m_isArchive && m_isRetrieve) {
Data record;
record.mutable_frls_summary()->set_request_type(cta::admin::RequestType::TOTAL);
record.mutable_frls_summary()->set_request_type(admin::RequestType::TOTAL);
record.mutable_frls_summary()->set_total_files(archive_summary.totalFiles + retrieve_summary.totalFiles);
record.mutable_frls_summary()->set_total_size(archive_summary.totalBytes + retrieve_summary.totalBytes);
streambuf->Push(record);
......@@ -190,15 +203,41 @@ public:
m_isSummary = false;
}
/*!
* Add a record to the stream
*
* @param[in] streambuf XRootD SSI stream object to push records to
* @param[in] requestType The type of failed request (archive or retrieve)
*/
void pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, admin::RequestType requestType) {
Data record;
switch(requestType) {
case admin::RequestType::ARCHIVE_REQUEST:
record.mutable_frls_item()->set_tapepool("tapepool"); break;
case admin::RequestType::RETRIEVE_REQUEST:
record.mutable_frls_item()->mutable_tf()->set_vid("vid"); break;
default:
throw exception::Exception("Unrecognised RequestType: " + std::to_string(requestType));
}
record.mutable_frls_item()->set_request_type(requestType);
record.mutable_frls_item()->set_copy_nb(1);
record.mutable_frls_item()->mutable_requester()->set_username("u");
record.mutable_frls_item()->mutable_requester()->set_groupname("g");
record.mutable_frls_item()->mutable_af()->mutable_df()->set_path("/path/");
//record.mutable_frls()->set_failurelogs(...
streambuf->Push(record);
}
private:
cta::Scheduler &m_scheduler; //!< Reference to CTA Scheduler
Scheduler &m_scheduler; //!< Reference to CTA Scheduler
bool m_isArchive; //!< List failed archive requests
bool m_isRetrieve; //!< List failed retrieve requests
bool m_isLogEntries; //!< Show failure log messages (verbose)
bool m_isSummary; //!< Short summary of number of failures
bool m_isArchive; //!< List failed archive requests
bool m_isRetrieve; //!< List failed retrieve requests
bool m_isLogEntries; //!< Show failure log messages (verbose)
bool m_isSummary; //!< Short summary of number of failures
cta::log::LogContext &m_lc; //!< Reference to CTA Log Context
log::LogContext &m_lc; //!< Reference to CTA Log Context
static constexpr const char* const LOG_SUFFIX = "FailedRequestLsStream"; //!< Identifier for SSI log messages
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment