Commit 43cef6ae authored by Eric Cano's avatar Eric Cano

Fixed a race potentially leading to contention.

In the MemQueue, the promise for the next batch was set after the queue was committed, but
before the lock was released (by the last user of the queue, through a shared pointer). This
led to a uselessly early start of the next write batch, and avoidable contention on the
object store lock. It would not cause a pile-up though, as only 2 threads would contend
(the previous one and the early-starting next one).
parent 5ab70c9c
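The fix is an ordering change: the successor's promise is now fulfilled in the SharedQueueLock destructor, after the object store lock has been released, instead of in sharedAddToNewQueue() while the lock was still held. Below is a minimal, self-contained sketch of the corrected ordering; std::mutex and std::promise stand in for the objectstore lock and the g_promises machinery, and all other names are hypothetical:

    #include <future>
    #include <iostream>
    #include <mutex>
    #include <thread>

    std::mutex objectStoreLock;  // stand-in for the object store queue lock

    int main() {
      std::promise<void> promiseForNext;
      std::future<void> futureForNext = promiseForNext.get_future();

      std::thread currentBatch([&] {
        std::unique_lock<std::mutex> lk(objectStoreLock);
        // ... commit this batch to the queue ...
        lk.unlock();                 // release the lock first (the fix)...
        promiseForNext.set_value();  // ...then wake the next batch
      });

      std::thread nextBatch([&] {
        futureForNext.wait();        // wakes only once the lock is free
        std::unique_lock<std::mutex> lk(objectStoreLock);  // no contention
        // ... commit the next batch ...
      });

      currentBatch.join();
      nextBatch.join();
      std::cout << "next batch ran without contending on the lock\n";
      return 0;
    }

With the pre-fix ordering (set_value() before unlock()), nextBatch would wake up only to block immediately on objectStoreLock, which is exactly the contention described above.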
@@ -42,7 +42,7 @@ class OStoreDB;
* to the caller of sharedAddToArchiveQueue, so they can delete their copy AFTER
* updating the ownership of their requests.
*/
template <class Queue>
template <class Queue, class Request>
class SharedQueueLock {
template <class, class>
friend class MemQueue;
@@ -52,18 +52,33 @@ public:
private:
std::unique_ptr<objectstore::ScopedExclusiveLock> m_lock;
std::unique_ptr<Queue> m_queue;
std::string m_queueIndex;
std::shared_ptr<std::promise<void>> m_promiseForNext;
log::LogContext m_logContext;
utils::Timer m_timer;
};
template <class Queue>
SharedQueueLock<Queue>::~SharedQueueLock() {
template <class Queue, class Request>
SharedQueueLock<Queue, Request>::~SharedQueueLock() {
m_lock->release();
log::ScopedParamContainer params(m_logContext);
params.add("objectQueue", m_queue->getAddressIfSet())
.add("waitAndUnlockTime", m_timer.secs());
m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
}
// The next update of the queue can now proceed
ANNOTATE_HAPPENS_BEFORE(m_promiseForNext.get());
m_promiseForNext->set_value();
// We can now clean up the promise/future couple if they were not picked up to trim the maps.
// A next thread finding them unlocked or absent will be equivalent.
threading::MutexLocker globalLock(MemQueue<Queue, Request>::g_mutex);
// If there is a promise AND it is ours, we remove it.
try {
if (MemQueue<Queue, Request>::g_promises.at(m_queueIndex).get() == m_promiseForNext.get()) {
MemQueue<Queue, Request>::g_futures.erase(m_queueIndex);
MemQueue<Queue, Request>::g_promises.erase(m_queueIndex);
}
} catch (std::out_of_range &) {}
}
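The identity check in the destructor above matters: by the time the last user of a batch runs this cleanup, a successor batch may already have registered a fresh promise under the same queue index, and that entry must be left alone. A compilable sketch of the pattern, with hypothetical globals mirroring MemQueue's g_mutex, g_promises and g_futures:

    #include <future>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <stdexcept>
    #include <string>

    std::mutex g_mutex;
    std::map<std::string, std::shared_ptr<std::promise<void>>> g_promises;
    std::map<std::string, std::future<void>> g_futures;

    // Erase our promise/future pair only if it is still the registered one;
    // if a successor already installed its own, leave the maps untouched.
    void cleanupIfOurs(const std::string & queueIndex,
                       const std::shared_ptr<std::promise<void>> & ours) {
      std::lock_guard<std::mutex> lock(g_mutex);
      try {
        if (g_promises.at(queueIndex).get() == ours.get()) {
          g_futures.erase(queueIndex);
          g_promises.erase(queueIndex);
        }
      } catch (std::out_of_range &) {}  // already gone: nothing to do
    }

    int main() {
      auto ours = std::make_shared<std::promise<void>>();
      {
        std::lock_guard<std::mutex> lock(g_mutex);
        g_promises["tapepool0"] = ours;
        g_futures["tapepool0"] = ours->get_future();
      }
      cleanupIfOurs("tapepool0", ours);  // ours and registered: maps emptied
      cleanupIfOurs("tapepool0", ours);  // absent now: no-op via the catch
      return 0;
    }

Moving this cleanup into the destructor (it previously sat at the end of sharedAddToNewQueue(), as the removals further down show) ties the trimming to the real end of the batch's lifetime.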
template <class Request, class Queue>
class MemQueueRequest {
@@ -79,7 +94,7 @@ private:
typename Request::JobDump & m_job;
Request & m_request;
std::promise<void> m_promise;
std::shared_ptr<SharedQueueLock<Queue>> m_returnValue;
std::shared_ptr<SharedQueueLock<Queue, Request>> m_returnValue;
// Mutex protecting users against premature deletion
threading::Mutex m_mutex;
// Helper for debugging
@@ -88,6 +103,8 @@ private:
template <class Request, class Queue>
class MemQueue {
template <class, class>
friend class SharedQueueLock;
public:
/**
* This function adds an ArchiveRequest to an ArchiveQueue in batch.
@@ -108,7 +125,7 @@ public:
* if needed
* @param logContext log context to log addition of jobs to the queue.
*/
static std::shared_ptr<SharedQueueLock<Queue>> sharedAddToQueue(typename Request::JobDump & job,
static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToQueue(typename Request::JobDump & job,
const std::string & queueIndex, Request & request, OStoreDB & oStoreDB, log::LogContext & logContext);
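For context, the batching scheme this declaration belongs to is unchanged by the commit: the first caller for a given queue index becomes the batch owner and performs a single object store round trip, while later callers only register their request and block until the owner hands them the shared lock holder. A rough, runnable model of that hand-off; SharedLockHolder and g_openBatch are hypothetical stand-ins for the real SharedQueueLock and MemQueue bookkeeping:

    #include <future>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    // All batch participants share the holder; the last shared_ptr to drop
    // "releases the lock", as SharedQueueLock does in its destructor.
    struct SharedLockHolder {
      ~SharedLockHolder() { std::cout << "queue lock released\n"; }
    };

    using Waiter = std::promise<std::shared_ptr<SharedLockHolder>>;

    std::mutex g_mutex;
    std::vector<Waiter> * g_openBatch = nullptr;

    std::shared_ptr<SharedLockHolder> sharedAddToQueue(int jobId) {
      std::unique_lock<std::mutex> g(g_mutex);
      if (g_openBatch) {                  // a batch is open: join it and wait
        g_openBatch->emplace_back();
        auto fut = g_openBatch->back().get_future();
        g.unlock();
        return fut.get();                 // woken once the owner committed
      }
      std::vector<Waiter> batch;
      g_openBatch = &batch;               // open a batch; we are its owner
      g.unlock();
      // ... the real owner would wait here on the previous batch's future ...
      g.lock();
      g_openBatch = nullptr;              // close the batch and snapshot it
      g.unlock();
      // ... one object store round trip queues our job plus the batch ...
      auto ret = std::make_shared<SharedLockHolder>();
      std::cout << "owner of job " << jobId << " committed "
                << batch.size() + 1 << " job(s)\n";
      for (auto & w : batch) w.set_value(ret);  // share the holder
      return ret;
    }

    int main() {
      std::vector<std::thread> threads;
      for (int i = 0; i < 4; i++)
        threads.emplace_back([i] { sharedAddToQueue(i); });
      for (auto & t : threads) t.join();
      return 0;
    }

The payoff is the same as in the real code: n concurrent callers cost one object store round trip instead of n, at the price of the promise-lifetime bookkeeping the rest of this diff adjusts.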
private:
@@ -139,7 +156,7 @@ private:
static std::map<std::string, std::future<void>> g_futures;
/** Helper function for sharedAddToArchiveQueue */
static std::shared_ptr<SharedQueueLock<Queue>> sharedAddToNewQueue(typename Request::JobDump & job, const std::string & queueIndex,
static std::shared_ptr<SharedQueueLock<Queue, Request>> sharedAddToNewQueue(typename Request::JobDump & job, const std::string & queueIndex,
Request & request, OStoreDB & oStoreDB, log::LogContext & logContext, threading::MutexLocker &globalLock);
/** Helper function handling the difference between archive and retrieve (vid vs tapepool) */
@@ -162,7 +179,7 @@ template <class Request, class Queue>
std::map<std::string, std::future<void>> MemQueue<Request, Queue>::g_futures;
template <class Request, class Queue>
std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToQueue(typename Request::JobDump& job,
std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::sharedAddToQueue(typename Request::JobDump& job,
const std::string & queueIndex, Request& request, OStoreDB & oStoreDB, log::LogContext & logContext) {
// 1) Take the global lock (implicit in the constructor)
threading::MutexLocker globalLock(g_mutex);
@@ -196,7 +213,7 @@ std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToQue
}
template <class Request, class Queue>
std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToNewQueue(
std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::sharedAddToNewQueue(
typename Request::JobDump& job, const std::string & queueIndex, Request& request,
OStoreDB& oStoreDB, log::LogContext& logContext, threading::MutexLocker &globalLock) {
utils::Timer timer;
@@ -250,7 +267,9 @@ std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToNew
double waitTime = timer.secs(utils::Timer::resetCounter);
// We can now proceed with the queuing of the jobs in the object store.
try {
std::shared_ptr<SharedQueueLock<Queue>> ret(new SharedQueueLock<Queue>(logContext));
std::shared_ptr<SharedQueueLock<Queue, Request>> ret(new SharedQueueLock<Queue, Request>(logContext));
ret->m_promiseForNext=promiseForSuccessor;
ret->m_queueIndex=queueIndex;
ret->m_queue.reset(new Queue(oStoreDB.m_objectStore));
ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
auto & queue = *ret->m_queue;
@@ -280,9 +299,6 @@ std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToNew
// Update the cache stats in memory as we hold the queue.
specializedUpdateCachedQueueStats(queue);
double cacheUpdateTime = timer.secs(utils::Timer::resetCounter);
// The next update of the queue can now proceed
ANNOTATE_HAPPENS_BEFORE(promiseForSuccessor.get());
promiseForSuccessor->set_value();
double successorPromiseSetTime = timer.secs(utils::Timer::resetCounter);
// Log
size_t qJobsAfter=queue.dumpJobs().size();
@@ -319,17 +335,6 @@ std::shared_ptr<SharedQueueLock<Queue>> MemQueue<Request, Queue>::sharedAddToNew
maqr->m_promise.set_value();
}
}
// We can now clean up our promise/future couple if they were not picked up to trim the maps.
// A next thread finding them unlocked or absent will be equivalent.
globalLock.lock();
// If there is a promise AND it is ours, we remove it.
try {
if (g_promises.at(queueIndex).get() == promiseForSuccessor.get()) {
g_futures.erase(queueIndex);
g_promises.erase(queueIndex);
}
} catch (std::out_of_range &) {}
globalLock.unlock();
// Done!
return ret;
} catch (...) {
......