Commit fa7e62ba authored by Eric Cano's avatar Eric Cano
Browse files

Moved the retrieve requeuest queueing to an asynchronous task.

This version is not data safe as the request is created late, right after
referencing the job to the queue.
parent 7304f380
......@@ -696,16 +696,16 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
}
vidFound:
// In order to post the job, construct it first in memory.
objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveRequest"), m_objectStore);
rReq.initialize();
rReq.setSchedulerRequest(rqst);
rReq.setRetrieveFileQueueCriteria(criteria);
auto rReq = std::make_shared<objectstore::RetrieveRequest> (m_agentReference->nextId("RetrieveRequest"), m_objectStore);
rReq->initialize();
rReq->setSchedulerRequest(rqst);
rReq->setRetrieveFileQueueCriteria(criteria);
// Find the job corresponding to the vid (and check we indeed have one).
auto jobs = rReq.getJobs();
objectstore::RetrieveRequest::JobDump * job = nullptr;
auto jobs = rReq->getJobs();
objectstore::RetrieveRequest::JobDump job;
for (auto & j:jobs) {
if (j.copyNb == bestCopyNb) {
job = &j;
job = j;
goto jobFound;
}
}
......@@ -717,29 +717,41 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR
}
jobFound:
{
// Add the request to the queue (with a shared access).
auto sharedLock = ostoredb::MemRetrieveQueue::sharedAddToQueue(*job, bestVid, rReq, *this, logContext);
double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The object ownership was set in SharedAdd.
// We need to extract the owner before inserting. After, we would need to hold a lock.
auto owner = rReq.getOwner();
rReq.insert();
double iTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The lock on the queue is released here (has to be after the request commit for consistency.
sharedLock.reset();
double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
log::ScopedParamContainer params(logContext);
params.add("vid", bestVid)
.add("queueObject", owner)
.add("jobObject", rReq.getAddressIfSet())
.add("fileId", rReq.getArchiveFile().archiveFileID)
.add("diskInstance", rReq.getArchiveFile().diskInstance)
.add("diskFilePath", rReq.getArchiveFile().diskFileInfo.path)
.add("diskFileId", rReq.getArchiveFile().diskFileId)
.add("queueingTime", qTime)
.add("insertTime", iTime)
.add("queueUnlockTime", qUnlockTime);
logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): added job to queue (enqueueing fnished).");
m_threadCounter++;
m_enqueueingTasksQueue.push(new EnqueueingTask([rReq, job, bestVid, this]{
// This unique_ptr's destructor will ensure the OStoreDB object is not deleted before the thread exits.
auto scopedCounterDecrement = [this](void *){
m_threadCounter--;
};
// A bit ugly, but we need a non-null pointer for the "deleter" to be called.
std::unique_ptr<void, decltype(scopedCounterDecrement)> scopedCounterDecrementerInstance((void *)1, scopedCounterDecrement);
log::LogContext logContext(m_logger);
utils::Timer timer;
// Add the request to the queue (with a shared access).
auto nonConstJob = job;
auto sharedLock = ostoredb::MemRetrieveQueue::sharedAddToQueue(nonConstJob, bestVid, *rReq, *this, logContext);
double qTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The object ownership was set in SharedAdd.
// We need to extract the owner before inserting. After, we would need to hold a lock.
auto owner = rReq->getOwner();
rReq->insert();
double iTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
// The lock on the queue is released here (has to be after the request commit for consistency.
sharedLock.reset();
double qUnlockTime = timer.secs(cta::utils::Timer::reset_t::resetCounter);
log::ScopedParamContainer params(logContext);
params.add("vid", bestVid)
.add("queueObject", owner)
.add("jobObject", rReq->getAddressIfSet())
.add("fileId", rReq->getArchiveFile().archiveFileID)
.add("diskInstance", rReq->getArchiveFile().diskInstance)
.add("diskFilePath", rReq->getArchiveFile().diskFileInfo.path)
.add("diskFileId", rReq->getArchiveFile().diskFileId)
.add("queueingTime", qTime)
.add("insertTime", iTime)
.add("queueUnlockTime", qUnlockTime);
logContext.log(log::INFO, "In OStoreDB::queueRetrieve(): added job to queue (enqueueing finished).");
}));
}
return bestVid;
}
......
......@@ -494,6 +494,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
request.requester.group = "userGroup";
scheduler.queueRetrieve("disk_instance", request, lc);
}
scheduler.waitSchedulerDbSubthreadsComplete();
// Check that the retrieve request is queued
{
......
......@@ -427,6 +427,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
// 6) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
......@@ -620,6 +621,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
// 6) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
......@@ -825,6 +827,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
}
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
// 6) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
......@@ -998,6 +1001,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
// 7) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/noSuchDrive", "manual");
......@@ -1142,7 +1146,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) {
scheduler.queueRetrieve(diskInstance, rReq, logContext);
}
}
scheduler.waitSchedulerDbSubthreadsComplete();
// 7) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
cta::common::dataStructures::DriveInfo driveInfo;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment