Skip to content
Snippets Groups Projects
Commit 8f3bc4c2 authored by Eric Cano's avatar Eric Cano
Browse files

Fixed delete-before user is gone race conditions.

This is done by replacing unique_ptr/bare pointer with 2 shared_ptr.
Also added suppressions for helgrind complains about rados.
parent 38e84a59
Branches
Tags
No related merge requests found
......@@ -70,7 +70,7 @@ void AgentReference::setQueueFlushTimeout(std::chrono::duration<uint64_t, std::m
}
void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
Action a{AgentOperation::Add, objectAddress, std::promise<void>()};
std::shared_ptr<Action> a (new Action(AgentOperation::Add, objectAddress));
queueAndExecuteAction(a, backend);
}
......@@ -83,7 +83,7 @@ void AgentReference::addBatchToOwnership(const std::list<std::string>& objectAdr
}
void AgentReference::removeFromOwnership(const std::string& objectAddress, objectstore::Backend& backend) {
Action a{AgentOperation::Remove, objectAddress, std::promise<void>()};
std::shared_ptr<Action> a (new Action(AgentOperation::Remove, objectAddress));
queueAndExecuteAction(a, backend);
}
......@@ -96,19 +96,19 @@ void AgentReference::removeBatchFromOwnership(const std::list<std::string>& obje
}
void AgentReference::bumpHeatbeat(objectstore::Backend& backend) {
Action a{AgentOperation::Heartbeat, "", std::promise<void>()};
std::shared_ptr<Action> a (new Action(AgentOperation::Heartbeat, ""));
queueAndExecuteAction(a, backend);
}
void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend& backend) {
void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objectstore::Backend& backend) {
// First, we need to determine if a queue exists or not.
// If so, we just use it, and if not, we create and serve it.
std::unique_lock<std::mutex> ulGlobal(m_currentQueueMutex);
if (m_currentQueue) {
// There is already a queue
std::unique_lock<std::mutex> ulQueue(m_currentQueue->mutex);
m_currentQueue->queue.push_back(&action);
m_currentQueue->queue.push_back(action);
// If this is time to run, wake up the serving thread
if (m_currentQueue->queue.size() + 1 >= m_maxQueuedItems) {
m_currentQueue->promise.set_value();
......@@ -117,30 +117,31 @@ void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend&
// Release the locks and wait for action execution
ulQueue.unlock();
ulGlobal.unlock();
action.promise.get_future().get();
action->promise.get_future().get();
} else {
// There is not queue, so we need to create and serve it ourselves
ActionQueue q;
// There is not queue, so we need to create and serve it ourselves.
// To make sure there is no lifetime issues, we make it a shared_ptr
std::shared_ptr<ActionQueue> q(new ActionQueue);
// Lock the queue
std::unique_lock<std::mutex> ulq(q.mutex);
std::unique_lock<std::mutex> ulq(q->mutex);
// Get it referenced
m_currentQueue = &q;
m_currentQueue = q;
// Get our execution promise and leave one behind
std::unique_ptr<std::promise<void>> promiseForThisQueue(std::move(m_nextQueueExecutionPromise));
std::shared_ptr<std::promise<void>> promiseForThisQueue(m_nextQueueExecutionPromise);
// Leave a promise behind for the next queue
m_nextQueueExecutionPromise.reset(new std::promise<void>);
// Keep a pointer to it, so we will signal our own completion to our successor queue.
std::promise<void> * promiseForNextQueue = m_nextQueueExecutionPromise.get();
std::shared_ptr<std::promise<void>> promiseForNextQueue = m_nextQueueExecutionPromise;
// We can now unlock the queue and the general lock: queuing is open.
ulq.unlock();
ulGlobal.unlock();
// We wait for time or size of queue
q.promise.get_future().wait_for(m_queueFlushTimeout);
// Make sure we are not listed anymore a the queue taking jobs (this would happen
// in case of timeout.
q->promise.get_future().wait_for(m_queueFlushTimeout);
// Make sure we are not listed anymore as the queue taking jobs (this would happen
// in case of timeout).
ulGlobal.lock();
if (m_currentQueue == &q)
m_currentQueue = nullptr;
if (m_currentQueue == q)
m_currentQueue.reset();
ulGlobal.unlock();
// Wait for previous queue to complete
promiseForThisQueue->get_future().get();
......@@ -152,9 +153,9 @@ void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend&
objectstore::ScopedExclusiveLock agl(ag);
ag.fetch();
// First we apply our own modification
appyAction(action, ag);
appyAction(*action, ag);
// Then those of other threads
for (auto a: q.queue)
for (auto a: q->queue)
appyAction(*a, ag);
// and commit
ag.commit();
......@@ -162,16 +163,21 @@ void AgentReference::queueAndExecuteAction(Action& action, objectstore::Backend&
// Something wend wrong: , we release the next batch of changes
promiseForNextQueue->set_value();
// We now pass the exception to all threads
for (auto a: q.queue)
for (auto a: q->queue) {
std::lock_guard<std::mutex> lg(a->mutex);
a->promise.set_exception(std::current_exception());
}
// And to our own caller
throw;
}
// Things went well. We pass the token to the next queue
promiseForNextQueue->set_value();
// and release the other threads
for (auto a: q.queue)
for (auto & a: q->queue) {
std::lock_guard<std::mutex> lg(a->mutex);
a->promise.set_value();
a.reset();
}
}
}
......
......@@ -116,9 +116,20 @@ private:
* An operation with its parameter and promise
*/
struct Action {
Action(AgentOperation op, const std::string & objectAddress): op(op), objectAddress(objectAddress) {}
AgentOperation op;
const std::string & objectAddress;
std::promise<void> promise;
/***
* A mutex ensuring the object will not be released before the promise's result
* is fully pushed.
*/
std::mutex mutex;
~Action() {
// The setting of promise result will be protected by this mutex, so destruction
// will only happen after promise setting is complete.
std::lock_guard<std::mutex> lm(mutex);
}
};
/**
......@@ -126,7 +137,7 @@ private:
*/
struct ActionQueue {
std::mutex mutex;
std::list<Action *> queue;
std::list<std::shared_ptr<Action>> queue;
std::promise<void> promise;
};
......@@ -143,10 +154,10 @@ private:
* similar to queueing in ArchiveQueues and RetrieveQeueues.
* @param action the action
*/
void queueAndExecuteAction(Action& action, objectstore::Backend& backend);
void queueAndExecuteAction(std::shared_ptr<Action> action, objectstore::Backend& backend);
std::mutex m_currentQueueMutex;
ActionQueue * m_currentQueue = nullptr;
std::shared_ptr<ActionQueue> m_currentQueue;
/**
* This pointer holds a promise that will be picked up by the thread managing
* the a queue in memory (promise(n)). The same thread will leave a fresh promise
......@@ -156,7 +167,7 @@ private:
* This will ensure that the queues will be flushed in order, one at a time.
* One at a time also minimize contention on the object store.
*/
std::unique_ptr<std::promise<void>> m_nextQueueExecutionPromise;
std::shared_ptr<std::promise<void>> m_nextQueueExecutionPromise;
const size_t m_maxQueuedItems = 100;
std::chrono::duration<uint64_t, std::milli> m_queueFlushTimeout = std::chrono::milliseconds(100);
};
......
......@@ -24,7 +24,7 @@ namespace cta { namespace ostoredb {
std::mutex MemArchiveQueue::g_mutex;
std::map<std::string, MemArchiveQueue *> MemArchiveQueue::g_queues;
std::map<std::string, std::shared_ptr<MemArchiveQueue>> MemArchiveQueue::g_queues;
std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job,
objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext) {
......@@ -32,15 +32,15 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
std::unique_lock<std::mutex> ul(g_mutex);
try {
// 2) Determine if the queue exists already or not
auto & q = *g_queues.at(job.tapePool);
auto q = g_queues.at(job.tapePool);
// It does: we just ride the train: queue ourselves
std::unique_lock<std::mutex> ulq(q.m_mutex);
MemArchiveQueueRequest maqr(job, archiveRequest);
q.add(maqr);
std::unique_lock<std::mutex> ulq(q->m_mutex);
std::shared_ptr<MemArchiveQueueRequest> maqr(new MemArchiveQueueRequest(job, archiveRequest));
q->add(maqr);
// If there are already enough elements, signal to the initiating thread
if (q.m_requests.size() + 1 >= g_maxQueuedElements) {
if (q->m_requests.size() + 1 >= g_maxQueuedElements) {
// signal the initiating thread
q.m_promise.set_value();
q->m_promise.set_value();
// Unreference the queue so no new request gets added to it
g_queues.erase(job.tapePool);
}
......@@ -48,27 +48,27 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
ulq.unlock();
ul.unlock();
// Wait for our request completion (this could throw, if there was a problem)
return maqr.m_promise.get_future().get();
return maqr->m_promise.get_future().get();
} catch (std::out_of_range &) {
utils::Timer timer;
// The queue for our tape pool does not exist. We will create it, wait for
// the necessary amount of time or requests and release it.
// Create the queue
MemArchiveQueue maq;
std::shared_ptr<MemArchiveQueue> maq(new MemArchiveQueue);
// Reference it
g_queues[job.tapePool] = &maq;
g_queues[job.tapePool] = maq;
// Release the global list
ul.unlock();
// Wait for timeout or enough jobs.
maq.m_promise.get_future().wait_for(std::chrono::milliseconds(100));
maq->m_promise.get_future().wait_for(std::chrono::milliseconds(100));
// Re-take the global lock to make sure the queue is not referenced anymore,
// and the queue as well, to make sure the last user is gone.
ul.lock();
std::unique_lock<std::mutex> ulq(maq.m_mutex);
std::unique_lock<std::mutex> ulq(maq->m_mutex);
// Remove the entry for our tape pool iff it also has our pointer (a new
// queue could have been created in the mean time.
auto i = g_queues.find(job.tapePool);
if (i != g_queues.end() && (&maq == i->second))
if (i != g_queues.end() && (maq == i->second))
g_queues.erase(i);
// Our mem queue is now unreachable so we can let the global part go
ul.unlock();
......@@ -93,7 +93,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
archiveRequest.setJobArchiveQueueAddress(job.copyNb, aq.getAddressIfSet());
}
// The do the same for all the queued requests
for (auto &maqr: maq.m_requests) {
for (auto &maqr: maq->m_requests) {
// Add the job
auto af = maqr->m_archiveRequest.getArchiveFile();
aq.addJob(maqr->m_job, maqr->m_archiveRequest.getAddressIfSet(), af.archiveFileID,
......@@ -120,7 +120,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
// We will also count how much time we mutually wait for the other threads.
ret->m_timer.reset();
// And finally release all the user threads
for (auto &maqr: maq.m_requests) {
for (auto &maqr: maq->m_requests) {
maqr->m_promise.set_value(ret);
}
// Done!
......@@ -137,7 +137,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
}
size_t exceptionsNotPassed = 0;
// Something went wrong. We should inform the other threads
for (auto & maqr: maq.m_requests) {
for (auto & maqr: maq->m_requests) {
try {
maqr->m_promise.set_exception(std::current_exception());
} catch (...) {
......@@ -151,7 +151,7 @@ std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(object
} catch (std::exception & ex) {
std::stringstream err;
err << "In MemArchiveQueue::sharedAddToArchiveQueue(), in main thread, failed to notify "
<< exceptionsNotPassed << " other threads out of " << maq.m_requests.size()
<< exceptionsNotPassed << " other threads out of " << maq->m_requests.size()
<< " : " << ex.what();
log::ScopedParamContainer params(logContext);
params.add("what", ex.what())
......@@ -174,8 +174,8 @@ SharedQueueLock::~SharedQueueLock() {
m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
}
void MemArchiveQueue::add(MemArchiveQueueRequest& request) {
m_requests.emplace_back(&request);
void MemArchiveQueue::add(std::shared_ptr<MemArchiveQueueRequest> request) {
m_requests.emplace_back(request);
}
......
......@@ -86,14 +86,14 @@ private:
/** Mutex that should be locked before attempting any operation */
std::mutex m_mutex;
/** Add the object */
void add(MemArchiveQueueRequest & request);
void add(std::shared_ptr<MemArchiveQueueRequest> request);
/** Static function implementing the shared addition of archive requests to
* the object store queue */
static const size_t g_maxQueuedElements = 100;
std::list<MemArchiveQueueRequest *> m_requests;
std::list<std::shared_ptr<MemArchiveQueueRequest>> m_requests;
std::promise<void> m_promise;
static std::mutex g_mutex;
static std::map<std::string, MemArchiveQueue *> g_queues;
static std::map<std::string, std::shared_ptr<MemArchiveQueue>> g_queues;
};
}} // namespace cta::ostoreDBUtils
\ No newline at end of file
......@@ -355,4 +355,43 @@
fun:_ZNSt14__shared_countILN9__gnu_cxx12_Lock_policyE2EED1Ev
fun:_ZNSt12__shared_ptrINSt13__future_base11_State_baseELN9__gnu_cxx12_Lock_policyE2EED1Ev
fun:_ZNSt10shared_ptrINSt13__future_base11_State_baseEED1Ev
}
\ No newline at end of file
}
{
Rados_context_create
Helgrind:Race
...
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/libstdc++.so.6.0.19
fun:mythread_wrapper
fun:start_thread
fun:clone
}
{
Rados_context_create_2
Helgrind:Race
...
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
fun:_ZN8librados11RadosClient7connectEv
fun:_ZN3cta11objectstore12BackendRadosC1ERKSsS3_S3_
...
}
{
Rados_context_create_3
Helgrind:Race
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
fun:_ZN7Context8completeEi
obj:/usr/lib64/librados.so.2.0.0
obj:/usr/lib64/librados.so.2.0.0
...
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment