Skip to content
Snippets Groups Projects
Commit 10939657 authored by Michael Davis's avatar Michael Davis
Browse files

Resolve "Tape server querying DB at very high rate"

parent 1bfc2590
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
## Summary ## Summary
### Features ### Features
- cta/CTA#155 - Tape server querying DB at very high rate
- cta/CTA#224 - Improve error message for cta-verify-file whn VID does not exist - cta/CTA#224 - Improve error message for cta-verify-file whn VID does not exist
- cta/CTA#230 - Modify CTA code to enforce VID uppercase - cta/CTA#230 - Modify CTA code to enforce VID uppercase
### Bug Fixes ### Bug Fixes
......
...@@ -370,11 +370,35 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid ...@@ -370,11 +370,35 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats; std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
// We will build the retrieve stats of the disabled vids here, as a fallback // We will build the retrieve stats of the disabled vids here, as a fallback
std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStatsFallback; std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStatsFallback;
// A promise we create so we can make users wait on it.
// Take the global lock // Take the global lock
cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex); cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex);
// Create a promise just in case // Ensure the tape status cache contains all the entries we need
// Find the vids to be fetched (if any). try {
for(auto& v : candidateVids) {
// throw std::out_of_range() if cache item not found or if it is stale
auto timeSinceLastUpdate = time(nullptr) - g_tapeStatuses.at(v).updateTime;
if(timeSinceLastUpdate > c_tapeCacheMaxAge) {
throw std::out_of_range("");
}
}
} catch (std::out_of_range&) {
// Remove stale cache entries
for(auto it = g_tapeStatuses.cbegin(); it != g_tapeStatuses.cend(); ) {
auto timeSinceLastUpdate = time(nullptr) - it->second.updateTime;
if(timeSinceLastUpdate > c_tapeCacheMaxAge) {
it = g_tapeStatuses.erase(it);
} else {
++it;
}
}
// Add in all the entries we need for this batch of candidates
auto tapeStatuses = catalogue.getTapesByVid(candidateVids);
for(auto& ts : tapeStatuses) {
g_tapeStatuses[ts.first].tapeStatus = ts.second;
g_tapeStatuses[ts.first].updateTime = time(nullptr);
}
}
// Find the vids to be fetched (if any)
for (auto & v: candidateVids) { for (auto & v: candidateVids) {
try { try {
// Out of range or outdated will be updated the same way. // Out of range or outdated will be updated the same way.
...@@ -423,11 +447,20 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid ...@@ -423,11 +447,20 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
g_retrieveQueueStatistics[v].updating = true; g_retrieveQueueStatistics[v].updating = true;
std::promise<void> updatePromise; std::promise<void> updatePromise;
g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future(); g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future();
// Get the cached tape status value before releasing the lock
if(g_tapeStatuses.find(v) == g_tapeStatuses.end()) {
// Handle corner case where there are two candidate vids and the second candidate was evicted because it is stale
auto tapeStatuses = catalogue.getTapesByVid(v);
if(tapeStatuses.size() != 1) {
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): candidate vid not found in the TAPE table.");
}
g_tapeStatuses[v].tapeStatus = tapeStatuses.begin()->second;
g_tapeStatuses[v].updateTime = time(nullptr);
}
common::dataStructures::Tape tapeStatus = g_tapeStatuses.at(v).tapeStatus;
// Give other threads a chance to access the cache for other vids. // Give other threads a chance to access the cache for other vids.
grqsmLock.unlock(); grqsmLock.unlock();
// Get the informations (stages, so we don't access the global variable without the mutex. // Build a minimal service retrieve file queue criteria to query queues
auto tapeStatus=catalogue.getTapesByVid(v);
// Build a minimal service retrieve file queue criteria to query queues.
common::dataStructures::RetrieveFileQueueCriteria rfqc; common::dataStructures::RetrieveFileQueueCriteria rfqc;
common::dataStructures::TapeFile tf; common::dataStructures::TapeFile tf;
tf.copyNb = 1; tf.copyNb = 1;
...@@ -438,17 +471,13 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid ...@@ -438,17 +471,13 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
grqsmLock.lock(); grqsmLock.lock();
g_retrieveQueueStatistics[v].updating=false; g_retrieveQueueStatistics[v].updating=false;
g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>(); g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>();
// Check we got the expected vid (and size of stats). // Check size of stats
if (queuesStats.size()!=1) if (queuesStats.size()!=1)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats."); throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats.");
if (queuesStats.front().vid!=v) if (queuesStats.front().vid!=v)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats."); throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats.");
if (tapeStatus.size()!=1)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus.");
if (tapeStatus.begin()->first!=v)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus.");
g_retrieveQueueStatistics[v].stats = queuesStats.front(); g_retrieveQueueStatistics[v].stats = queuesStats.front();
g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v); g_retrieveQueueStatistics[v].tapeStatus = tapeStatus;
g_retrieveQueueStatistics[v].updateTime = time(nullptr); g_retrieveQueueStatistics[v].updateTime = time(nullptr);
logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[v]); logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[v]);
// Signal to potential waiters // Signal to potential waiters
...@@ -504,14 +533,19 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ ...@@ -504,14 +533,19 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_
logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(vid)); logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(vid));
} catch (std::out_of_range &) { } catch (std::out_of_range &) {
// The entry is missing. We just create it. // The entry is missing. We just create it.
g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes;
g_retrieveQueueStatistics[vid].stats.filesQueued=files; g_retrieveQueueStatistics[vid].stats.filesQueued=files;
g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes;
g_retrieveQueueStatistics[vid].stats.currentPriority=priority; g_retrieveQueueStatistics[vid].stats.currentPriority=priority;
g_retrieveQueueStatistics[vid].stats.vid=vid; g_retrieveQueueStatistics[vid].stats.vid=vid;
g_retrieveQueueStatistics[vid].tapeStatus.state = common::dataStructures::Tape::ACTIVE;
g_retrieveQueueStatistics[vid].tapeStatus.full=false;
g_retrieveQueueStatistics[vid].updating = false; g_retrieveQueueStatistics[vid].updating = false;
g_retrieveQueueStatistics[vid].updateTime = time(nullptr); g_retrieveQueueStatistics[vid].updateTime = time(nullptr);
try {
// Use the cached tape status if we have it, otherwise fake it
g_retrieveQueueStatistics[vid].tapeStatus = g_tapeStatuses.at(vid).tapeStatus;
} catch(std::out_of_range&) {
g_retrieveQueueStatistics[vid].tapeStatus.state = common::dataStructures::Tape::ACTIVE;
g_retrieveQueueStatistics[vid].tapeStatus.full = false;
}
logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[vid]); logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[vid]);
} }
} }
...@@ -519,23 +553,30 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ ...@@ -519,23 +553,30 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_
void Helpers::flushRetrieveQueueStatisticsCache(){ void Helpers::flushRetrieveQueueStatisticsCache(){
threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
g_retrieveQueueStatistics.clear(); g_retrieveQueueStatistics.clear();
g_tapeStatuses.clear();
} }
void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid){ void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid){
threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
g_retrieveQueueStatistics.erase(vid); g_retrieveQueueStatistics.erase(vid);
g_tapeStatuses.erase(vid);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics // Helpers::g_tapeStatuses
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics; std::map<std::string, Helpers::TapeStatusWithTime> Helpers::g_tapeStatuses;
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatisticsMutex // Helpers::g_retrieveQueueStatisticsMutex
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex; cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex;
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics;
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Helpers::getRetrieveQueueStatistics() // Helpers::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
...@@ -117,6 +117,13 @@ class Helpers { ...@@ -117,6 +117,13 @@ class Helpers {
static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid); static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid);
private: private:
/** A struct holding together tape statistics and an update time */
struct TapeStatusWithTime {
common::dataStructures::Tape tapeStatus;
time_t updateTime;
};
/** Cache for tape statistics */
static std::map<std::string, TapeStatusWithTime> g_tapeStatuses;
/** Lock for the retrieve queues stats */ /** Lock for the retrieve queues stats */
static cta::threading::Mutex g_retrieveQueueStatisticsMutex; static cta::threading::Mutex g_retrieveQueueStatisticsMutex;
/** A struct holding together RetrieveQueueStatistics, tape status and an update time. */ /** A struct holding together RetrieveQueueStatistics, tape status and an update time. */
...@@ -132,6 +139,7 @@ class Helpers { ...@@ -132,6 +139,7 @@ class Helpers {
/** The stats for the queues */ /** The stats for the queues */
static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics; static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics;
/** Time between cache updates */ /** Time between cache updates */
static const time_t c_tapeCacheMaxAge = 600;
static const time_t c_retrieveQueueCacheMaxAge = 10; static const time_t c_retrieveQueueCacheMaxAge = 10;
static void logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic, static void logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic,
const std::string& message = ""); const std::string& message = "");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment