Commit 8012a02d authored by Eric Cano

Reviewed promise-based thread synchronisation

Fixed calls to promise::get_future() after possible access from another thread. They are now guaranteed to happen before.
Added helgrind annotations for promise-based synchronisation.
Added macros enabling helgrind annotations for shared_ptr.
Added suppression for shared_ptr used inside other standard library objects and not covered by the previous macros.
Added unit test for lower level.
Added suppressions for reported race conditions in the Rados library.
Heavily reviewed MemArchiveQueue and fixed a missing commit in the object store, leading to potentially orphaned objects.
Enabled formerly disabled test as it is now fast enough.
parent 5310280d
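The core idiom of this commit: a std::future must be extracted from its std::promise while the promise is still private to the creating thread, because promise::get_future() touches the promise's shared state and is not synchronised against set_value() from another thread. A minimal sketch of the pattern, paired with the helgrind annotations used throughout the diff (names are illustrative, not from the code):

#include <valgrind/helgrind.h>
#include <future>
#include <thread>

int main() {
  std::promise<void> done;
  // Take the future while the promise is still private to this thread.
  std::future<void> f = done.get_future();
  std::thread worker([&done] {
    ANNOTATE_HAPPENS_BEFORE(&done);   // pairs with the HAPPENS_AFTER below
    done.set_value();
  });
  f.get();                            // ordered after set_value()
  ANNOTATE_HAPPENS_AFTER(&done);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&done);
  worker.join();
  return 0;
}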
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
// This file should be included before the relevant C++ headers. It will introduce proper helgrind annotations
// in all templated code (like shared pointers).
// See https://gcc.gnu.org/onlinedocs/libstdc++/manual/debug.html (Data race hunting)
#include <valgrind/helgrind.h>
#undef _GLIBCXX_SYNCHRONIZATION_HAPPENS_BEFORE
#define _GLIBCXX_SYNCHRONIZATION_HAPPENS_BEFORE(A) ANNOTATE_HAPPENS_BEFORE(A)
#undef _GLIBCXX_SYNCHRONIZATION_HAPPENS_AFTER
#define _GLIBCXX_SYNCHRONIZATION_HAPPENS_AFTER(A) ANNOTATE_HAPPENS_AFTER(A)
#undef _GLIBCXX_EXTERN_TEMPLATE
#define _GLIBCXX_EXTERN_TEMPLATE -1
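These macro overrides only affect code compiled after this header, so it must precede the standard headers whose templates should be annotated; defining _GLIBCXX_EXTERN_TEMPLATE as -1 keeps libstdc++ from relying on its pre-instantiated (unannotated) template definitions, per the manual page referenced above. A minimal usage sketch (assuming the header lives at common/helgrind_annotator.hpp, as the includes later in this commit suggest):

// Must come before any standard header whose templates we want annotated.
#include "common/helgrind_annotator.hpp"
#include <memory>

int main() {
  auto sp = std::make_shared<int>(42);
  auto copy = sp;  // shared_ptr reference counting now carries happens-before edges for helgrind
  return 0;
}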
@@ -55,6 +55,7 @@ BuildRequires: json-c-devel >= 0.11
BuildRequires: libattr-devel >= 2.4.44
BuildRequires: oracle-instantclient12.1-devel
BuildRequires: valgrind
BuildRequires: valgrind-devel
%endif
# only build debug info if you're building the whole code
......
@@ -25,6 +25,7 @@ namespace cta { namespace objectstore {
// AgentHeartbeatThread::stopAndWaitThread
//------------------------------------------------------------------------------
void AgentHeartbeatThread::stopAndWaitThread() {
ANNOTATE_HAPPENS_BEFORE(&m_exit);
m_exit.set_value();
wait();
}
@@ -39,6 +40,8 @@ void AgentHeartbeatThread::run() {
while (std::future_status::ready != exitFuture.wait_for(m_heartRate)) {
m_agentReference.bumpHeatbeat(m_backend);
}
ANNOTATE_HAPPENS_AFTER(&m_exit);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_exit);
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("Message", ex.getMessageValue());
......
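The heartbeat thread above uses the exit promise as a stop signal: its future is polled with wait_for() so the loop wakes at the heart rate until set_value() is called. A reduced sketch of the same shape, with hypothetical names:

#include <valgrind/helgrind.h>
#include <chrono>
#include <future>
#include <thread>

class HeartbeatLoop {
  std::promise<void> m_exit;
  std::future<void> m_exitFuture{m_exit.get_future()};  // taken before the thread exists
  std::thread m_thread;
public:
  void start() {
    m_thread = std::thread([this] {
      while (std::future_status::ready != m_exitFuture.wait_for(std::chrono::seconds(1))) {
        // bump the heartbeat here
      }
      ANNOTATE_HAPPENS_AFTER(&m_exit);
      ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_exit);
    });
  }
  void stopAndWaitThread() {
    ANNOTATE_HAPPENS_BEFORE(&m_exit);
    m_exit.set_value();
    m_thread.join();
  }
};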
@@ -16,6 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/helgrind_annotator.hpp"
#include "AgentReference.hpp"
#include "Agent.hpp"
#include "common/exception/Errnum.hpp"
@@ -52,6 +53,8 @@ AgentReference::AgentReference(const std::string & clientType) {
// happy, but not really needed
std::unique_lock<std::mutex> ulg(m_currentQueueMutex);
m_nextQueueExecutionPromise.reset(new std::promise<void>);
m_nextQueueExecutionFuture = m_nextQueueExecutionPromise->get_future();
ANNOTATE_HAPPENS_BEFORE(m_nextQueueExecutionPromise.get());
m_nextQueueExecutionPromise->set_value();
}
@@ -111,32 +114,43 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
m_currentQueue->queue.push_back(action);
// If this is time to run, wake up the serving thread
if (m_currentQueue->queue.size() + 1 >= m_maxQueuedItems) {
ANNOTATE_HAPPENS_BEFORE(&m_currentQueue->promise);
m_currentQueue->promise.set_value();
m_currentQueue = nullptr;
}
// Get hold of the future before the promise gets a chance to be accessed
auto actionFuture=action->promise.get_future();
// Release the locks and wait for action execution
ulQueue.unlock();
ulGlobal.unlock();
action->promise.get_future().get();
actionFuture.get();
ANNOTATE_HAPPENS_AFTER(&action->promise);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&action->promise);
} else {
// There is not queue, so we need to create and serve it ourselves.
// There is no queue, so we need to create and serve it ourselves.
// To make sure there are no lifetime issues, we make it a shared_ptr
std::shared_ptr<ActionQueue> q(new ActionQueue);
// Lock the queue
std::unique_lock<std::mutex> ulq(q->mutex);
// Get it referenced
m_currentQueue = q;
// Get our execution promise and leave one behind
// Get our execution promise and future and leave one behind.
std::shared_ptr<std::promise<void>> promiseForThisQueue(m_nextQueueExecutionPromise);
// Leave a promise behind for the next queue
auto futureForThisQueue = std::move(m_nextQueueExecutionFuture);
// Leave a promise behind for the next queue, and set the future.
m_nextQueueExecutionPromise.reset(new std::promise<void>);
m_nextQueueExecutionFuture=m_nextQueueExecutionPromise->get_future();
// Keep a pointer to it, so we will signal our own completion to our successor queue.
std::shared_ptr<std::promise<void>> promiseForNextQueue = m_nextQueueExecutionPromise;
// Get future from promise before other thread gets a chance to touch the latter.
auto queueFuture=q->promise.get_future();
// We can now unlock the queue and the general lock: queuing is open.
ulq.unlock();
ulGlobal.unlock();
// We wait for the flush timeout or for the queue to reach its size threshold
q->promise.get_future().wait_for(m_queueFlushTimeout);
queueFuture.wait_for(m_queueFlushTimeout);
ANNOTATE_HAPPENS_AFTER(&q->promise);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&q->promise);
// Make sure we are not listed anymore as the queue taking jobs (this would happen
// in case of timeout).
ulGlobal.lock();
@@ -144,7 +158,9 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
m_currentQueue.reset();
ulGlobal.unlock();
// Wait for previous queue to complete
promiseForThisQueue->get_future().get();
futureForThisQueue.get();
ANNOTATE_HAPPENS_AFTER(promiseForThisQueue.get());
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(promiseForThisQueue.get());
// Make sure no leftover thread is still writing to the queue.
ulq.lock();
// Off we go! Add the actions to the queue
@@ -155,28 +171,33 @@ void AgentReference::queueAndExecuteAction(std::shared_ptr<Action> action, objec
// First we apply our own modification
appyAction(*action, ag);
// Then those of other threads
for (auto a: q->queue)
for (auto a: q->queue) {
std::lock_guard<std::mutex> lg(a->mutex);
appyAction(*a, ag);
}
// and commit
ag.commit();
} catch (...) {
// Something went wrong: we release the next batch of changes
ANNOTATE_HAPPENS_BEFORE(promiseForNextQueue.get());
promiseForNextQueue->set_value();
// We now pass the exception to all threads
for (auto a: q->queue) {
std::lock_guard<std::mutex> lg(a->mutex);
ANNOTATE_HAPPENS_BEFORE(&a->promise);
a->promise.set_exception(std::current_exception());
}
// And to our own caller
throw;
}
// Things went well. We pass the token to the next queue
ANNOTATE_HAPPENS_BEFORE(promiseForNextQueue.get());
promiseForNextQueue->set_value();
// and release the other threads
for (auto & a: q->queue) {
for (auto a: q->queue) {
std::lock_guard<std::mutex> lg(a->mutex);
ANNOTATE_HAPPENS_BEFORE(&a->promise);
a->promise.set_value();
a.reset();
}
}
}
......
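The promise hand-off in queueAndExecuteAction() forms a chain: each queue server waits on the token left behind by its predecessor and plants a fresh promise (with its future pre-extracted) for its successor, serialising the object-store commits. A sketch of that chain under the same locking discipline (types and names hypothetical):

#include <future>
#include <memory>
#include <mutex>

struct TokenChain {
  std::mutex mutex;
  std::shared_ptr<std::promise<void>> next = std::make_shared<std::promise<void>>();
  std::future<void> nextFuture = next->get_future();
  TokenChain() { next->set_value(); }  // the very first queue may proceed immediately
};

void serveOneBatch(TokenChain& chain) {
  std::unique_lock<std::mutex> ul(chain.mutex);
  auto myTurn = std::move(chain.nextFuture);    // token our predecessor will (or did) set
  auto successor = std::make_shared<std::promise<void>>();
  chain.nextFuture = successor->get_future();   // future taken before anyone else can see it
  chain.next = successor;
  ul.unlock();
  myTurn.get();                                 // wait for the previous batch to complete
  // ... gather and commit this batch to the object store ...
  successor->set_value();                       // release the next queue's server
}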
@@ -18,6 +18,7 @@
#pragma once
#include "common/helgrind_annotator.hpp"
#include "objectstore/Backend.hpp"
#include <atomic>
#include <string>
@@ -168,6 +169,10 @@ private:
* One at a time also minimizes contention on the object store.
*/
std::shared_ptr<std::promise<void>> m_nextQueueExecutionPromise;
/**
* This future is extracted from m_nextQueueExecutionPromise immediately, before any other thread can touch the promise.
*/
std::future<void> m_nextQueueExecutionFuture;
const size_t m_maxQueuedItems = 100;
std::chrono::duration<uint64_t, std::milli> m_queueFlushTimeout = std::chrono::milliseconds(100);
};
......
@@ -22,6 +22,7 @@
#include <sys/syscall.h>
#include <errno.h>
#include <unistd.h>
#include <valgrind/helgrind.h>
namespace cta { namespace objectstore {
@@ -198,7 +199,8 @@ Backend::AsyncUpdater* BackendRados::asyncUpdate(const std::string & name, std::
return new AsyncUpdater(*this, name, update);
}
BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update): m_backend(be), m_name(name), m_update(update) {
BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& name, std::function<std::string(const std::string&)>& update):
m_backend(be), m_name(name), m_update(update), m_job(), m_jobFuture(m_job.get_future()) {
try {
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(this, statCallback, nullptr);
// At construction time, we just fire a stat.
@@ -209,6 +211,7 @@ BackendRados::AsyncUpdater::AsyncUpdater(BackendRados& be, const std::string& na
throw Backend::NoSuchObject(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
m_job.set_exception(std::current_exception());
}
}
@@ -256,11 +259,13 @@ void BackendRados::AsyncUpdater::statCallback(librados::completion_t completion,
throw Backend::CouldNotFetch(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
)));
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
@@ -309,11 +314,13 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion
throw Backend::CouldNotCommit(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
)));
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
@@ -336,6 +343,7 @@ void BackendRados::AsyncUpdater::commitCallback(librados::completion_t completio
throw Backend::CouldNotUnlock(errnum.getMessageValue());
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
@@ -350,14 +358,18 @@ void BackendRados::AsyncUpdater::unlockCallback(librados::completion_t completio
throw Backend::CouldNotUnlock(errnum.getMessageValue());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_exception(std::current_exception());
}
}
void BackendRados::AsyncUpdater::wait() {
m_job.get_future().get();
m_jobFuture.get();
ANNOTATE_HAPPENS_AFTER(&m_job);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
}
std::string BackendRados::Parameters::toStr() {
......
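The constructor change above leans on member initialisation order: m_jobFuture is declared after m_job (see the header diff below), so the future is extracted before the object is handed to any completion callback. Each callback then either advances the chain or parks an exception in the promise. A reduced sketch, with hypothetical names:

#include <valgrind/helgrind.h>
#include <future>

class AsyncOp {
  std::promise<void> m_job;
  std::future<void> m_jobFuture;  // declared after m_job, so initialised after it
public:
  AsyncOp(): m_job(), m_jobFuture(m_job.get_future()) {
    // Only now is it safe to hand &m_job to a callback on another thread.
  }
  void succeed() {                 // called from a completion (callback) thread
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_value();
  }
  void fail(std::exception_ptr e) {
    ANNOTATE_HAPPENS_BEFORE(&m_job);
    m_job.set_exception(e);
  }
  void wait() {                    // called by the owner thread
    m_jobFuture.get();             // rethrows whatever a callback parked
    ANNOTATE_HAPPENS_AFTER(&m_job);
    ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&m_job);
  }
};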
@@ -102,6 +102,8 @@ public:
time_t date;
/** The promise that will both do the job and allow synchronization with the caller. */
std::promise<void> m_job;
/** The future from m_job, extracted before any other thread gets a chance to touch the promise. */
std::future<void> m_jobFuture;
/** A future used to hold the structure of the lock operation. It will be either empty or complete at
destruction time */
std::unique_ptr<std::future<void>> m_lockAsync;
......
@@ -79,7 +79,7 @@ protected:
void checkReadable() {
if (!m_locksCount)
throw NotLocked("In ObjectOps::checkReadable: object not locked");
throw NotLocked("In ObjectOps::checkReadable: object not locked");
}
public:
......
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/helgrind_annotator.hpp"
#include "MemQueues.hpp"
#include "OStoreDB.hpp"
@@ -29,141 +29,173 @@ std::map<std::string, std::shared_ptr<MemArchiveQueue>> MemArchiveQueue::g_queue
std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueue(objectstore::ArchiveRequest::JobDump& job,
objectstore::ArchiveRequest& archiveRequest, OStoreDB & oStoreDB, log::LogContext & logContext) {
// 1) Take the global lock (implicit in the constructor)
std::unique_lock<std::mutex> ul(g_mutex);
std::unique_lock<std::mutex> globalLock(g_mutex);
std::shared_ptr<MemArchiveQueue> q;
try {
// 2) Determine if the queue exists already or not
auto q = g_queues.at(job.tapePool);
// It does: we just ride the train: queue ourselves
std::unique_lock<std::mutex> ulq(q->m_mutex);
std::shared_ptr<MemArchiveQueueRequest> maqr(new MemArchiveQueueRequest(job, archiveRequest));
q->add(maqr);
// If there are already enough elements, signal to the initiating thread
if (q->m_requests.size() + 1 >= g_maxQueuedElements) {
// signal the initiating thread
q->m_promise.set_value();
// Unreference the queue so no new request gets added to it
g_queues.erase(job.tapePool);
}
// Release the locks
ulq.unlock();
ul.unlock();
// Wait for our request completion (this could throw, if there was a problem)
return maqr->m_promise.get_future().get();
q = g_queues.at(job.tapePool);
} catch (std::out_of_range &) {
utils::Timer timer;
// The queue for our tape pool does not exist. We will create it, wait for
// the necessary amount of time or requests and release it.
// Create the queue
std::shared_ptr<MemArchiveQueue> maq(new MemArchiveQueue);
// Reference it
g_queues[job.tapePool] = maq;
// Release the global list
ul.unlock();
// Wait for timeout or enough jobs.
maq->m_promise.get_future().wait_for(std::chrono::milliseconds(100));
// Re-take the global lock to make sure the queue is not referenced anymore,
// and the queue as well, to make sure the last user is gone.
ul.lock();
std::unique_lock<std::mutex> ulq(maq->m_mutex);
// Remove the entry for our tape pool iff it also has our pointer (a new
// queue could have been created in the meantime).
auto i = g_queues.find(job.tapePool);
if (i != g_queues.end() && (maq == i->second))
g_queues.erase(i);
// Our mem queue is now unreachable so we can let the global part go
ul.unlock();
double waitTime = timer.secs(utils::Timer::resetCounter);
// We can now proceed with the queuing of the jobs.
try {
std::shared_ptr<SharedQueueLock> ret(new SharedQueueLock(logContext));
ret->m_queue.reset(new objectstore::ArchiveQueue(oStoreDB.m_objectStore));
ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
auto & aq = *ret->m_queue;
auto & aql = *ret->m_lock;
oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool);
size_t aqSizeBefore=aq.dumpJobs().size();
size_t addedJobs=1;
// First add the job for this thread
// The queue is not there. We will just create a new one.
return sharedAddToArchiveQueueWithNewQueue(job, archiveRequest, oStoreDB, logContext, globalLock);
}
// It does: we just ride the train: queue ourselves
// Lock the queue.
std::unique_lock<std::mutex> ulq(q->m_mutex);
std::shared_ptr<MemArchiveQueueRequest> maqr(new MemArchiveQueueRequest(job, archiveRequest));
// Extract the future before the other thread gets a chance to touch the promise.
auto resultFuture = maqr->m_promise.get_future();
q->add(maqr);
// If there are already enough elements, signal to the initiating thread
if (q->m_requests.size() + 1 >= g_maxQueuedElements) {
// signal the initiating thread
ANNOTATE_HAPPENS_BEFORE(&q->m_promise);
q->m_promise.set_value();
// Unreference the queue so no new request gets added to it
g_queues.erase(job.tapePool);
}
// Release the queue, forget the queue, and release the global lock
ulq.unlock();
q.reset();
globalLock.unlock();
// Wait for our request completion (this could throw, if there was a problem)
resultFuture.get();
auto ret=maqr->m_returnValue;
__attribute__((unused)) auto debugMaqr=maqr.get();
ANNOTATE_HAPPENS_AFTER(&maqr->m_promise);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&maqr->m_promise);
maqr.reset();
return ret;
}
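The rider path above shows the recurring fix in its simplest form: take the future while the request is still private (under the queue lock), because once the request is published the serving thread may call set_value() concurrently with get_future(). A contrast sketch (hypothetical names):

#include <future>
#include <list>
#include <memory>
#include <mutex>

struct Request { std::promise<void> promise; };

void enqueueAndWait(std::list<std::shared_ptr<Request>>& queue, std::mutex& queueMutex) {
  auto request = std::make_shared<Request>();
  std::unique_lock<std::mutex> lock(queueMutex);
  auto resultFuture = request->promise.get_future();  // fixed: future taken pre-publication
  queue.push_back(request);   // from here on the server may set the promise
  lock.unlock();
  resultFuture.get();
  // The old, racy shape was:
  //   queue.push_back(request); lock.unlock();
  //   request->promise.get_future().get();  // races with the server's set_value()
}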
std::shared_ptr<SharedQueueLock> MemArchiveQueue::sharedAddToArchiveQueueWithNewQueue(
objectstore::ArchiveRequest::JobDump& job, objectstore::ArchiveRequest& archiveRequest,
OStoreDB& oStoreDB, log::LogContext& logContext, std::unique_lock<std::mutex>& globalLock) {
utils::Timer timer;
// Re-check the queue is not there
if (g_queues.end() != g_queues.find(job.tapePool)) {
throw cta::exception::Exception("In MemArchiveQueue::sharedAddToArchiveQueueWithNewQueue(): the queue is present, while it should not!");
}
// Create the queue and reference it.
auto maq = (g_queues[job.tapePool] = std::make_shared<MemArchiveQueue>());
// Release the global list
// Get hold of the future before the promise could be touched
auto queueFuture=maq->m_promise.get_future();
globalLock.unlock();
// Wait for timeout or enough jobs.
queueFuture.wait_for(std::chrono::milliseconds(100));
ANNOTATE_HAPPENS_AFTER(&maq->m_promise);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&maq->m_promise);
// Re-take the global lock and make sure the queue is not referenced anymore.
globalLock.lock();
// Remove the entry for our tape pool iff it also has our pointer (a new
// queue could have been created in the meantime).
auto i = g_queues.find(job.tapePool);
if (i != g_queues.end() && (maq == i->second))
g_queues.erase(i);
// Our mem queue is now unreachable so we can let the global part go
globalLock.unlock();
// Lock the queue, to make sure the last user is done posting.
std::unique_lock<std::mutex> ulq(maq->m_mutex);
double waitTime = timer.secs(utils::Timer::resetCounter);
// We can now proceed with the queuing of the jobs in the object store.
try {
std::shared_ptr<SharedQueueLock> ret(new SharedQueueLock(logContext));
ret->m_queue.reset(new objectstore::ArchiveQueue(oStoreDB.m_objectStore));
ret->m_lock.reset(new objectstore::ScopedExclusiveLock);
auto & aq = *ret->m_queue;
auto & aql = *ret->m_lock;
oStoreDB.getLockedAndFetchedArchiveQueue(aq, aql, job.tapePool);
size_t aqSizeBefore=aq.dumpJobs().size();
size_t addedJobs=1;
// First add the job for this thread
{
auto af = archiveRequest.getArchiveFile();
aq.addJob(job, archiveRequest.getAddressIfSet(), af.archiveFileID,
af.fileSize, archiveRequest.getMountPolicy(), archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
job.ArchiveQueueAddress = aq.getAddressIfSet();
archiveRequest.setJobArchiveQueueAddress(job.copyNb, job.ArchiveQueueAddress);
archiveRequest.setJobOwner(job.copyNb, job.ArchiveQueueAddress);
}
// We do the same for all the queued requests
for (auto &maqr: maq->m_requests) {
// Add the job
auto af = maqr->m_archiveRequest.getArchiveFile();
aq.addJob(maqr->m_job, maqr->m_archiveRequest.getAddressIfSet(), af.archiveFileID,
af.fileSize, maqr->m_archiveRequest.getMountPolicy(),
maqr->m_archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
maqr->m_job.ArchiveQueueAddress = aq.getAddressIfSet();
maqr->m_archiveRequest.setJobArchiveQueueAddress(maqr->m_job.copyNb, aq.getAddressIfSet());
maqr->m_archiveRequest.setJobOwner(maqr->m_job.copyNb, aq.getAddressIfSet());
addedJobs++;
}
// We can now commit the multi-request addition to the object store
aq.commit();
size_t aqSizeAfter=aq.dumpJobs().size();
{
log::ScopedParamContainer params(logContext);
params.add("objectQueue", aq.getAddressIfSet())
.add("sizeBefore", aqSizeBefore)
.add("sizeAfter", aqSizeAfter)
.add("addedJobs", addedJobs)
.add("waitTime", waitTime)
.add("enqueueTime", timer.secs());
logContext.log(log::INFO, "In MemArchiveQueue::sharedAddToArchiveQueue(): added batch of jobs to the queue.");
}
// We will also count how much time we mutually wait for the other threads.
ret->m_timer.reset();
// And finally release all the user threads
for (auto &maqr: maq->m_requests) {
{
auto af = archiveRequest.getArchiveFile();
aq.addJob(job, archiveRequest.getAddressIfSet(), af.archiveFileID,
af.fileSize, archiveRequest.getMountPolicy(), archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
job.ArchiveQueueAddress = aq.getAddressIfSet();
archiveRequest.setJobArchiveQueueAddress(job.copyNb, aq.getAddressIfSet());
std::lock_guard<std::mutex> lg(maqr->m_mutex);
ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
maqr->m_returnValue=ret;
maqr->m_promise.set_value();
}
// Then do the same for all the queued requests
for (auto &maqr: maq->m_requests) {
// Add the job
auto af = maqr->m_archiveRequest.getArchiveFile();
aq.addJob(maqr->m_job, maqr->m_archiveRequest.getAddressIfSet(), af.archiveFileID,
af.fileSize, maqr->m_archiveRequest.getMountPolicy(),
maqr->m_archiveRequest.getEntryLog().time);
// Back reference the queue in the job and archive request
maqr->m_job.ArchiveQueueAddress = aq.getAddressIfSet();
maqr->m_archiveRequest.setJobArchiveQueueAddress(maqr->m_job.copyNb, aq.getAddressIfSet());
addedJobs++;
}
// We can now commit the multi-request addition to the object store
aq.commit();
size_t aqSizeAfter=aq.dumpJobs().size();
{
log::ScopedParamContainer params(logContext);
params.add("objectQueue", aq.getAddressIfSet())
.add("sizeBefore", aqSizeBefore)
.add("sizeAfter", aqSizeAfter)
.add("addedJobs", addedJobs)
.add("waitTime", waitTime)
.add("enqueueTime", timer.secs());
logContext.log(log::INFO, "In MemArchiveQueue::sharedAddToArchiveQueue(): added batch of jobs to the queue.");
}
// We will also count how much time we mutually wait for the other threads.
ret->m_timer.reset();
// And finally release all the user threads
for (auto &maqr: maq->m_requests) {
maqr->m_promise.set_value(ret);
}
// Done!
return ret;
}
// Done!
return ret;
} catch (...) {
try {
std::rethrow_exception(std::current_exception());
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(logContext);
params.add("message", ex.getMessageValue());
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): got an exception writing. Will propagate to other threads.");
} catch (...) {
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): got a non-CTA exception writing. Will propagate to other threads.");
}
size_t exceptionsNotPassed = 0;
// Something went wrong. We should inform the other threads
for (auto & maqr: maq->m_requests) {
try {
std::rethrow_exception(std::current_exception());
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params(logContext);
params.add("message", ex.getMessageValue());
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): got an exception writing. Will propagate to other threads.");
std::lock_guard<std::mutex> lg(maqr->m_mutex);
ANNOTATE_HAPPENS_BEFORE(&maqr->m_promise);
maqr->m_promise.set_exception(std::current_exception());
} catch (...) {
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): got a non-CTA exception writing. Will propagate to other threads.");
exceptionsNotPassed++;
}
size_t exceptionsNotPassed = 0;
// Something went wrong. We should inform the other threads
for (auto & maqr: maq->m_requests) {
try {
maqr->m_promise.set_exception(std::current_exception());
} catch (...) {
exceptionsNotPassed++;
}
}
// And we inform the caller in our thread too
if (exceptionsNotPassed) {
try {
std::rethrow_exception(std::current_exception());
} catch (std::exception & ex) {
std::stringstream err;
err << "In MemArchiveQueue::sharedAddToArchiveQueue(), in main thread, failed to notify "
<< exceptionsNotPassed << " other threads out of " << maq->m_requests.size()
<< " : " << ex.what();
log::ScopedParamContainer params(logContext);
params.add("what", ex.what())
.add("exceptionsNotPassed", exceptionsNotPassed);
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): Failed to propagate exceptions to other threads.");
}
// And we inform the caller in our thread too
if (exceptionsNotPassed) {
try {
std::rethrow_exception(std::current_exception());
} catch (std::exception & ex) {
std::stringstream err;
err << "In MemArchiveQueue::sharedAddToArchiveQueue(), in main thread, failed to notify "
<< exceptionsNotPassed << " other threads out of " << maq->m_requests.size()
<< " : " << ex.what();
log::ScopedParamContainer params(logContext);
params.add("what", ex.what())
.add("exceptionsNotPassed", exceptionsNotPassed);
logContext.log(log::ERR, "In MemArchiveQueue::sharedAddToArchiveQueue(): Failed to propagate exceptions to other threads.");
throw cta::exception::Exception(err.str());
}
} else
throw;
}
}
throw cta::exception::Exception(err.str());
}
} else
throw;
}
}
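Note the shape of the result hand-off in the rewritten code: the per-request promise is now a bare completion/error channel, and the SharedQueueLock travels in m_returnValue under the request's mutex, read by the rider only after the future fires. A sketch of that split, with hypothetical names:

#include <valgrind/helgrind.h>
#include <future>
#include <memory>
#include <mutex>

struct Result;  // stands in for SharedQueueLock

struct QueuedRequest {
  std::mutex mutex;
  std::promise<void> promise;           // completion / error channel only
  std::shared_ptr<Result> returnValue;  // the actual result rides alongside
};

// Server side, once the batch is committed:
void complete(QueuedRequest& r, std::shared_ptr<Result> result) {
  std::lock_guard<std::mutex> lg(r.mutex);
  r.returnValue = std::move(result);
  ANNOTATE_HAPPENS_BEFORE(&r.promise);
  r.promise.set_value();
}

// Rider side, holding the future it extracted before queueing:
std::shared_ptr<Result> collect(QueuedRequest& r, std::future<void>& f) {
  f.get();                              // rethrows the server's exception, if any
  ANNOTATE_HAPPENS_AFTER(&r.promise);
  ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&r.promise);
  return r.returnValue;                 // ordered after set_value() via the future
}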
SharedQueueLock::~SharedQueueLock() {
@@ -174,7 +206,7 @@ SharedQueueLock::~SharedQueueLock() {
m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
}