Commit 023b6270 authored by Eric Cano

Switched to bulk addition of jobs in archive queues.

Adapted MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueue()
to call the new function.
parent 1b685511
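For context, here is a minimal sketch of the new call pattern, assembled from the ArchiveQueue, test and MemQueue changes in the diff below; the helper queueJobs and the include paths are illustrative assumptions, not part of this commit.

#include <list>
#include <utility>

#include "objectstore/ArchiveQueue.hpp"    // include paths assumed, not shown in this diff
#include "objectstore/ArchiveRequest.hpp"

// Illustrative helper: instead of one ArchiveQueue::addJob() call (and one
// payload update) per job, callers aggregate ArchiveQueue::JobToAdd entries
// and hand them to addJobs() in a single call, followed by a single commit()
// of the queue object.
void queueJobs(cta::objectstore::ArchiveQueue & aq,
               std::list<std::pair<cta::objectstore::ArchiveRequest::JobDump,
                                   cta::objectstore::ArchiveRequest *>> & jobs) {
  std::list<cta::objectstore::ArchiveQueue::JobToAdd> jobsToAdd;
  for (auto & jr : jobs) {
    auto & ar = *jr.second;
    jobsToAdd.push_back({jr.first, ar.getAddressIfSet(),
                         ar.getArchiveFile().archiveFileID,
                         ar.getArchiveFile().fileSize,
                         ar.getMountPolicy(), ar.getEntryLog().time});
  }
  aq.addJobs(jobsToAdd);  // one bulk update of the queue payload
  aq.commit();            // one object store commit for all added jobs
}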
@@ -144,34 +144,33 @@ std::string ArchiveQueue::getTapePool() {
return m_payload.tapepool();
}
void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job,
const std::string & archiveRequestAddress, uint64_t archiveFileId,
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy,
time_t startTime) {
void ArchiveQueue::addJobs(std::list<JobToAdd> & jobsToAdd) {
checkPayloadWritable();
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.incCount(policy.maxDrivesAllowed);
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.incCount(policy.archivePriority);
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.incCount(policy.archiveMinRequestAge);
if (m_payload.pendingarchivejobs_size()) {
if ((uint64_t)startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + fileSize);
} else {
m_payload.set_archivejobstotalsize(fileSize);
m_payload.set_oldestjobcreationtime(startTime);
for (auto & jta: jobsToAdd) {
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed);
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.incCount(jta.policy.archivePriority);
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge);
if (m_payload.pendingarchivejobs_size()) {
if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(jta.startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize);
} else {
m_payload.set_archivejobstotalsize(jta.fileSize);
m_payload.set_oldestjobcreationtime(jta.startTime);
}
auto * j = m_payload.add_pendingarchivejobs();
j->set_address(jta.archiveRequestAddress);
j->set_size(jta.fileSize);
j->set_fileid(jta.archiveFileId);
j->set_copynb(jta.job.copyNb);
j->set_maxdrivesallowed(jta.policy.maxDrivesAllowed);
j->set_priority(jta.policy.archivePriority);
j->set_minarchiverequestage(jta.policy.archiveMinRequestAge);
}
auto * j = m_payload.add_pendingarchivejobs();
j->set_address(archiveRequestAddress);
j->set_size(fileSize);
j->set_fileid(archiveFileId);
j->set_copynb(job.copyNb);
j->set_maxdrivesallowed(policy.maxDrivesAllowed);
j->set_priority(policy.archivePriority);
j->set_minarchiverequestage(policy.archiveMinRequestAge);
}
auto ArchiveQueue::getJobsSummary() -> JobsSummary {
......
@@ -56,10 +56,16 @@ public:
void setTapePool(const std::string & name);
std::string getTapePool();
// Archive jobs management ===================================================
void addJob(const ArchiveRequest::JobDump & job,
const std::string & archiveRequestAddress, uint64_t archiveFileId,
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime);
// Archive jobs management ===================================================
struct JobToAdd {
ArchiveRequest::JobDump & job;
const std::string archiveRequestAddress;
uint64_t archiveFileId;
uint64_t fileSize;
const cta::common::dataStructures::MountPolicy policy;
time_t startTime;
};
void addJobs(std::list<JobToAdd> & jobsToAdd);
/// This version will check for existence of the job in the queue before adding it.
/// Returns true if a new job was actually inserted.
bool addJobIfNecessary(const ArchiveRequest::JobDump & job,
......
@@ -329,7 +329,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
cta::objectstore::ArchiveQueue aq(tpAddr[i], be);
}
// Create the various ATFRs, stopping one step further each time.
int pass=0;
unsigned int pass=0;
while (true)
{
// -just referenced
@@ -385,7 +385,9 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
policy.maxDrivesAllowed = 1;
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL));
std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta;
jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000U+pass, policy, time(NULL)});
aq.addJobs(jta);
aq.commit();
}
if (pass < 4) { pass++; continue; }
@@ -403,7 +405,9 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
policy.maxDrivesAllowed = 1;
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL));
std::list <cta::objectstore::ArchiveQueue::JobToAdd> jta;
jta.push_back({jd, ar.getAddressIfSet(), ar.getArchiveFile().archiveFileID, 1000+pass, policy, time(NULL)});
aq.addJobs(jta);
aq.commit();
}
if (pass < 5) { pass++; continue; }
......
@@ -24,31 +24,43 @@
namespace cta { namespace ostoredb {
template<>
void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobToQueue(
objectstore::ArchiveRequest::JobDump& job, objectstore::ArchiveRequest& request, objectstore::ArchiveQueue& queue) {
auto af = request.getArchiveFile();
queue.addJob(job, request.getAddressIfSet(), af.archiveFileID,
af.fileSize, request.getMountPolicy(), request.getEntryLog().time);
// Back reference the queue in the job and archive request
job.owner = queue.getAddressIfSet();
request.setJobOwner(job.copyNb, job.owner);
void MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::specializedAddJobsToQueue(
std::list<MemQueue<objectstore::ArchiveRequest, objectstore::ArchiveQueue>::JobAndRequest> & jobsToAdd,
objectstore::ArchiveQueue& queue) {
std::list<objectstore::ArchiveQueue::JobToAdd> jtal;
auto queueAddress = queue.getAddressIfSet();
for (auto & j: jobsToAdd) {
jtal.push_back({j.job, j.request.getAddressIfSet(), j.request.getArchiveFile().archiveFileID, j.request.getArchiveFile().fileSize,
j.request.getMountPolicy(), j.request.getEntryLog().time});
// We pre-mark (in memory) the request as being owned by the queue.
// The actual commit of the request will happen after the queue's,
// so the back reference will be valid.
j.job.owner = queueAddress;
j.request.setJobOwner(j.job.copyNb, j.job.owner);
}
queue.addJobs(jtal);
}
template<>
void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobToQueue(
objectstore::RetrieveRequest::JobDump& job, objectstore::RetrieveRequest& request, objectstore::RetrieveQueue& queue) {
// We need to find the tape file corresponding to the copyNb
for (auto & j: request.getArchiveFile().tapeFiles) {
if (j.second.copyNb == job.copyNb) {
auto criteria = request.getRetrieveFileQueueCriteria();
queue.addJob(j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize,
criteria.mountPolicy, request.getEntryLog().time);
request.setActiveCopyNumber(j.second.copyNb);
request.setOwner(queue.getAddressIfSet());
goto jobAdded;
void MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::specializedAddJobsToQueue(
std::list<MemQueue<objectstore::RetrieveRequest, objectstore::RetrieveQueue>::JobAndRequest> & jobsToAdd,
objectstore::RetrieveQueue &queue) {
for (auto & jta: jobsToAdd) {
// We need to find the tape file corresponding to the copyNb
auto & job = jta.job;
auto & request = jta.request;
for (auto & j: request.getArchiveFile().tapeFiles) {
if (j.second.copyNb == job.copyNb) {
auto criteria = request.getRetrieveFileQueueCriteria();
queue.addJob(j.second.copyNb, j.second.fSeq, request.getAddressIfSet(), criteria.archiveFile.fileSize,
criteria.mountPolicy, request.getEntryLog().time);
request.setActiveCopyNumber(j.second.copyNb);
request.setOwner(queue.getAddressIfSet());
goto jobAdded;
}
}
}
jobAdded:;
}
}
template<>
......
@@ -178,8 +178,14 @@ private:
static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToNewQueue(typename Request::JobDump & job, const std::string & queueIndex,
Request & request, OStoreDB & oStoreDB, log::LogContext & logContext, threading::MutexLocker &globalLock);
/** Struct holding the job plus request data */
struct JobAndRequest {
typename Request::JobDump & job;
Request & request;
};
/** Helper function handling the difference between archive and retrieve (vid vs tapepool) */
static void specializedAddJobToQueue(typename Request::JobDump & job, Request & request, Queue & queue);
static void specializedAddJobsToQueue(std::list<JobAndRequest> & jobsToAdd, Queue & queue);
/** Helper function updating the cached retrieve queue stats. Noop for archive queues */
static void specializedUpdateCachedQueueStats(Queue &queue);
@@ -301,16 +307,20 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share
qBytesBefore+=j.size;
}
size_t addedJobs=1;
// Build the list of jobs to add to the queue
std::list<JobAndRequest> jta;
// First add the job for this thread
specializedAddJobToQueue(job, request, queue);
jta.push_back({job, request});
// We are done with the queue: release the lock to make helgrind happy.
ulq.unlock();
// We do the same for all the queued requests
for (auto &maqr: maq->m_requests) {
// Add the job
specializedAddJobToQueue(maqr->m_job, maqr->m_request, queue);
jta.push_back({maqr->m_job, maqr->m_request});
addedJobs++;
}
// Actually add the jobs.
specializedAddJobsToQueue(jta, queue);
double inMemoryQueueProcessTime = timer.secs(utils::Timer::resetCounter);
// We can now commit the multi-request addition to the object store
queue.commit();
......