Skip to content
Snippets Groups Projects
Commit df0f4da3 authored by Michael Davis's avatar Michael Davis
Browse files

[lpa-stream] Refactors XrdCtaListPendingQueue.hpp

Allows pushing multiple protobufs per queue item, needed in the case of
listing retrieve queue jobs, which can have several VIDs in the same
job.
parent 10cc98dc
No related branches found
No related tags found
No related merge requests found
......@@ -80,7 +80,13 @@ public:
// Fill the buffer
streambuf = new XrdSsiPb::OStreamBuffer<Data>(dlen);
Data record;
/*
* is_buffer_full is set to true when we have one full block of data in the buffer, i.e. enough
* data to send to the client. The actual buffer size is double the block size, so we can keep
* writing a few additional records after is_buffer_full is true. These will be sent on the next
* iteration. If we exceed the hard limit of double the block size, Push() will throw an exception.
*/
if(m_isExtended) {
// Detailed listing of all queued files
......@@ -89,14 +95,7 @@ public:
{
auto job = m_queueItor.getJob();
if(!job.first) continue;
record = fillRecord(m_queueItor.qid(), job.second);
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
// enough data to send to the client. The actual buffer size is double the block size,
// so we can keep writing a few additional records after is_buffer_full is true. These
// will be sent on the next iteration. If we exceed the hard limit of double the block
// size, Push() will throw an exception.
is_buffer_full = streambuf->Push(record);
is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), job.second);
}
} else {
// Summary by tapepool or vid
......@@ -114,9 +113,7 @@ public:
}
}
record = fillRecord(m_queueItor.qid(), total_files, total_size);
is_buffer_full = streambuf->Push(record);
is_buffer_full = pushRecord(streambuf, m_queueItor.qid(), total_files, total_size);
}
}
......@@ -144,9 +141,11 @@ private:
typedef decltype(m_queueItor.getJob().second) data_t; //!< Infer data type from template type
Data fillRecord(const std::string &tape_id, const data_t &job); //!< Convert data to protobuf
Data fillRecord(const std::string &tape_id,
const uint64_t &total_files, const uint64_t &total_size); //!< Convert summary to protobuf
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, //!< Convert data to protobufs and put on stream
const std::string &tape_id, const data_t &job);
bool pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf, //!< Convert summary to protobufs and put on stream
const std::string &tape_id, const uint64_t &total_files,
const uint64_t &total_size);
static uint64_t fileSize(const data_t &job); //!< Obtain file size from queue item
static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages
......@@ -157,8 +156,8 @@ private:
// Template specialisations for Archive Queue types
template<>
Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
const common::dataStructures::ArchiveJob &job)
bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
{
Data record;
......@@ -184,12 +183,12 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin
af->mutable_df()->set_group(job.request.requester.group);
af->mutable_df()->set_path(job.request.diskFileInfo.path);
return record;
return streambuf->Push(record);
}
template<>
Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
const uint64_t &total_files, const uint64_t &total_size)
bool ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &tapepool, const uint64_t &total_files, const uint64_t &total_size)
{
Data record;
......@@ -203,7 +202,7 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin
record.mutable_af_summary_item()->set_total_files(total_files);
record.mutable_af_summary_item()->set_total_size(total_size);
return record;
return streambuf->Push(record);
}
template<>
......@@ -216,8 +215,8 @@ uint64_t ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fileSize(const data_t &
// Template specialisations for Retrieve Queue types
template<>
Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &vid,
const common::dataStructures::RetrieveJob &job)
bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &vid, const common::dataStructures::RetrieveJob &job)
{
#if 0
......@@ -267,12 +266,12 @@ Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::stri
af->mutable_df()->set_group(job.request.requester.group);
af->mutable_df()->set_path(job.request.diskFileInfo.path);
return record;
return streambuf->Push(record);
}
template<>
Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::string &tapepool,
const uint64_t &total_files, const uint64_t &total_size)
bool ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::pushRecord(XrdSsiPb::OStreamBuffer<Data> *streambuf,
const std::string &vid, const uint64_t &total_files, const uint64_t &total_size)
{
#if 0
for(auto it = result.cbegin(); it != result.cend(); it++)
......@@ -296,13 +295,13 @@ Data ListPendingQueue<OStoreDB::RetrieveQueueItor_t>::fillRecord(const std::stri
record.mutable_af_summary_item()->set_type(cta::admin::ArchiveFileSummaryItem::LISTPENDINGRETRIEVES);
// Tapepool
record.mutable_af_summary_item()->set_tapepool(tapepool);
record.mutable_af_summary_item()->set_vid(vid);
// Summary statistics
record.mutable_af_summary_item()->set_total_files(total_files);
record.mutable_af_summary_item()->set_total_size(total_size);
return record;
return streambuf->Push(record);
}
template<>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment