From 10939657bccb54aefa335a1a6c47db4b8c5e553e Mon Sep 17 00:00:00 2001 From: Michael Davis <michael.davis@cern.ch> Date: Tue, 13 Dec 2022 12:16:00 +0100 Subject: [PATCH] Resolve "Tape server querying DB at very high rate" --- ReleaseNotes.md | 1 + objectstore/Helpers.cpp | 75 +++++++++++++++++++++++++++++++---------- objectstore/Helpers.hpp | 8 +++++ 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index b680be5983..36ab865fbc 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -2,6 +2,7 @@ ## Summary ### Features +- cta/CTA#155 - Tape server querying DB at very high rate - cta/CTA#224 - Improve error message for cta-verify-file whn VID does not exist - cta/CTA#230 - Modify CTA code to enforce VID uppercase ### Bug Fixes diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 5c7d9a345a..75ac3be204 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -370,11 +370,35 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats; // We will build the retrieve stats of the disabled vids here, as a fallback std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStatsFallback; - // A promise we create so we can make users wait on it. // Take the global lock cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex); - // Create a promise just in case - // Find the vids to be fetched (if any). 
+ // Ensure the tape status cache contains all the entries we need + try { + for(auto& v : candidateVids) { + // throw std::out_of_range() if cache item not found or if it is stale + auto timeSinceLastUpdate = time(nullptr) - g_tapeStatuses.at(v).updateTime; + if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + throw std::out_of_range(""); + } + } + } catch (std::out_of_range&) { + // Remove stale cache entries + for(auto it = g_tapeStatuses.cbegin(); it != g_tapeStatuses.cend(); ) { + auto timeSinceLastUpdate = time(nullptr) - it->second.updateTime; + if(timeSinceLastUpdate > c_tapeCacheMaxAge) { + it = g_tapeStatuses.erase(it); + } else { + ++it; + } + } + // Add in all the entries we need for this batch of candidates + auto tapeStatuses = catalogue.getTapesByVid(candidateVids); + for(auto& ts : tapeStatuses) { + g_tapeStatuses[ts.first].tapeStatus = ts.second; + g_tapeStatuses[ts.first].updateTime = time(nullptr); + } + } + // Find the vids to be fetched (if any) for (auto & v: candidateVids) { try { // Out of range or outdated will be updated the same way. 
@@ -423,11 +447,20 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid g_retrieveQueueStatistics[v].updating = true; std::promise<void> updatePromise; g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future(); + // Get the cached tape status value before releasing the lock + if(g_tapeStatuses.find(v) == g_tapeStatuses.end()) { + // Handle corner case where there are two candidate vids and the second candidate was evicted because it is stale + auto tapeStatuses = catalogue.getTapesByVid(v); + if(tapeStatuses.size() != 1) { + throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): candidate vid not found in the TAPE table."); + } + g_tapeStatuses[v].tapeStatus = tapeStatuses.begin()->second; + g_tapeStatuses[v].updateTime = time(nullptr); + } + common::dataStructures::Tape tapeStatus = g_tapeStatuses.at(v).tapeStatus; // Give other threads a chance to access the cache for other vids. grqsmLock.unlock(); - // Get the informations (stages, so we don't access the global variable without the mutex. - auto tapeStatus=catalogue.getTapesByVid(v); - // Build a minimal service retrieve file queue criteria to query queues. + // Build a minimal service retrieve file queue criteria to query queues common::dataStructures::RetrieveFileQueueCriteria rfqc; common::dataStructures::TapeFile tf; tf.copyNb = 1; @@ -438,17 +471,13 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid grqsmLock.lock(); g_retrieveQueueStatistics[v].updating=false; g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>(); - // Check we got the expected vid (and size of stats). 
+ // Check we got the expected vid and size of stats if (queuesStats.size()!=1) throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats."); if (queuesStats.front().vid!=v) throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats."); - if (tapeStatus.size()!=1) - throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus."); - if (tapeStatus.begin()->first!=v) - throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus."); g_retrieveQueueStatistics[v].stats = queuesStats.front(); - g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v); + g_retrieveQueueStatistics[v].tapeStatus = tapeStatus; g_retrieveQueueStatistics[v].updateTime = time(nullptr); logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[v]); // Signal to potential waiters @@ -504,14 +533,19 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ logUpdateCacheIfNeeded(false,g_retrieveQueueStatistics.at(vid)); } catch (std::out_of_range &) { // The entry is missing. We just create it. 
- g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes; g_retrieveQueueStatistics[vid].stats.filesQueued=files; + g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes; g_retrieveQueueStatistics[vid].stats.currentPriority=priority; g_retrieveQueueStatistics[vid].stats.vid=vid; - g_retrieveQueueStatistics[vid].tapeStatus.state = common::dataStructures::Tape::ACTIVE; - g_retrieveQueueStatistics[vid].tapeStatus.full=false; g_retrieveQueueStatistics[vid].updating = false; g_retrieveQueueStatistics[vid].updateTime = time(nullptr); + try { + // Use the cached tape status if we have it, otherwise fake it + g_retrieveQueueStatistics[vid].tapeStatus = g_tapeStatuses.at(vid).tapeStatus; + } catch(std::out_of_range&) { + g_retrieveQueueStatistics[vid].tapeStatus.state = common::dataStructures::Tape::ACTIVE; + g_retrieveQueueStatistics[vid].tapeStatus.full = false; + } logUpdateCacheIfNeeded(true,g_retrieveQueueStatistics[vid]); } } @@ -519,23 +553,30 @@ void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_ void Helpers::flushRetrieveQueueStatisticsCache(){ threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); g_retrieveQueueStatistics.clear(); + g_tapeStatuses.clear(); } void Helpers::flushRetrieveQueueStatisticsCacheForVid(const std::string & vid){ threading::MutexLocker ml(g_retrieveQueueStatisticsMutex); g_retrieveQueueStatistics.erase(vid); + g_tapeStatuses.erase(vid); } //------------------------------------------------------------------------------ -// Helpers::g_retrieveQueueStatistics +// Helpers::g_tapeStatuses //------------------------------------------------------------------------------ -std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics; +std::map<std::string, Helpers::TapeStatusWithTime> Helpers::g_tapeStatuses; //------------------------------------------------------------------------------ // Helpers::g_retrieveQueueStatisticsMutex 
//------------------------------------------------------------------------------ cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex; +//------------------------------------------------------------------------------ +// Helpers::g_retrieveQueueStatistics +//------------------------------------------------------------------------------ +std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics; + //------------------------------------------------------------------------------ // Helpers::getRetrieveQueueStatistics() //------------------------------------------------------------------------------ diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp index caa1c5f025..51d3cbb262 100644 --- a/objectstore/Helpers.hpp +++ b/objectstore/Helpers.hpp @@ -117,6 +117,13 @@ class Helpers { static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid); private: + /** A struct holding together a tape status and the time it was last updated */ + struct TapeStatusWithTime { + common::dataStructures::Tape tapeStatus; + time_t updateTime; + }; + /** Cache of tape statuses, indexed by VID */ + static std::map<std::string, TapeStatusWithTime> g_tapeStatuses; /** Lock for the retrieve queues stats */ static cta::threading::Mutex g_retrieveQueueStatisticsMutex; /** A struct holding together RetrieveQueueStatistics, tape status and an update time. */ @@ -132,6 +139,7 @@ class Helpers { /** The stats for the queues */ static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics; /** Time between cache updates */ + static const time_t c_tapeCacheMaxAge = 600; static const time_t c_retrieveQueueCacheMaxAge = 10; static void logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic, const std::string& message = ""); -- GitLab