// Helpers.cpp
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Helpers.hpp"
#include "Backend.hpp"
#include "ArchiveQueue.hpp"
#include "AgentReference.hpp"
#include "RetrieveQueue.hpp"
#include "RootEntry.hpp"
#include "DriveRegister.hpp"
#include "DriveState.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/exception/NonRetryableError.hpp"
#include <random>
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
template <>
void Helpers::getLockedAndFetchedQueue<ArchiveQueue>(ArchiveQueue& archiveQueue,
  ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference,
  const std::string& tapePool, log::LogContext & lc) {
  // Specialization for ArchiveQueue: locate (or create) the archive queue for
  // the given tape pool, then exclusively lock and fetch it. On successful
  // return, the caller holds archiveQueueLock on an up-to-date archiveQueue.
  // On failure, the lock is released, the queue address is reset, and an
  // exception is thrown.
  // TODO: if necessary, we could use a singleton caching object here to accelerate
  // lookups.
  // Getting a locked AQ is the name of the game.
  // Try and find an existing one first, create if needed
  Backend & be = archiveQueue.m_objectStore;
  // Retry loop: the queue can be deleted by another agent between the
  // lock-free discovery below and the exclusive lock, so we allow up to
  // 5 attempts before giving up.
  for (size_t i=0; i<5; i++) {
    // Per-attempt timing counters, reported in the log records below.
    double rootFetchNoLockTime = 0;
    double rootRelockExclusiveTime = 0;
    double rootUnlockExclusiveTime = 0;
    double rootRefetchTime = 0;
    double addOrGetQueueandCommitTime = 0;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    utils::Timer t;
    {
      // Fast path: read the root entry lock-free; in the common case the
      // queue already exists and we only need its address.
      RootEntry re(be);
      re.fetchNoLock();
      rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
      try {
        archiveQueue.setAddress(re.getArchiveQueueAddress(tapePool));
      } catch (cta::exception::Exception & ex) {
        // Slow path: the queue was not found in the lock-free snapshot.
        // Re-lock the root entry exclusively, refetch and create-or-get the
        // queue atomically.
        ScopedExclusiveLock rexl(re);
        rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
        re.fetch();
        rootRefetchTime = t.secs(utils::Timer::resetCounter);
        archiveQueue.setAddress(re.addOrGetArchiveQueueAndCommit(tapePool, agentReference, lc));
        addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
      }
    }
    // The root entry lock (if taken) was released when the scope above ended;
    // only record the unlock time when the slow path was actually used.
    if (rootRelockExclusiveTime)
      rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
    try {
      archiveQueueLock.lock(archiveQueue);
      queueLockTime = t.secs(utils::Timer::resetCounter);
      archiveQueue.fetch();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("queueObject", archiveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRelockExclusiveTime", rootRelockExclusiveTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): Successfully found and locked an archive queue.");
      return;
    } catch (cta::exception::Exception & ex) {
      // We have a (rare) opportunity for a race condition, where we identify the
      // queue and it gets deleted before we manage to lock it.
      // The locking of fetching will fail in this case.
      // We hence allow ourselves to retry a couple times.
      // We also need to make sure the lock on the queue is released (it is in
      // an object and hence not scoped).
      if (archiveQueueLock.isLocked()) archiveQueueLock.release();
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("exceptionMessage", ex.getMessageValue())
            .add("queueObject", archiveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<ArchiveQueue>(): failed to fetch an existing queue. Retrying.");
      archiveQueue.resetAddress();
      continue;
    } catch (...) {
      // Non-CTA exception: also release the lock if needed here, then rethrow.
      if (archiveQueueLock.isLocked()) archiveQueueLock.release();
      archiveQueue.resetAddress();
      throw;
    }
  }
  // All 5 attempts exhausted: clean up and report failure.
  // Also release the lock if needed here.
  if (archiveQueueLock.isLocked()) archiveQueueLock.release();
  archiveQueue.resetAddress();
  throw cta::exception::Exception(std::string(
    "In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
    + tapePool);
}
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedRetrieveQueue()
//------------------------------------------------------------------------------
template <>
void Helpers::getLockedAndFetchedQueue<RetrieveQueue>(RetrieveQueue& retrieveQueue,
  ScopedExclusiveLock& retrieveQueueLock, AgentReference& agentReference,
  const std::string& vid, log::LogContext & lc) {
  // Specialization for RetrieveQueue: locate (or create) the retrieve queue
  // for the given vid, then exclusively lock and fetch it. On successful
  // return, the caller holds retrieveQueueLock on an up-to-date retrieveQueue.
  // On failure, the lock is released, the queue address is reset, and an
  // exception is thrown.
  // TODO: if necessary, we could use a singleton caching object here to accelerate
  // lookups.
  // Getting a locked RQ is the name of the game.
  // Try and find an existing one first, create if needed
  Backend & be = retrieveQueue.m_objectStore;
  // Retry loop: the queue can be deleted by another agent between the
  // lock-free discovery below and the exclusive lock, so we allow up to
  // 5 attempts before giving up.
  for (size_t i=0; i<5; i++) {
    // Per-attempt timing counters, reported in the log records below.
    double rootFetchNoLockTime = 0;
    double rootRelockExclusiveTime = 0;
    double rootUnlockExclusiveTime = 0;
    double rootRefetchTime = 0;
    double addOrGetQueueandCommitTime = 0;
    double queueLockTime = 0;
    double queueFetchTime = 0;
    utils::Timer t;
    {
      // Fast path: read the root entry lock-free; in the common case the
      // queue already exists and we only need its address.
      RootEntry re (be);
      re.fetchNoLock();
      rootFetchNoLockTime = t.secs(utils::Timer::resetCounter);
      try {
        retrieveQueue.setAddress(re.getRetrieveQueueAddress(vid));
      } catch (cta::exception::Exception & ex) {
        // Slow path: the queue was not found in the lock-free snapshot.
        // Re-lock the root entry exclusively, refetch and create-or-get the
        // queue atomically.
        ScopedExclusiveLock rexl(re);
        rootRelockExclusiveTime = t.secs(utils::Timer::resetCounter);
        re.fetch();
        rootRefetchTime = t.secs(utils::Timer::resetCounter);
        retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference));
        addOrGetQueueandCommitTime = t.secs(utils::Timer::resetCounter);
      }
    }
    // The root entry lock (if taken) was released when the scope above ended;
    // only record the unlock time when the slow path was actually used.
    if (rootRelockExclusiveTime)
      rootUnlockExclusiveTime = t.secs(utils::Timer::resetCounter);
    try {
      retrieveQueueLock.lock(retrieveQueue);
      queueLockTime = t.secs(utils::Timer::resetCounter);
      retrieveQueue.fetch();
      queueFetchTime = t.secs(utils::Timer::resetCounter);
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("queueObject", retrieveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRelockExclusiveTime", rootRelockExclusiveTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): Successfully found and locked a retrieve queue.");
      return;
    } catch (cta::exception::Exception & ex) {
      // We have a (rare) opportunity for a race condition, where we identify the
      // queue and it gets deleted before we manage to lock it.
      // The locking of fetching will fail in this case.
      // We hence allow ourselves to retry a couple times.
      // We also need to make sure the lock on the queue is released (it is in
      // an object and hence not scoped).
      if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
      log::ScopedParamContainer params(lc);
      params.add("attemptNb", i+1)
            .add("exceptionMessage", ex.getMessageValue())
            .add("queueObject", retrieveQueue.getAddressIfSet())
            .add("rootFetchNoLockTime", rootFetchNoLockTime)
            .add("rootRefetchTime", rootRefetchTime)
            .add("addOrGetQueueandCommitTime", addOrGetQueueandCommitTime)
            .add("rootUnlockExclusiveTime", rootUnlockExclusiveTime)
            .add("queueLockTime", queueLockTime)
            .add("queueFetchTime", queueFetchTime);
      lc.log(log::INFO, "In Helpers::getLockedAndFetchedQueue<RetrieveQueue>(): failed to fetch an existing queue. Retrying.");
      retrieveQueue.resetAddress();
      continue;
    } catch (...) {
      // Non-CTA exception: also release the lock if needed here, then rethrow.
      if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
      retrieveQueue.resetAddress();
      throw;
    }
  }
  // All 5 attempts exhausted: clean up and report failure.
  // Also release the lock if needed here.
  if (retrieveQueueLock.isLocked()) retrieveQueueLock.release();
  retrieveQueue.resetAddress();
  // Bug fix: the message previously said "archive queue" although this
  // specialization deals with retrieve queues.
  throw cta::exception::Exception(std::string(
    "In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock retrieve queue after 5 retries for vid: ")
    + vid);
}
//------------------------------------------------------------------------------
// Helpers::selectBestRetrieveQueue()
//------------------------------------------------------------------------------
std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue,
  objectstore::Backend & objectstore) {
  // Select the best tape (vid) to recall from, among the candidates.
  // Per-vid queue statistics and tape status are kept in a process-wide cache
  // (g_retrieveQueueStatistics, guarded by g_retrieveQueueStatisticsMutex).
  // Disabled tapes are skipped; ties between equally-good tapes are broken
  // randomly.
  // We will build the retrieve stats of the non-disabled candidate vids here.
  std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
  // Take the global lock protecting the cache.
  cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex);
  // Find the vids to be fetched (if any).
  for (auto & v: candidateVids) {
    try {
      // Out of range or outdated will be updated the same way.
      // If an update is in progress, we wait on it, and get the result after.
      // We have to release the global lock while doing so.
      if (g_retrieveQueueStatistics.at(v).updating) {
        // Cache is updating, we wait on update. Copy the shared_future before
        // releasing the lock, as the cache entry may be modified meanwhile.
        auto updateFuture = g_retrieveQueueStatistics.at(v).updateFuture;
        grqsmLock.unlock();
        updateFuture.wait();
        grqsmLock.lock();
        if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) {
          candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
        }
      } else {
        // We have a cache hit, check it's not stale.
        // Bug fix: the comparison was inverted ('>'), which treated fresh
        // entries as stale (forcing a refresh) and used stale ones as-is.
        if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge < time(nullptr))
          throw std::out_of_range("");
        // We're lucky: cache hit (and not stale)
        if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
          candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
      }
    } catch (std::out_of_range &) {  // bug fix: catch by reference, not value
      // We need to update the entry in the cache (miss or stale, we handle the same way).
      // We just update one vid at a time as doing several in parallel would be quite
      // hairy lock-wise (but give a slight performance boost).
      g_retrieveQueueStatistics[v].updating = true;
      std::promise<void> updatePromise;
      g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future();
      // Give other threads a chance to access the cache for other vids.
      grqsmLock.unlock();
      // Get the information (staged, so we don't access the global variable without the mutex).
      auto tapeStatus=catalogue.getTapesByVid({v});
      // Build a minimal service retrieve file queue criteria to query queues.
      common::dataStructures::RetrieveFileQueueCriteria rfqc;
      rfqc.archiveFile.tapeFiles[1].vid=v;
      auto queuesStats=Helpers::getRetrieveQueueStatistics(rfqc, {v}, objectstore);
      // We now have the data we need. Update the cache.
      grqsmLock.lock();
      g_retrieveQueueStatistics[v].updating=false;
      g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>();
      // Check we got the expected vid (and size of stats).
      // NOTE: if one of these throws, updatePromise is destroyed unfulfilled,
      // which makes waiters' futures ready (broken promise) rather than
      // blocking them forever.
      if (queuesStats.size()!=1)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats.");
      if (queuesStats.front().vid!=v)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats.");
      if (tapeStatus.size()!=1)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus.");
      if (tapeStatus.begin()->first!=v)
        throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus.");
      g_retrieveQueueStatistics[v].stats = queuesStats.front();
      g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v);
      // Bug fix: record the refresh time, otherwise the entry would be
      // considered stale again on the very next lookup.
      g_retrieveQueueStatistics[v].updateTime = time(nullptr);
      // Signal to potential waiters
      updatePromise.set_value();
      // Update our own candidate list if needed.
      if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
        candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
    }
  }
  // We now have all the candidates listed (if any).
  if (candidateVidsStats.empty())
    throw NoTapeAvailableForRetrieve("In Helpers::selectBestRetrieveQueue(): no tape available to recall from.");
  // Sort the tapes, best first.
  candidateVidsStats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight);
  // Get a list of equivalent best tapes
  std::set<std::string> shortlistVids;
  for (auto & s: candidateVidsStats) {
    if (!(s<candidateVidsStats.front()) && !(s>candidateVidsStats.front()))
      shortlistVids.insert(s.vid);
  }
  // If there is only one best tape, we're done
  if (shortlistVids.size()==1) return *shortlistVids.begin();
  // There are several equivalent entries, choose randomly among them.
  // Bug fix: the draw previously ranged over all candidateVids, which could
  // select a tape that is not among the equivalent best ones.
  // We need to get a random number [0, shortlistVids.size() - 1].
  std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count());
  std::uniform_int_distribution<size_t> distribution(0, shortlistVids.size() - 1);
  size_t index=distribution(dre);
  auto it=shortlistVids.cbegin();
  std::advance(it, index);
  return *it;
}
//------------------------------------------------------------------------------
// Helpers::updateRetrieveQueueStatisticsCache()
//------------------------------------------------------------------------------
void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_t files, uint64_t bytes, uint64_t priority) {
  // Refresh the cached queue counters for a vid after a queueing operation.
  // We will not update the status of the tape if we already cached it (caller did not check),
  // and we will also not touch the update time, so a full refresh still
  // happens after a while.
  // If we update the entry while another thread is updating it, this is harmless
  // (cache users will anyway wait, and just not profit from our update).
  threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
  try {
    // Existing entry: only refresh the counters.
    auto & cached = g_retrieveQueueStatistics.at(vid);
    cached.stats.filesQueued = files;
    cached.stats.bytesQueued = bytes;
    cached.stats.currentPriority = priority;
  } catch (std::out_of_range &) {
    // The entry is missing: create it from scratch with conservative defaults.
    auto & fresh = g_retrieveQueueStatistics[vid];
    fresh.stats.bytesQueued = bytes;
    fresh.stats.filesQueued = files;
    fresh.stats.currentPriority = priority;
    fresh.stats.vid = vid;
    fresh.tapeStatus.disabled = false;
    fresh.tapeStatus.full = false;
    fresh.updating = false;
    fresh.updateTime = time(nullptr);
  }
}
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
// Process-wide cache of per-vid retrieve queue statistics; all accesses are
// guarded by g_retrieveQueueStatisticsMutex.
std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics;
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatisticsMutex
//------------------------------------------------------------------------------
// Mutex protecting g_retrieveQueueStatistics.
cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex;
//------------------------------------------------------------------------------
// Helpers::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics(
  const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider,
  objectstore::Backend & objectstore) {
  // Collect the queue statistics (priority, bytes, files) for every vid of the
  // criteria's tape files that is in vidsToConsider. Vids with no retrieve
  // queue are reported with zeroed statistics.
  std::list<SchedulerDatabase::RetrieveQueueStatistics> ret;
  // Find the retrieve queues for each vid if they exist (absence is possible).
  RootEntry re(objectstore);
  ScopedSharedLock rel(re);
  re.fetch();
  rel.release();
  for (auto &tf:criteria.archiveFile.tapeFiles) {
    if (!vidsToConsider.count(tf.second.vid))
      continue;
    std::string rqAddr;
    try {
      // Bug fix: this assignment previously re-declared a local rqAddr,
      // shadowing the outer variable, so the queue below was always
      // constructed with an empty address.
      rqAddr = re.getRetrieveQueueAddress(tf.second.vid);
    } catch (cta::exception::Exception &) {
      // No queue for this vid: report empty statistics.
      ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
      ret.back().vid=tf.second.vid;
      ret.back().bytesQueued=0;
      ret.back().currentPriority=0;
      ret.back().filesQueued=0;
      continue;
    }
    // Fetch the queue under a shared lock, then read it lock-free.
    RetrieveQueue rq(rqAddr, objectstore);
    ScopedSharedLock rql(rq);
    rq.fetch();
    rql.release();
    if (rq.getVid() != tf.second.vid)
      throw cta::exception::Exception("In OStoreDB::getRetrieveQueueStatistics(): unexpected vid for retrieve queue");
    // Compute the summary once instead of three times.
    auto summary = rq.getJobsSummary();
    ret.push_back(SchedulerDatabase::RetrieveQueueStatistics());
    ret.back().vid=rq.getVid();
    ret.back().currentPriority=summary.priority;
    ret.back().bytesQueued=summary.bytes;
    ret.back().filesQueued=summary.files;
  }
  return ret;
}
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedDriveState()
//------------------------------------------------------------------------------
void Helpers::getLockedAndFetchedDriveState(DriveState& driveState, ScopedExclusiveLock& driveStateLock,
  AgentReference& agentReference, const std::string& driveName, log::LogContext& lc, CreateIfNeeded doCreate) {
  // Locate the drive state object for driveName, lock and fetch it, creating
  // it if needed (and allowed by doCreate). On return, the caller holds
  // driveStateLock on an up-to-date driveState owned by the drive register.
  Backend & be = driveState.m_objectStore;
  // Try and get the location of the drive state lockfree (this should be most of the cases).
  try {
    RootEntry re(be);
    re.fetchNoLock();
    DriveRegister dr(re.getDriveRegisterAddress(), be);
    dr.fetchNoLock();
    driveState.setAddress(dr.getDriveAddress(driveName));
    driveStateLock.lock(driveState);
    driveState.fetch();
    if (driveState.getOwner() != dr.getAddressIfSet()) {
      std::string previouslySeenOwner=driveState.getOwner();
      // We have a special case: the drive state is not owned by the
      // drive register.
      // As we are lock free, we will re-lock in proper order
      // (register first, then drive state) to avoid deadlocks.
      if (driveStateLock.isLocked()) driveStateLock.release();
      ScopedExclusiveLock drl(dr);
      dr.fetch();
      // Re-get the state (could have changed).
      driveState.resetAddress();
      driveState.setAddress(dr.getDriveAddress(driveName));
      driveStateLock.lock(driveState);
      driveState.fetch();
      // We have an exclusive lock on everything. We can now
      // safely switch the owner of the drive status to the drive register
      // (there is no other steady state ownership).
      // return all as we are done.
      log::ScopedParamContainer params (lc);
      params.add("driveRegisterObject", dr.getAddressIfSet())
            .add("driveStateObject", driveState.getAddressIfSet())
            .add("driveStateCurrentOwner", driveState.getOwner())
            .add("driveStatePreviouslySeenOwner", previouslySeenOwner);
      lc.log(log::WARNING, "In Helpers::getLockedAndFetchedDriveState(): unexpected owner for driveState (should be register, will fix it).");
      // Re-check after the refetch: the owner may already have been fixed by
      // another agent in the meantime.
      if (driveState.getOwner() != dr.getAddressIfSet()) {
        driveState.setOwner(dr.getAddressIfSet());
        driveState.commit();
      }
      // The drive register lock will be released automatically
    }
    // We're done with good day scenarios.
    return;
  } catch (...) {
    // If anything goes wrong, we will suppose we have to create the drive state and do every step,
    // one at time. Of course, this is far more costly (lock-wise).
    // ... except if we were not supposed to create it.
    if (doCreate == CreateIfNeeded::doNotCreate) {
      throw NoSuchDrive("In Helpers::getLockedAndFetchedDriveState(): no such drive. Will not create it as instructed.");
    }
    RootEntry re(be);
    re.fetchNoLock();
    DriveRegister dr(re.getDriveRegisterAddress(), be);
    ScopedExclusiveLock drl(dr);
    dr.fetch();
    // Label for re-checking after removing a dangling register entry below.
    checkDriveKnown:
    try {
      std::string dsAddress=dr.getDriveAddress(driveName);
      // The drive is known. Check it does exist.
      // We work in this order here because we are in one-off mode, so
      // efficiency is not problematic.
      if (be.exists(dsAddress)) {
        driveState.setAddress(dsAddress);
        driveStateLock.lock(driveState);
        driveState.fetch();
        // Normalize ownership to the drive register if needed.
        if (driveState.getOwner() != dr.getAddressIfSet()) {
          driveState.setOwner(dr.getAddressIfSet());
          driveState.commit();
        }
      } else {
        // Dangling register entry: the state object is gone. Remove the entry
        // and re-run the check, which will now take the creation path.
        dr.removeDrive(driveName);
        goto checkDriveKnown;
      }
    } catch (DriveRegister::NoSuchDrive &) {
      // OK, we do need to create the drive status.
      // Create under our agent's ownership first (crash-safe), register it,
      // then hand ownership over to the drive register.
      driveState.setAddress(agentReference.nextId(std::string ("DriveStatus-")+driveName));
      driveState.initialize(driveName);
      agentReference.addToOwnership(driveState.getAddressIfSet(), be);
      driveState.setOwner(agentReference.getAgentAddress());
      driveState.insert();
      dr.setDriveAddress(driveName, driveState.getAddressIfSet());
      dr.commit();
      driveStateLock.lock(driveState);
      driveState.fetch();
      driveState.setOwner(dr.getAddressIfSet());
      driveState.commit();
      agentReference.removeFromOwnership(driveState.getAddressIfSet(), be);
      return;
    }
  }
}
//------------------------------------------------------------------------------
// Helpers::getAllDriveStates()
//------------------------------------------------------------------------------
std::list<cta::common::dataStructures::DriveState> Helpers::getAllDriveStates(Backend& backend, log::LogContext &lc) {
  // Snapshot the states of all registered drives.
  // The drive register is read lock-free, then one asynchronous lock-free
  // fetch is launched per drive state so they all proceed in parallel.
  // BUT if there are discrepancies, we will need to do some cleanup.
  std::list<cta::common::dataStructures::DriveState> result;
  RootEntry rootEntry(backend);
  rootEntry.fetchNoLock();
  DriveRegister driveRegister(rootEntry.getDriveRegisterAddress(), backend);
  driveRegister.fetchNoLock();
  std::list<DriveState> states;
  std::list<std::unique_ptr<DriveState::AsyncLockfreeFetcher>> fetchers;
  // Kick off all the fetches before waiting on any of them.
  for (const auto & driveEntry: driveRegister.getDriveAddresses()) {
    states.emplace_back(DriveState(driveEntry.driveStateAddress, backend));
    fetchers.emplace_back(states.back().asyncLockfreeFetch());
  }
  // Wait for completion of every fetch.
  for (auto & fetcher: fetchers) {
    fetcher->wait();
  }
  // Collect the decoded states.
  for (auto & state: states) {
    result.emplace_back(state.getState());
  }
  return result;
}
}} // namespace cta::objectstore.