Skip to content
Snippets Groups Projects
Commit db412924 authored by Michael Davis's avatar Michael Davis
Browse files

[lpa_stream] Adds cache for queue jobs and dereference operator

parent 642be592
Branches
Tags v0.0-129
No related merge requests found
......@@ -46,6 +46,35 @@ getQueueJobs()
}
}
//auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
//checkPayloadReadable();
// Go read the shards in parallel...
// std::list<JobDump> ret;
// std::list<ArchiveQueueShard> shards;
// std::list<std::unique_ptr<ArchiveQueueShard::AsyncLockfreeFetcher>> shardsFetchers;
// for (auto & sa: m_payload.archivequeueshards()) {
// shards.emplace_back(ArchiveQueueShard(sa.address(), m_objectStore));
// shardsFetchers.emplace_back(shards.back().asyncLockfreeFetch());
// }
// auto s = shards.begin();
// auto sf = shardsFetchers.begin();
// while (s != shards.end()) {
// try {
// (*sf)->wait();
// } catch (Backend::NoSuchObject & ex) {
// // We are possibly in read only mode, so we cannot rebuild.
// // Just skip this shard.
// goto nextShard;
// }
// for (auto & j: s->dumpJobs()) {
// ret.emplace_back(JobDump{j.size, j.address, j.copyNb});
// }
// nextShard:
// s++; sf++;
// }
// return ret;
// }
//------------------------------------------------------------------------------
// QueueItor::QueueItor (Archive specialisation)
//------------------------------------------------------------------------------
......
......@@ -31,12 +31,21 @@ namespace cta {
template<typename JobQueuesQueue, typename JobQueue>
class QueueItor {
public:
//! Constructor
/*!
* Default constructor
*/
QueueItor(objectstore::Backend &objectStore, const std::string &queue_id = "");
/*!
* No assignment constructor
*/
QueueItor operator=(QueueItor &rhs) = delete;
// Default copy constructor is deleted in favour of move constructor
//! Move constructor
/*!
* Move constructor
*/
QueueItor(QueueItor &&rhs) :
m_objectStore(std::move(rhs).m_objectStore),
m_onlyThisQueueId(std::move(rhs).m_onlyThisQueueId),
......@@ -57,9 +66,6 @@ public:
}
}
//! No assignment constructor
QueueItor operator=(QueueItor &rhs) = delete;
/*!
* Increment iterator
*
......@@ -117,21 +123,44 @@ public:
return m_jobQueuesQueueIt == m_jobQueuesQueue.end() || (m_onlyThisQueueId && endq());
}
//! Queue ID (returns tapepool for archives/vid for retrieves)
/*!
* Queue ID (returns tapepool for archives/vid for retrieves)
*/
const std::string &qid() const;
//! Get the current job, bool is set to true if the data retrieved is valid
/*!
* Dereference the QueueItor
*/
typename JobQueue::job_t &operator*() const {
return m_jobsCache.front();
}
/*!
* Get the current job, bool is set to true if the data retrieved is valid
*/
std::pair<bool,typename JobQueue::job_t> getJob() const;
private:
//! Get the list of jobs in the queue
/*!
* Get the list of jobs in the queue
*/
void getQueueJobs();
objectstore::Backend &m_objectStore; // Reference to ObjectStore Backend
bool m_onlyThisQueueId; // true if a queue_id parameter was passed to the constructor
typename std::list<JobQueuesQueue> m_jobQueuesQueue; // list of Archive or Retrieve Job Queues
typename std::list<JobQueuesQueue>::const_iterator m_jobQueuesQueueIt; // iterator across m_jobQueuesQueue
typename std::list<typename JobQueue::JobDump> m_jobQueue; // list of Archive or Retrieve Jobs
typename std::list<typename JobQueue::JobDump>::const_iterator m_jobQueueIt; // iterator across m_jobQueue
/*!
* Update the cache of queue jobs
*/
void updateJobsCache();
const int JOB_CACHE_SIZE = 100; //! Maximum number of jobs to asynchronously fetch from the queue at once
objectstore::Backend &m_objectStore; //! Reference to ObjectStore Backend
bool m_onlyThisQueueId; //! true if a queue_id parameter was passed to the constructor
typename std::list<JobQueuesQueue> m_jobQueuesQueue; //! list of Archive or Retrieve Job Queues
typename std::list<JobQueuesQueue>::const_iterator m_jobQueuesQueueIt; //! iterator across m_jobQueuesQueue
typename std::list<typename JobQueue::JobDump> m_jobQueue; //! list of Archive or Retrieve Jobs
typename std::list<typename JobQueue::JobDump>::const_iterator m_jobQueueIt; //! iterator across m_jobQueue
typename std::list<typename JobQueue::job_t> m_jobsCache; //! local cache of queue jobs
};
} // namespace cta
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment