Commit fa7eeb31 authored by Eric Cano's avatar Eric Cano
Browse files

Updated timing logging in SharedQueueLock<Queue, Request>::~SharedQueueLock().

parent f6e11ac5
......@@ -60,24 +60,31 @@ private:
template <class Queue, class Request>
SharedQueueLock<Queue, Request>::~SharedQueueLock() {
double waitTime = m_timer.secs(utils::Timer::resetCounter);
m_lock->release();
log::ScopedParamContainer params(m_logContext);
params.add("objectQueue", m_queue->getAddressIfSet())
.add("waitAndUnlockTime", m_timer.secs());
m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
double queueUnlockTime = m_timer.secs(utils::Timer::resetCounter);
// The next update of the queue can now proceed
ANNOTATE_HAPPENS_BEFORE(m_promiseForSuccessor.get());
m_promiseForSuccessor->set_value();
// We can now cleanup the promise/future couple if they were not picked up to trim the maps.
// A next thread finding them unlocked or absent will be equivalent.
threading::MutexLocker globalLock(MemQueue<Queue, Request>::g_mutex);
// If there is an promise AND it is ours, we remove it.
try {
if (MemQueue<Queue, Request>::g_promises.at(m_queueIndex).get() == m_promiseForSuccessor.get()) {
MemQueue<Queue, Request>::g_futures.erase(m_queueIndex);
MemQueue<Queue, Request>::g_promises.erase(m_queueIndex);
}
} catch (std::out_of_range &) {}
double successorUnlockTime = m_timer.secs(utils::Timer::resetCounter);
// We can now cleanup the promise/future couple if they were not picked up to trim the maps.
// A next thread finding them unlocked or absent will be equivalent.
threading::MutexLocker globalLock(MemQueue<Queue, Request>::g_mutex);
// If there is an promise AND it is ours, we remove it.
try {
if (MemQueue<Queue, Request>::g_promises.at(m_queueIndex).get() == m_promiseForSuccessor.get()) {
MemQueue<Queue, Request>::g_futures.erase(m_queueIndex);
MemQueue<Queue, Request>::g_promises.erase(m_queueIndex);
}
} catch (std::out_of_range &) {}
double inMemoryQueuesCleanupTime = m_timer.secs();
log::ScopedParamContainer params(m_logContext);
params.add("objectQueue", m_queue->getAddressIfSet())
.add("waitTime", waitTime)
.add("queueUnlockTime", queueUnlockTime)
.add("successorUnlockTime", successorUnlockTime)
.add("inMemoryQueuesCleanupTime", inMemoryQueuesCleanupTime);
m_logContext.log(log::INFO, "In SharedQueueLock::~SharedQueueLock(): unlocked the archive queue pointer.");
}
template <class Request, class Queue>
......@@ -299,7 +306,6 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share
// Update the cache stats in memory as we hold the queue.
specializedUpdateCachedQueueStats(queue);
double cacheUpdateTime = timer.secs(utils::Timer::resetCounter);
double successorPromiseSetTime = timer.secs(utils::Timer::resetCounter);
// Log
size_t qJobsAfter=queue.dumpJobs().size();
uint64_t qBytesAfter=0;
......@@ -318,10 +324,9 @@ std::shared_ptr<SharedQueueLock<Queue, Request>> MemQueue<Request, Queue>::share
.add("getFetchedQueueTime", getFetchedQueueTime)
.add("inMemoryQueueProcessTime", inMemoryQueueProcessTime)
.add("queueCommitTime", queueCommitTime)
.add("cacheUpdateTime", cacheUpdateTime)
.add("successorPromiseSetTime", successorPromiseSetTime)
.add("cacheUpdateTime", cacheUpdateTime)
.add("totalEnqueueTime", getFetchedQueueTime + inMemoryQueueProcessTime + queueCommitTime
+ cacheUpdateTime + successorPromiseSetTime + timer.secs());
+ cacheUpdateTime + timer.secs());
logContext.log(log::INFO, "In MemQueue::sharedAddToNewQueue(): added batch of jobs to the queue.");
}
// We will also count how much time we mutually wait for the other threads.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment