Skip to content
Snippets Groups Projects
Commit 7885f74b authored by Michael Davis's avatar Michael Davis
Browse files

[lpa-stream] Implements listarchivefiles summary

parent e310887f
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,27 @@
namespace cta {
//------------------------------------------------------------------------------
// QueueItor::getQueueJobs (Archive specialisation)
//------------------------------------------------------------------------------
template<typename JobQueuesQueue, typename JobQueue>
void QueueItor<JobQueuesQueue, JobQueue>::getQueueJobs()
{
// Behaviour is racy: it's possible that the queue can disappear before we read it.
// In this case, we ignore the error and move on.
try {
JobQueue osaq(m_jobQueuesQueueIt->address, m_objectStore);
objectstore::ScopedSharedLock ostpl(osaq);
osaq.fetch();
m_jobQueue = osaq.dumpJobs();
ostpl.release();
m_jobQueueIt = m_jobQueue.begin();
} catch(...) {
// Force an increment to the next queue
m_jobQueueIt = m_jobQueue.end();
}
}
//------------------------------------------------------------------------------
// QueueItor::qid (Archive specialisation)
//------------------------------------------------------------------------------
......
......@@ -37,15 +37,32 @@ public:
//! Increment iterator
void operator++() { ++m_jobQueueIt; }
//! Check iterator is valid
/*!
* Ensures that the QueueItor points to a valid object, returning true if there
* are no further objects.
*
* If we have reached the end of a job queue, attempts to advance to the next
* queue and rechecks validity.
*
* @retval true if we have passed the last item in the last queue
* @retval false if internal iterators point to a valid object
*/
bool end();
/*!
* Check if the QueueItor points to the last item of the current queue
*
* This is used to allow iterating over queues one-at-a-time to create a summary.
*/
bool isLastItem() const { return m_jobQueueIt == --m_jobQueue.end(); }
//! Queue ID (returns tapepool for archives/vid for retrieves)
const std::string &qid() const;
//! Get the current job, bool is set to true if the data retrieved is valid
std::pair<bool,typename JobQueue::job_t> getJob() const;
private:
//! Get the list of jobs in the queue
void getQueueJobs();
objectstore::Backend &m_objectStore; // Reference to ObjectStore Backend
......@@ -56,12 +73,11 @@ private:
typename std::list<typename JobQueue::JobDump>::const_iterator m_jobQueueIt; // iterator across m_jobQueue
};
/*!
* Check if the QueueItor points to a valid object
*
* @retval false if internal iterators point to a valid object
* @retval true if we have passed the last item in the last queue
*/
//------------------------------------------------------------------------------
// QueueItor::end
//------------------------------------------------------------------------------
template<typename JobQueuesQueue, typename JobQueue>
bool QueueItor<JobQueuesQueue, JobQueue>::end()
{
......@@ -82,25 +98,4 @@ bool QueueItor<JobQueuesQueue, JobQueue>::end()
return m_jobQueueIt == m_jobQueue.end();
}
/*!
* Get the list of jobs in the queue
*/
template<typename JobQueuesQueue, typename JobQueue>
void QueueItor<JobQueuesQueue, JobQueue>::getQueueJobs()
{
// Behaviour is racy: it's possible that the queue can disappear before we read it.
// In this case, we ignore the error and move on.
try {
JobQueue osaq(m_jobQueuesQueueIt->address, m_objectStore);
objectstore::ScopedSharedLock ostpl(osaq);
osaq.fetch();
m_jobQueue = osaq.dumpJobs();
ostpl.release();
m_jobQueueIt = m_jobQueue.begin();
} catch(...) {
// Force an increment to the next queue
m_jobQueueIt = m_jobQueue.end();
}
}
} // namespace cta
......@@ -79,11 +79,30 @@ public:
for(bool is_buffer_full = false; !m_queueItor.end() && !is_buffer_full; ++m_queueItor)
{
auto job = m_queueItor.getJob();
if(!job.first) continue;
Data record = fillRecord(m_queueItor.qid(), job.second);
Data record;
if(m_isExtended) {
// One record on the stream = one file
auto job = m_queueItor.getJob();
if(!job.first) continue;
record = fillRecord(m_queueItor.qid(), job.second);
} else {
// One record on the stream = summary of the current queue
uint64_t total_files = 0;
uint64_t total_size = 0;
for(auto job = m_queueItor.getJob(); ; ++m_queueItor) {
if(job.first) {
++total_files;
total_size += job.second.request.fileSize;
}
// Break before incrementing the queueItor if we are on the last item. m_queueItor
// is incremented in the outer loop, so we don't want to increment twice.
if(m_queueItor.isLastItem()) break;
}
record = fillRecord(m_queueItor.qid(), total_files, total_size);
}
// is_buffer_full is set to true when we have one full block of data in the buffer, i.e.
// enough data to send to the client. The actual buffer size is double the block size,
......@@ -117,6 +136,8 @@ private:
typedef decltype(m_queueItor.getJob().second) data_t; //!< Infer data type from template type
Data fillRecord(const std::string &tape_id, const data_t &job); //!< Convert data to protobuf
Data fillRecord(const std::string &tape_id,
const uint64_t &total_files, const uint64_t &total_size); //!< Convert summary to protobuf
static constexpr const char* const LOG_SUFFIX = "ListPendingQueue"; //!< Identifier for log messages
};
......@@ -126,7 +147,8 @@ private:
// Template specialisations for Archive and Retrieve Queue types
template<>
Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool, const common::dataStructures::ArchiveJob &job)
Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
const common::dataStructures::ArchiveJob &job)
{
Data record;
......@@ -152,42 +174,24 @@ Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::strin
af->mutable_df()->set_group(job.request.requester.group);
af->mutable_df()->set_path(job.request.diskFileInfo.path);
#if 0
if(has_flag(OptionBoolean::EXTENDED))
{
for(auto it = result.cbegin(); it != result.cend(); it++) {
for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++)
{
std::vector<std::string> currentRow;
currentRow.push_back(it->first);
currentRow.push_back(std::to_string(static_cast<unsigned long long>(jt->archiveFileID)));
currentRow.push_back(jt->request.storageClass);
currentRow.push_back(std::to_string(static_cast<unsigned long long>(jt->copyNumber)));
currentRow.push_back(jt->request.diskFileID);
currentRow.push_back(jt->instanceName);
currentRow.push_back(jt->request.checksumType);
currentRow.push_back(jt->request.checksumValue);
currentRow.push_back(std::to_string(static_cast<unsigned long long>(jt->request.fileSize)));
currentRow.push_back(jt->request.requester.name);
currentRow.push_back(jt->request.requester.group);
currentRow.push_back(jt->request.diskFileInfo.path);
responseTable.push_back(currentRow);
}
}
} else {
for(auto it = result.cbegin(); it != result.cend(); it++) {
std::vector<std::string> currentRow;
currentRow.push_back(it->first);
currentRow.push_back(std::to_string(static_cast<unsigned long long>(it->second.size())));
uint64_t size = 0;
for(auto jt = it->second.cbegin(); jt != it->second.cend(); jt++) {
size += jt->request.fileSize;
}
currentRow.push_back(std::to_string(static_cast<unsigned long long>(size)));
responseTable.push_back(currentRow);
}
}
#endif
return record;
}
template<>
Data ListPendingQueue<OStoreDB::ArchiveQueueItor_t>::fillRecord(const std::string &tapepool,
const uint64_t &total_files, const uint64_t &total_size)
{
Data record;
// Response type
record.mutable_af_summary_item()->set_type(cta::admin::ArchiveFileSummaryItem::LISTPENDINGARCHIVES);
// Tapepool
record.mutable_af_summary_item()->set_tapepool(tapepool);
// Summary statistics
record.mutable_af_summary_item()->set_total_files(total_files);
record.mutable_af_summary_item()->set_total_size(total_size);
return record;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment