Skip to content
Snippets Groups Projects
Commit c68c67eb authored by Michael Davis's avatar Michael Davis
Browse files

[lpa_stream] QueueItor does async fetch of retrieve jobs

parent 19872d46
No related branches found
No related tags found
No related merge requests found
......@@ -91,20 +91,20 @@ std::cerr << "ArchiveQueue getQueueJobs() : fetching " << jobQueueChunk.size() <
// Async fetch of the archive jobs from the objectstore
for(auto &j : jobQueueChunk) {
auto ar = ArchiveRequest(j.address, m_objectStore);
requests.push_back(std::make_pair(ar, std::unique_ptr<ArchiveRequest::AsyncLockfreeFetcher>(ar.asyncLockfreeFetch())));
requests.push_back(std::make_pair(ArchiveRequest(j.address, m_objectStore), nullptr));
requests.back().second.reset(requests.back().first.asyncLockfreeFetch());
}
// Populate the jobs cache from the archive jobs
for(auto &osar : requests) {
try {
osar.second->wait();
} catch (Backend::NoSuchObject &ex) {
} catch(Backend::NoSuchObject &ex) {
// Skip non-existent objects
continue;
}
// Find the copy number for this tape pool
// Find the copy for this TapePool
for(auto &j : osar.first.dumpJobs()) {
if(j.tapePool == m_jobQueuesQueueIt->tapePool) {
auto job = cta::common::dataStructures::ArchiveJob();
......@@ -187,24 +187,37 @@ getQueueJobs(const jobQueue_t &jobQueueChunk)
using namespace objectstore;
std::cerr << "RetrieveQueue getQueueJobs() : fetching " << jobQueueChunk.size() << " jobs." << std::endl;
// Populate the jobs cache from the retrieve jobs
for(auto &j: jobQueueChunk) {
typedef std::pair<RetrieveRequest,
std::unique_ptr<RetrieveRequest::AsyncLockfreeFetcher>> retrieveJob_t;
std::list<retrieveJob_t> requests;
// Async fetch of the retrieve jobs from the objectstore
for(auto &j : jobQueueChunk) {
requests.push_back(std::make_pair(RetrieveRequest(j.address, m_objectStore), nullptr));
requests.back().second.reset(requests.back().first.asyncLockfreeFetch());
}
// Populate the jobs cache from the retrieve jobs
for(auto &osrr : requests) {
try {
auto job = cta::common::dataStructures::RetrieveJob();
osrr.second->wait();
} catch (Backend::NoSuchObject &ex) {
// Skip non-existent objects
continue;
}
// Find the copy for this Volume ID
for(auto &tf: osrr.first.getArchiveFile().tapeFiles) {
if(tf.second.vid == m_jobQueuesQueueIt->vid) {
auto job = cta::common::dataStructures::RetrieveJob();
objectstore::RetrieveRequest rr(j.address, m_objectStore);
objectstore::ScopedSharedLock rrl(rr);
rr.fetch();
job.request = rr.getSchedulerRequest();
for(auto &tf: rr.getArchiveFile().tapeFiles) {
job.tapeCopies[tf.second.vid].first = tf.second.copyNb;
job.tapeCopies[tf.second.vid].second = tf.second;
}
m_jobCache.push_back(job);
} catch(...) {
// This implementation gives imperfect consistency and is racy. If the queue has gone, move on.
m_jobCache.push_back(job);
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment