Skip to content
Snippets Groups Projects
Commit 84e6b7e3 authored by Eric Cano's avatar Eric Cano
Browse files

Completed implementation of RetrieveRequest::garbageCollect().

Added a tape queue and status cache update function in objectstore::Helpers.
parent be6d0b4d
Branches
Tags
No related merge requests found
......@@ -66,12 +66,53 @@ void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue,
}
}
throw cta::exception::Exception(std::string(
"In OStoreDB::getLockedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
"In OStoreDB::getLockedAndFetchedArchiveQueue(): failed to find or create and lock archive queue after 5 retries for tapepool: ")
+ tapePool);
}
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedArchiveQueue()
// Helpers::getLockedAndFetchedRetrieveQueue()
//------------------------------------------------------------------------------
void Helpers::getLockedAndFetchedRetrieveQueue(RetrieveQueue& retrieveQueue, ScopedExclusiveLock& retrieveQueueLock, AgentReference& agentReference, const std::string& vid) {
// TODO: if necessary, we could use a singleton caching object here to accelerate
// lookups.
// Getting a locked AQ is the name of the game.
// Try and find an existing one first, create if needed
Backend & be = retrieveQueue.m_objectStore;
for (size_t i=0; i<5; i++) {
{
RootEntry re (be);
ScopedSharedLock rel(re);
re.fetch();
try {
retrieveQueue.setAddress(re.getRetrieveQueue(vid));
} catch (cta::exception::Exception & ex) {
rel.release();
ScopedExclusiveLock rexl(re);
re.fetch();
retrieveQueue.setAddress(re.addOrGetRetrieveQueueAndCommit(vid, agentReference));
}
}
try {
retrieveQueueLock.lock(retrieveQueue);
retrieveQueue.fetch();
return;
} catch (cta::exception::Exception & ex) {
// We have a (rare) opportunity for a race condition, where we identify the
// queue and it gets deleted before we manage to lock it.
// The locking of fetching will fail in this case.
// We hence allow ourselves to retry a couple times.
continue;
}
}
throw cta::exception::Exception(std::string(
"In OStoreDB::getLockedAndFetchedRetrieveQueue(): failed to find or create and lock archive queue after 5 retries for vid: ")
+ vid);
}
//------------------------------------------------------------------------------
// Helpers::selectBestRetrieveQueue()
//------------------------------------------------------------------------------
std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue,
objectstore::Backend & objectstore) {
......@@ -165,6 +206,31 @@ std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candid
return *it;
}
//------------------------------------------------------------------------------
// Helpers::updateRetrieveQueueStatisticsCache()
//------------------------------------------------------------------------------
void Helpers::updateRetrieveQueueStatisticsCache(const std::string& vid, uint64_t files, uint64_t bytes, uint64_t priority) {
// We will not update the status of the tape if we already cached it (called did not check),
// but of course we will suppose the tape is not disabled if the cache entry is either absent
// or stale.
threading::MutexLocker ml(g_retrieveQueueStatisticsMutex);
try {
if (g_retrieveQueueStatistics.at(vid).updateTime + c_retrieveQueueCacheMaxAge < time(nullptr))
g_retrieveQueueStatistics.at(vid).tapeStatus.disabled=false;
g_retrieveQueueStatistics.at(vid).stats.filesQueued=files;
g_retrieveQueueStatistics.at(vid).stats.bytesQueued=bytes;
g_retrieveQueueStatistics.at(vid).updateTime=time(nullptr);
} catch (std::out_of_range &) {
// The entry is missing. We just create it.
g_retrieveQueueStatistics[vid].stats.bytesQueued=bytes;
g_retrieveQueueStatistics[vid].stats.filesQueued=files;
g_retrieveQueueStatistics[vid].stats.currentPriority=priority;
g_retrieveQueueStatistics[vid].stats.vid=vid;
g_retrieveQueueStatistics[vid].tapeStatus.disabled=false;
g_retrieveQueueStatistics[vid].tapeStatus.full=false;
}
}
//------------------------------------------------------------------------------
// Helpers::g_retrieveQueueStatistics
//------------------------------------------------------------------------------
......@@ -178,7 +244,6 @@ cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex;
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics(
const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider,
objectstore::Backend & objectstore) {
......
......@@ -86,6 +86,12 @@ public:
const std::set<std::string> & vidsToConsider,
objectstore::Backend & objectstore);
/**
* Opportunistic updating of the queue stats cache as we access it. This implies the
* tape is not disabled (full status not fetched).
*/
static void updateRetrieveQueueStatisticsCache(const std::string & vid, uint64_t files, uint64_t bytes, uint64_t priority);
private:
/** Lock for the retrieve queues stats */
static cta::threading::Mutex g_retrieveQueueStatisticsMutex;
......
......@@ -33,6 +33,9 @@ cta::objectstore::RetrieveQueue::RetrieveQueue(GenericObject& go):
getPayloadFromHeader();
}
cta::objectstore::RetrieveQueue::RetrieveQueue(Backend& os):
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>(os) { }
void cta::objectstore::RetrieveQueue::initialize(const std::string &vid) {
ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t>::initialize();
// Set the reguired fields
......
......@@ -32,6 +32,8 @@ class GenericObject;
class RetrieveQueue: public ObjectOps<serializers::RetrieveQueue, serializers::RetrieveQueue_t> {
public:
RetrieveQueue(const std::string & address, Backend & os);
// Undefined object constructor
RetrieveQueue(Backend & os);
RetrieveQueue(GenericObject & go);
void initialize(const std::string & vid);
void commit();
......
......@@ -22,6 +22,7 @@
#include "MountPolicySerDeser.hpp"
#include "DiskFileInfoSerDeser.hpp"
#include "ArchiveFileSerDeser.hpp"
#include "RetrieveQueue.hpp"
#include "objectstore/cta.pb.h"
#include "Helpers.hpp"
#include <google/protobuf/util/json_util.h>
......@@ -86,9 +87,67 @@ void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentRefe
found:;
}
}
// If there is no candidate, we cancel the job
// TODO: in the future, we might queue it for reporting to EOS.
if (candidateVids.empty()) {
remove();
log::ScopedParamContainer params(lc);
params.add("jobObject", getAddressIfSet());
lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): deleted job as no tape file is available for recall.");
return;
}
// If we have to fetch the status of the tapes and queued for the non-disabled vids.
auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore);
throw cta::exception::Exception("In RetrieveRequest::garbageCollect(): not implemented.");
// Find the corresponding tape file, which will give the copynb, which will allow finding the retrieve job.
auto bestTapeFile=m_payload.archivefile().tapefiles().begin();
while (bestTapeFile != m_payload.archivefile().tapefiles().end()) {
if (bestTapeFile->vid() == bestVid)
goto tapeFileFound;
bestTapeFile++;
}
{
std::stringstream err;
err << "In RetrieveRequest::garbageCollect(): could not find tapefile for vid " << bestVid;
throw exception::Exception(err.str());
}
tapeFileFound:;
auto bestJob=m_payload.mutable_jobs()->begin();
while (bestJob!=m_payload.mutable_jobs()->end()) {
if (bestJob->copynb() == bestTapeFile->copynb())
goto jobFound;
bestJob++;
}
{
std::stringstream err;
err << "In RetrieveRequest::garbageCollect(): could not find job for copynb " << bestTapeFile->copynb();
throw exception::Exception(err.str());
}
jobFound:;
// We now need to grab the queue a requeue the request.
RetrieveQueue rq(m_objectStore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedRetrieveQueue(rq, rql, agentReference, bestVid);
// Enqueue add the job to the queue
objectstore::MountPolicySerDeser mp;
mp.deserialize(m_payload.mountpolicy());
rq.addJob(bestTapeFile->copynb(), bestTapeFile->fseq(), getAddressIfSet(), m_payload.archivefile().filesize(),
mp, m_payload.schedulerrequest().entrylog().time());
auto jobsSummary=rq.getJobsSummary();
rq.commit();
// We can now make the transition official
bestJob->set_status(serializers::RetrieveJobStatus::RJS_Pending);
m_payload.set_activecopynb(bestJob->copynb());
setOwner(rq.getAddressIfSet());
commit();
{
log::ScopedParamContainer params(lc);
params.add("jobObject", getAddressIfSet())
.add("queueObject", rq.getAddressIfSet())
.add("copynb", bestTapeFile->copynb())
.add("vid", bestTapeFile->vid());
lc.log(log::INFO, "In RetrieveRequest::garbageCollect(): requeued the request.");
}
Helpers::updateRetrieveQueueStatisticsCache(bestVid, jobsSummary.files, jobsSummary.bytes, jobsSummary.priority);
}
//------------------------------------------------------------------------------
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment