Commit be6d0b4d authored by Eric Cano's avatar Eric Cano
Browse files

Created a helper function for retrieve queue selection.

In order to be efficient, the data (tape status and queue sizes) is cached in memory.
This function needed access to the catalogue (where the tape status is located).
This in turn required passing the catalogue object to:
 - The garbage colletion functions for all object types.
 - The garbage collector object now keeps a reference to the catalogue.
 - The path to the catalogue credential file should now be passed to cta-objectstore-collect-orphaned-object
 - The OStoreDB also needs a reference to the catalogue in order to queue retrive requests.

 The garbage collection member function are now also being passed a log context, so the requirement for better
 log of object fate can also be fulfilled later. This in turn required the passing of a logger object to
 the object store and relative helper classes.

 Finally, a dummy catalogue class has been created to be used in unit tests.
parent 31f3ac02
/*
* The CERN Tape Archive(CTA) project
* Copyright(C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
*(at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "Catalogue.hpp"
namespace cta {
namespace catalogue {
/**
* An empty implementation of the Catalogue used to populate unit tests of the scheduler database
* as they need a reference to a Catalogue, used in very few situations (requeueing of retrieve
* requests).
*/
class DummyCatalogue: public Catalogue {
public:
DummyCatalogue(log::Logger &l): Catalogue(l) {}
virtual ~DummyCatalogue() { }
void createAdminHost(const common::dataStructures::SecurityIdentity& admin, const std::string& hostName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createAdminUser(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createArchiveRoute(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createLogicalLibrary(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createMountPolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority, const uint64_t minArchiveRequestAge, const uint64_t retrievePriority, const uint64_t minRetrieveRequestAge, const uint64_t maxDrivesAllowed, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createRequesterGroupMountRule(const common::dataStructures::SecurityIdentity& admin, const std::string& mountPolicyName, const std::string& diskInstanceName, const std::string& requesterGroupName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createRequesterMountRule(const common::dataStructures::SecurityIdentity& admin, const std::string& mountPolicyName, const std::string& diskInstance, const std::string& requesterName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createStorageClass(const common::dataStructures::SecurityIdentity& admin, const common::dataStructures::StorageClass& storageClass) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& logicalLibraryName, const std::string& tapePoolName, const uint64_t capacityInBytes, const bool disabled, const bool full, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void createTapePool(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t nbPartialTapes, const bool encryptionValue, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteAdminHost(const std::string& hostName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteAdminUser(const std::string& username) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteArchiveFile(const std::string& instanceName, const uint64_t archiveFileId) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteArchiveRoute(const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteLogicalLibrary(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteMountPolicy(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteRequesterGroupMountRule(const std::string& diskInstanceName, const std::string& requesterGroupName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteRequesterMountRule(const std::string& diskInstanceName, const std::string& requesterName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteStorageClass(const std::string& diskInstanceName, const std::string& storageClassName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteTape(const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteTapePool(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void filesWrittenToTape(const std::set<TapeFileWritten>& event) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::AdminHost> getAdminHosts() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::AdminUser> getAdminUsers() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::ArchiveFile getArchiveFileById(const uint64_t id) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
ArchiveFileItor getArchiveFiles(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::ArchiveRoute> getArchiveRoutes() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::LogicalLibrary> getLogicalLibraries() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::MountPolicy> getMountPolicies() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::RequesterGroupMountRule> getRequesterGroupMountRules() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::RequesterMountRule> getRequesterMountRules() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::StorageClass> getStorageClasses() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::ArchiveFileSummary getTapeFileSummary(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::TapePool> getTapePools() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::Tape> getTapes(const TapeSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<TapeForWriting> getTapesForWriting(const std::string& logicalLibraryName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
bool isAdmin(const common::dataStructures::SecurityIdentity& admin) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyAdminHostComment(const common::dataStructures::SecurityIdentity& admin, const std::string& hostName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyAdminUserComment(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyLogicalLibraryComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyArchiveMinRequestAge(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t minArchiveRequestAge) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyArchivePriority(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyMaxDrivesAllowed(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t maxDrivesAllowed) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyRetrieveMinRequestAge(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t minRetrieveRequestAge) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyMountPolicyRetrievePriority(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t retrievePriority) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyRequesteMountRuleComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyRequesterGroupMountRuleComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterGroupName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyRequesterGroupMountRulePolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterGroupName, const std::string& mountPolicy) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyRequesterMountRulePolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterName, const std::string& mountPolicy) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyStorageClassComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyStorageClassNbCopies(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& name, const uint64_t nbCopies) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapeCapacityInBytes(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const uint64_t capacityInBytes) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapeComment(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapeEncryptionKey(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& encryptionKey) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapeLogicalLibraryName(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& logicalLibraryName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapePoolComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapePoolNbPartialTapes(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t nbPartialTapes) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyTapeTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& tapePoolName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void noSpaceLeftOnTape(const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void ping() { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::ArchiveFileQueueCriteria prepareForNewFile(const std::string& diskInstanceName, const std::string& storageClassName, const common::dataStructures::UserIdentity& user) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& instanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity& user) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void reclaimTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void setTapeDisabled(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool disabledValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void setTapeFull(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool fullValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void setTapePoolEncryption(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const bool encryptionValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
bool tapeExists(const std::string& vid) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void tapeLabelled(const std::string& vid, const std::string& drive, const bool lbpIsOn) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void tapeMountedForArchive(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void tapeMountedForRetrieve(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
bool tapePoolExists(const std::string& tapePoolName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
};
}} // namespace cta::catalogue.
\ No newline at end of file
......@@ -107,7 +107,8 @@ bool cta::objectstore::Agent::isEmpty() {
return true;
}
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) {
void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// We are here limited to checking the presumed owner and mark the agent as
// untracked in the agent register in case of match, else we do nothing
......
......@@ -56,7 +56,8 @@ public:
bool isEmpty();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
/* class ScopedIntent {
public:
......
......@@ -41,7 +41,8 @@ void cta::objectstore::AgentRegister::initialize() {
m_payloadInterpreted = true;
}
void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
if (!isEmpty()) {
throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error"));
......
......@@ -34,7 +34,8 @@ public:
AgentRegister(const std::string & name, Backend & os);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
bool isEmpty();
void addAgent (std::string name);
void removeAgent (const std::string & name);
......
......@@ -98,7 +98,8 @@ bool ArchiveQueue::isEmpty() {
return true;
}
void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the tape pool creation failed. We have nothing to do.
......
......@@ -100,7 +100,8 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
// Garbage collection
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
std::string dump();
};
......
......@@ -283,7 +283,11 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
return ret;
}
void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
//------------------------------------------------------------------------------
// garbageCollect
//------------------------------------------------------------------------------
void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// The behavior here depends on which job the agent is supposed to own.
// We should first find this job (if any). This is for covering the case
......
......@@ -112,7 +112,8 @@ public:
};
std::list<JobDump> dumpJobs();
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
std::string dump();
};
......
......@@ -114,7 +114,7 @@ target_link_libraries(cta-objectstore-dereference-removed-queues
add_executable(cta-objectstore-collect-orphaned-object cta-objectstore-collect-orphaned-object.cpp)
set_target_properties(cta-objectstore-collect-orphaned-object PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH})
target_link_libraries(cta-objectstore-collect-orphaned-object
${PROTOBUF3_LIBRARIES} ctaobjectstore ctacommon)
${PROTOBUF3_LIBRARIES} ctaobjectstore ctacommon ctacatalogue)
install(TARGETS cta-objectstore-initialize cta-objectstore-list cta-objectstore-dump-object cta-objectstore-unfollow-agent
cta-objectstore-dereference-removed-queues cta-objectstore-collect-orphaned-object
......
......@@ -70,7 +70,8 @@ void DriveRegister::initialize() {
//------------------------------------------------------------------------------
// DriveRegister::garbageCollect())
//------------------------------------------------------------------------------
void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) {
void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkPayloadWritable();
// If the agent is not anymore the owner of the object, then only the very
// last operation of the drive register creation failed. We have nothing to do.
......
......@@ -38,7 +38,8 @@ public:
DriveRegister(GenericObject & go);
void initialize();
CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
bool isEmpty();
// Drives management =========================================================
......
......@@ -24,8 +24,8 @@
namespace cta { namespace objectstore {
const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 5;
GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference):
m_objectStore(os), m_ourAgentReference(agentReference), m_agentRegister(os) {
GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue):
m_objectStore(os), m_catalogue(catalogue), m_ourAgentReference(agentReference), m_agentRegister(os) {
RootEntry re(m_objectStore);
ScopedSharedLock reLock(re);
re.fetch();
......@@ -182,7 +182,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon
go.fetch();
// Call GenericOpbject's garbage collect method, which in turn will
// delegate to the object type's garbage collector.
go.garbageCollectDispatcher(goLock, address, m_ourAgentReference);
go.garbageCollectDispatcher(goLock, address, m_ourAgentReference, lc, m_catalogue);
lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object.");
} else {
lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object.");
......
......@@ -36,7 +36,7 @@ namespace cta { namespace objectstore {
class GarbageCollector {
public:
GarbageCollector(Backend & os, AgentReference & agentReference);
GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue);
void runOnePass(log::LogContext & lc);
......@@ -51,6 +51,7 @@ public:
void reinjectOwnedObject(log::LogContext & lc);
private:
Backend & m_objectStore;
catalogue::Catalogue & m_catalogue;
AgentReference & m_ourAgentReference;
AgentRegister m_agentRegister;
std::map<std::string, AgentWatchdog * > m_watchedAgents;
......
......@@ -34,12 +34,14 @@
#include "ArchiveRequest.hpp"
#include "ArchiveQueue.hpp"
#include "EntryLogSerDeser.hpp"
#include "catalogue/DummyCatalogue.hpp"
namespace unitTests {
TEST(ObjectStore, GarbageCollectorBasicFuctionnality) {
// We will need a log object
cta::log::DummyLogger dl("unitTest");
cta::catalogue::DummyCatalogue catalogue(dl);
cta::log::LogContext lc(dl);
// Here we check for the ability to detect dead (but empty agents)
// and clean them up.
......@@ -74,7 +76,7 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -93,6 +95,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
// We will need a log object
cta::log::DummyLogger dl("unitTest");
cta::log::LogContext lc(dl);
cta::catalogue::DummyCatalogue catalogue(dl);
// Here we check that can successfully call agentRegister's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
......@@ -131,7 +134,7 @@ TEST(ObjectStore, GarbageCollectorRegister) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -151,6 +154,8 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
// We will need a log object
cta::log::DummyLogger dl("unitTest");
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue(dl);
// Here we check that can successfully call agentRegister's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
......@@ -189,7 +194,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -209,6 +214,8 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
// We will need a log object
cta::log::DummyLogger dl("unitTest");
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue(dl);
// Here we check that can successfully call agentRegister's garbage collector
cta::objectstore::BackendVFS be;
cta::objectstore::AgentReference agentRef("unitTestGarbageCollector");
......@@ -247,7 +254,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......@@ -271,6 +278,8 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
cta::log::DummyLogger dl("unitTest");
#endif
cta::log::LogContext lc(dl);
// We need a dummy catalogue
cta::catalogue::DummyCatalogue catalogue(dl);
// Here we check that can successfully call ArchiveRequests's garbage collector
cta::objectstore::BackendVFS be;
// Create the root entry
......@@ -409,7 +418,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
gcAgent.setTimeout_us(0);
gcAgent.insertAndRegisterSelf();
{
cta::objectstore::GarbageCollector gc(be, gcAgentRef);
cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue);
gc.runOnePass(lc);
gc.runOnePass(lc);
}
......
......@@ -77,47 +77,50 @@ namespace {
using cta::objectstore::ScopedExclusiveLock;
template <class C>
void garbageCollectWithType(GenericObject * gop, ScopedExclusiveLock& lock,
const std::string &presumedOwner, AgentReference & agentReference) {
const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
C typedObject(*gop);
lock.transfer(typedObject);
typedObject.garbageCollect(presumedOwner, agentReference);
typedObject.garbageCollect(presumedOwner, agentReference, lc, catalogue);
// Release the lock now as if we let the caller do, it will point
// to the then-removed typedObject.
lock.release();
}
}
void GenericObject::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) {
void GenericObject::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
throw ForbiddenOperation("In GenericObject::garbageCollect(): GenericObject cannot be garbage collected");
}
void GenericObject::garbageCollectDispatcher(ScopedExclusiveLock& lock,
const std::string &presumedOwner, AgentReference & agentReference) {
const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) {
checkHeaderWritable();
switch(m_header.type()) {
case serializers::AgentRegister_t:
garbageCollectWithType<AgentRegister>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<AgentRegister>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::Agent_t:
garbageCollectWithType<Agent>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<Agent>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::DriveRegister_t:
garbageCollectWithType<DriveRegister>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<DriveRegister>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::SchedulerGlobalLock_t:
garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::ArchiveRequest_t:
garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::RetrieveRequest_t:
garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::ArchiveQueue_t:
garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
case serializers::RetrieveQueue_t:
garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference);
garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue);
break;
default: {
std::stringstream err;
......
......@@ -51,7 +51,8 @@ public:
/** Overload of ObjectOps's implementation: this operation is forbidden. Generic
* Object cannot be garbage collected as-is */
void garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) override;
void garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
cta::catalogue::Catalogue & catalogue) override;
/** This dispatcher function will call the object's garbage collection function.
* It also handles the passed lock and returns is unlocked.
......@@ -61,8 +62,12 @@ public:
* @param lock reference to the generic object's lock
* @param presumedOwner address of the agent which pointed to the object
* @param agent reference object allowing creation of new objects when needed (at least for requeuing of requests)
* @params lc log context passed to garbage collection routines to log GC steps.
* @params catalogue reference to the catalogue, used by some garbage collection routines (specially for RetriveRequests
* which are tape status dependent.
*/
void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner, AgentReference & agentReference);
void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner, AgentReference & agentReference,
log::LogContext & lc, cta::catalogue::Catalogue & catalogue);
/** This dispatcher function will call the object's dump.
* It also handles the passed lock.
......
......@@ -20,12 +20,16 @@
#include "Backend.hpp"
#include "ArchiveQueue.hpp"
#include "AgentReference.hpp"
#include "RetrieveQueue.hpp"
#include "RootEntry.hpp"
#include "catalogue/Catalogue.hpp"
#include "common/exception/NonRetryableError.hpp"
#include <random>
namespace cta { namespace objectstore {
//------------------------------------------------------------------------------
// OStoreDB::getLockedAndFetchedArchiveQueue()
// Helpers::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue,
ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference,
......@@ -66,4 +70,152 @@ void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue,
+ tapePool);
}
//------------------------------------------------------------------------------
// Helpers::getLockedAndFetchedArchiveQueue()
//------------------------------------------------------------------------------
std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue,
objectstore::Backend & objectstore) {
// We will build the retrieve stats of the non-disable candidate vids here
std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats;
// A promise we create so we can make users wait on it.
// Take the global lock
cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex);
// Create a promise just in case
// Find the vids to be fetched (if any).
for (auto & v: candidateVids) {
try {
// Out of range or outdated will be updated the same way.
// If an update is in progress, we wait on it, and get the result after.
// We have to release the global lock while doing so.
if (g_retrieveQueueStatistics.at(v).updating) {
// Cache is updating, we wait on update.
auto updateFuture = g_retrieveQueueStatistics.at(v).updateFuture;
grqsmLock.unlock();
updateFuture.wait();
grqsmLock.lock();
if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) {
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
} else {
// We have a cache hit, check it's not stale.
if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge > time(nullptr))
throw std::out_of_range("");
// We're lucky: cache hit (and not stale)
if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
} catch (std::out_of_range) {
// We need to update the entry in the cache (miss or stale, we handle the same way).
// We just update one vid at a time as doing several in parallel would be quite
// hairy lock-wise (but give a slight performance boost).
g_retrieveQueueStatistics[v].updating = true;
std::promise<void> updatePromise;
g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future();
// Give other threads a chance to access the cache for other vids.
grqsmLock.unlock();
// Get the informations (stages, so we don't access the global variable without the mutex.
auto tapeStatus=catalogue.getTapesByVid({v});
// Build a minimal service retrieve file queue criteria to query queues.
common::dataStructures::RetrieveFileQueueCriteria rfqc;
rfqc.archiveFile.tapeFiles[1].vid=v;
auto queuesStats=Helpers::getRetrieveQueueStatistics(rfqc, {v}, objectstore);
// We now have the data we need. Update the cache.
grqsmLock.lock();
g_retrieveQueueStatistics[v].updating=false;
g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>();
// Check we got the expected vid (and size of stats).
if (queuesStats.size()!=1)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats.");
if (queuesStats.front().vid!=v)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats.");
if (tapeStatus.size()!=1)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus.");
if (tapeStatus.begin()->first!=v)
throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus.");
g_retrieveQueueStatistics[v].stats = queuesStats.front();
g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v);
// Signal to potential waiters
updatePromise.set_value();
// Update our own candidate list if needed.
if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled)
candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats);
}
}
// We now have all the candidates listed (if any).