Skip to content
Snippets Groups Projects
Commit 2dcfa894 authored by Eric Cano's avatar Eric Cano
Browse files

Ported improvements of Helpers::getLockedAndFetchedQueue<ArchiveQueue>() to...

Ported improvements of Helpers::getLockedAndFetchedQueue<ArchiveQueue>() to Helpers::getLockedAndFetchedQueue<RetrieveQueue>().

Also used lockfree in initial access to root.
parent 68828fc3
No related branches found
No related tags found
No related merge requests found
......@@ -41,9 +41,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
// Try and find an existing one first, create if needed
Backend & be = archiveQueue.m_objectStore;
for (size_t i=0; i<5; i++) {
double rootLockSharedTime = 0;
double rootFetchTime = 0;
double rootUnlockSharedTime = 0;
double rootFetchNoLockTime = 0;
double rootRelockExclusiveTime = 0;
double rootUnlockExclusiveTime = 0;
double rootRefetchTime = 0;
......@@ -52,16 +50,12 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
double queueFetchTime = 0;
utils::Timer t;
{
RootEntry re (be);
ScopedSharedLock rel(re);
rootLockSharedTime = t.secs(utils::Timer::resetCounter);
re.fetch();
rootFetchTime = t.secs(utils::Timer::resetCounter);
RootEntry re(be);
re.fetchNoLock();
rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
try {
archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
} catch (cta::exception::Exception & ex) {
rel.release();
rootUnlockSharedTime = t.secs(utils::Timer::resetCounter);
ScopedExclusiveLock rexl(re);
rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
re.fetch();
......@@ -70,9 +64,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
}
}
if (!rootUnlockSharedTime)
rootUnlockSharedTime = t.secs(utils::Timer::resetCounter);
else
if (rootRelockExclusiveTime)
rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
try {
archiveQueueLock.lock(archiveQueue);
......@@ -82,9 +74,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("queueObject", archiveQueue.getAddressIfSet())
.add("rootLockSharedTime", rootLockSharedTime)
.add("rootFetchTime", rootFetchTime)
.add("rootUnlockSharedTime", rootUnlockSharedTime)
.add("rootFetchNoLockTime", rootFetchNoLockTime)
.add("rootRelockExclusiveTime", rootRelockExclusiveTime)
.add("rootRefetchTime", rootRefetchTime)
.add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
......@@ -105,9 +95,7 @@ void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
params.add("attemptNb", i+1)
.add("exceptionMessage", ex.getMessageValue())
.add("queueObject", archiveQueue.getAddressIfSet())
.add("rootLockSharedTime", rootLockSharedTime)
.add("rootFetchTime", rootFetchTime)
.add("rootUnlockSharedTime", rootUnlockSharedTime)
.add("rootFetchNoLockTime", rootFetchNoLockTime)
.add("rootRefetchTime", rootRefetchTime)
.add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
.add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
......@@ -145,31 +133,79 @@ void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQue
// Try and find an existing one first, create if needed
Backend & be = retrieveQueue.m_objectStore;
for (size_t i=0; i<5; i++) {
double rootFetchNoLockTime = 0;
double rootRelockExclusiveTime = 0;
double rootUnlockExclusiveTime = 0;
double rootRefetchTime = 0;
double addOrGetQueueandCommitTime = 0;
double queueLockTime = 0;
double queueFetchTime = 0;
utils::Timer t;
{
RootEntry re (be);
ScopedSharedLock rel(re);
re.fetch();
re.fetchNoLock();
rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
try {
retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid));
} catch (cta::exception::Exception & ex) {
rel.release();
ScopedExclusiveLock rexl(re);
rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
re.fetch();
rootRefetchTime = t.secs(utils::Timer::resetCounter);
retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference));
addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
}
}
if (rootRelockExclusiveTime)
rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
try {
retrieveQueueLock.lock(retrieveQueue);
queueLockTime = t.secs(utils::Timer::resetCounter);
retrieveQueue.fetch();
queueFetchTime = t.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("queueObject", retrieveQueue.getAddressIfSet())
.add("rootFetchNoLockTime", rootFetchNoLockTime)
.add("rootRelockExclusiveTime", rootRelockExclusiveTime)
.add("rootRefetchTime", rootRefetchTime)
.add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
.add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
.add("queueLockTime", queueLockTime)
.add("queueFetchTime", queueFetchTime);
lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): Successfully found and locked a retrieve queue.");
return;
} catch (cta::exception::Exception & ex) {
// We have a (rare) opportunity for a race condition, where we identify the
// queue and it gets deleted before we manage to lock it.
// The locking of fetching will fail in this case.
// We hence allow ourselves to retry a couple times.
// We also need to make sure the lock on the queue is released (it is in
// an object and hence not scoped).
if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
log::ScopedParamContainer params(lc);
params.add("attemptNb", i+1)
.add("exceptionMessage", ex.getMessageValue())
.add("queueObject", retrieveQueue.getAddressIfSet())
.add("rootFetchNoLockTime", rootFetchNoLockTime)
.add("rootRefetchTime", rootRefetchTime)
.add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
.add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
.add("queueLockTime", queueLockTime)
.add("queueFetchTime", queueFetchTime);
lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): failed to fetch an existing queue. Retrying.");
retrieveQueue.resetAddress();
continue;
} catch (...) {
// Also release the lock if needed here.
if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
retrieveQueue.resetAddress();
throw;
}
}
// Also release the lock if needed here.
if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
retrieveQueue.resetAddress();
throw cta::exception::Exception(std::string(
"In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: ")
+ vid);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment