-
Michael Davis authoredMichael Davis authored
Helpers.hpp 6.64 KiB
/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright © 2021-2022 CERN
* @license This program is free software, distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can
* redistribute it and/or modify it under the terms of the GPL Version 3, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* In applying this licence, CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization or
* submit itself to any jurisdiction.
*/
#pragma once
#include <syscall.h>
#include <fstream>
#include <future>
#include <list>
#include <map>
#include <set>
#include <string>
#include "common/dataStructures/JobQueueType.hpp"
#include "common/dataStructures/RepackQueueType.hpp"
#include "common/dataStructures/Tape.hpp"
#include "common/threading/Mutex.hpp"
#include "common/threading/MutexLocker.hpp"
#include "scheduler/OStoreDB/OStoreDB.hpp"
#include "scheduler/SchedulerDatabase.hpp"
// Activate or not helper cache update for debugging
// #define HELPERS_CACHE_UPDATE_LOGGING
#define HELPERS_CACHE_UPDATE_LOGGING_FILE "/var/tmp/cta-helpers-update-cache.log"
/**
* A collection of helper functions for commonly used multi-object operations
*/
namespace cta {
namespace catalogue {
class Catalogue;
}
namespace objectstore {
class ScopedExclusiveLock;
class AgentReference;
class RepackQueue;
/**
* A class with static functions allowing multi-object operations
*/
class Helpers {
public:
/**
* Find or create an archive or retrieve queue, and return it locked and fetched to the caller
* (Queue and ScopedExclusiveLock objects are provided empty)
* @param queue the queue object, empty
* @param queueLock the lock, not initialized
* @param agentReference the agent reference that will be needed in case of object creation
* @param tapePool or vid the name of the needed tape pool
*/
template <class Queue>
static void getLockedAndFetchedJobQueue(Queue & queue,
ScopedExclusiveLock & queueLock, AgentReference & agentReference,
const std::optional<std::string> & tapePoolOrVid, common::dataStructures::JobQueueType queueType, log::LogContext & lc);
/**
* Find or create a repack queue, and return it locked and fetched to the caller
* (Queue and ScopedExclusiveLock objects are provided empty)
* @param queue the queue object, empty
* @param queueLock the lock, not initialized
* @param agentReference the agent reference that will be needed in case of object creation
*/
static void getLockedAndFetchedRepackQueue(RepackQueue & queue,
ScopedExclusiveLock & queueLock, AgentReference & agentReference,
common::dataStructures::RepackQueueType queueType, log::LogContext & lc);
CTA_GENERATE_EXCEPTION_CLASS(NoTapeAvailableForRetrieve);
/**
* Find the most appropriate queue (bid) to add the retrieve request to. The potential
* VIDs (VIDs for non-failed copies) is provided by the caller. The status of the
* the tapes (and available queue size) are all cached to avoid
* frequent access to the object store. The caching create a small inefficiency
* to the algorithm, but will help performance drastically for a very similar result
*/
static std::string selectBestRetrieveQueue(const std::set<std::string> & candidateVids,
cta::catalogue::Catalogue & catalogue, objectstore::Backend & objectstore, bool isRepack = false);
/**
* Gets the retrieve queue statistics for a set of Vids (extracted from the OStoreDB
* so it can be used in the Helper context without passing the DB object.
*/
static std::list<SchedulerDatabase::RetrieveQueueStatistics> getRetrieveQueueStatistics(
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria,
const std::set<std::string> & vidsToConsider,
objectstore::Backend & objectstore);
/**
* Opportunistic updating of the queue stats cache as we access it. This implies the
* tape is not disabled (full status not fetched).
*/
static void updateRetrieveQueueStatisticsCache(const std::string & vid, uint64_t files, uint64_t bytes, uint64_t priority);
/**
* Allows to flush the RetrieveQueueStatisticsCache
* TO BE USED BY UNIT TESTS !
*/
static void flushRetrieveQueueStatisticsCache();
static void flushRetrieveQueueStatisticsCacheForVid(const std::string & vid);
private:
/** A struct holding together tape statistics and an update time */
struct TapeStatusWithTime {
common::dataStructures::Tape tapeStatus;
time_t updateTime;
};
/** Cache for tape statistics */
static std::map<std::string, TapeStatusWithTime> g_tapeStatuses;
/** Lock for the retrieve queues stats */
static cta::threading::Mutex g_retrieveQueueStatisticsMutex;
/** A struct holding together RetrieveQueueStatistics, tape status and an update time. */
struct RetrieveQueueStatisticsWithTime {
cta::SchedulerDatabase::RetrieveQueueStatistics stats;
cta::common::dataStructures::Tape tapeStatus;
bool updating;
/** The shared future will allow all updating safely an entry of the cache while
* releasing the global mutex to allow threads interested in other VIDs to carry on.*/
std::shared_future<void> updateFuture;
time_t updateTime;
};
/** The stats for the queues */
static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics;
/** Time between cache updates */
static const time_t c_tapeCacheMaxAge = 600;
static const time_t c_retrieveQueueCacheMaxAge = 10;
static void logUpdateCacheIfNeeded(const bool entryCreation, const RetrieveQueueStatisticsWithTime& tapeStatistic,
const std::string& message = "");
public:
enum class CreateIfNeeded: bool {
create = true,
doNotCreate = false
};
/**
* Helper to register a repack request in the repack index.
* As this structure was developed late, we potentially have to create it on the fly.
*/
static void registerRepackRequestToIndex(const std::string & vid, const std::string & requestAddress,
AgentReference & agentReference, Backend & backend, log::LogContext & lc);
/**
* Helper to remove an entry form the repack index.
*/
static void removeRepackRequestToIndex(const std::string & vid, Backend & backend, log::LogContext & lc);
};
} // namespace objectstore
} // namespace cta