diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e44f909c53a99a1a98fc36c4b52d46c6ac9b4cb3 --- /dev/null +++ b/catalogue/DummyCatalogue.hpp @@ -0,0 +1,114 @@ +/* + * The CERN Tape Archive(CTA) project + * Copyright(C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + *(at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "Catalogue.hpp" + +namespace cta { + +namespace catalogue { + +/** + * An empty implementation of the Catalogue used to populate unit tests of the scheduler database + * as they need a reference to a Catalogue, used in very few situations (requeueing of retrieve + * requests). + */ +class DummyCatalogue: public Catalogue { +public: + DummyCatalogue(log::Logger &l): Catalogue(l) {} + virtual ~DummyCatalogue() { } + void createAdminHost(const common::dataStructures::SecurityIdentity& admin, const std::string& hostName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + + void createAdminUser(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createArchiveRoute(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createLogicalLibrary(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createMountPolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority, const uint64_t minArchiveRequestAge, const uint64_t retrievePriority, const uint64_t minRetrieveRequestAge, const uint64_t maxDrivesAllowed, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createRequesterGroupMountRule(const common::dataStructures::SecurityIdentity& admin, const std::string& mountPolicyName, const std::string& diskInstanceName, const std::string& requesterGroupName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createRequesterMountRule(const common::dataStructures::SecurityIdentity& admin, const std::string& mountPolicyName, const std::string& diskInstance, const std::string& requesterName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createStorageClass(const common::dataStructures::SecurityIdentity& admin, const common::dataStructures::StorageClass& storageClass) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& logicalLibraryName, const std::string& tapePoolName, const uint64_t capacityInBytes, const bool disabled, const bool full, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void createTapePool(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t nbPartialTapes, const bool encryptionValue, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteAdminHost(const std::string& hostName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteAdminUser(const std::string& username) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteArchiveFile(const std::string& instanceName, const uint64_t archiveFileId) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteArchiveRoute(const std::string& diskInstanceName, const std::string& storageClassName, const uint64_t copyNb) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteLogicalLibrary(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteMountPolicy(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteRequesterGroupMountRule(const std::string& diskInstanceName, const std::string& requesterGroupName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteRequesterMountRule(const std::string& diskInstanceName, const std::string& requesterName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteStorageClass(const std::string& diskInstanceName, const std::string& storageClassName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteTape(const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void deleteTapePool(const std::string& name) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void filesWrittenToTape(const std::set<TapeFileWritten>& event) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::AdminHost> getAdminHosts() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::AdminUser> getAdminUsers() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::ArchiveFile getArchiveFileById(const uint64_t id) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + ArchiveFileItor getArchiveFiles(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::ArchiveRoute> getArchiveRoutes() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::LogicalLibrary> getLogicalLibraries() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::MountPolicy> getMountPolicies() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::RequesterGroupMountRule> getRequesterGroupMountRules() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::RequesterMountRule> getRequesterMountRules() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::StorageClass> getStorageClasses() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::ArchiveFileSummary getTapeFileSummary(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::TapePool> getTapePools() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<common::dataStructures::Tape> getTapes(const TapeSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::VidToTapeMap getTapesByVid(const std::set<std::string>& vids) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + std::list<TapeForWriting> getTapesForWriting(const std::string& logicalLibraryName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + bool isAdmin(const common::dataStructures::SecurityIdentity& admin) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyAdminHostComment(const common::dataStructures::SecurityIdentity& admin, const std::string& hostName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyAdminUserComment(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint64_t copyNb, const std::string& tapePoolName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyLogicalLibraryComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyArchiveMinRequestAge(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t minArchiveRequestAge) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyArchivePriority(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t archivePriority) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyMaxDrivesAllowed(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t maxDrivesAllowed) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyRetrieveMinRequestAge(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t minRetrieveRequestAge) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyMountPolicyRetrievePriority(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t retrievePriority) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyRequesteMountRuleComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyRequesterGroupMountRuleComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterGroupName, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyRequesterGroupMountRulePolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterGroupName, const std::string& mountPolicy) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyRequesterMountRulePolicy(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& requesterName, const std::string& mountPolicy) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyStorageClassComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyStorageClassNbCopies(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& name, const uint64_t nbCopies) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapeCapacityInBytes(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const uint64_t capacityInBytes) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapeComment(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapeEncryptionKey(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& encryptionKey) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapeLogicalLibraryName(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& logicalLibraryName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapePoolComment(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string& comment) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapePoolNbPartialTapes(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const uint64_t nbPartialTapes) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void modifyTapeTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string& tapePoolName) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void noSpaceLeftOnTape(const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void ping() { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::ArchiveFileQueueCriteria prepareForNewFile(const std::string& diskInstanceName, const std::string& storageClassName, const common::dataStructures::UserIdentity& user) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& instanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity& user) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void reclaimTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void setTapeDisabled(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool disabledValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void setTapeFull(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool fullValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void setTapePoolEncryption(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const bool encryptionValue) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + bool tapeExists(const std::string& vid) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void tapeLabelled(const std::string& vid, const std::string& drive, const bool lbpIsOn) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void tapeMountedForArchive(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + void tapeMountedForRetrieve(const std::string& vid, const std::string& drive) { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } + bool tapePoolExists(const std::string& tapePoolName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); } +}; + +}} // namespace cta::catalogue. \ No newline at end of file diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index fc106b00fc85fb8f8b6ce39aae1da33949466a92..545f241b4807d2b3a65b174e9eb706dfe65601d0 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -107,7 +107,8 @@ bool cta::objectstore::Agent::isEmpty() { return true; } -void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) { +void cta::objectstore::Agent::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // We are here limited to checking the presumed owner and mark the agent as // untracked in the agent register in case of match, else we do nothing diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index 2a3e41650ebe0f430d6d430787688dcd5a9a81ae..e4649908542023656a81a0bc71ab5a1f4220ac57 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -56,7 +56,8 @@ public: bool isEmpty(); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; /* class ScopedIntent { public: diff --git a/objectstore/AgentRegister.cpp b/objectstore/AgentRegister.cpp index a75e77a0ccfd2846e25047c15211eeab145132a3..3d7123511455f4835a18adacecaa51e99a7a925b 100644 --- a/objectstore/AgentRegister.cpp +++ b/objectstore/AgentRegister.cpp @@ -41,7 +41,8 @@ void cta::objectstore::AgentRegister::initialize() { m_payloadInterpreted = true; } -void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +void cta::objectstore::AgentRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); if (!isEmpty()) { throw (NotEmpty("Trying to garbage collect a non-empty AgentRegister: internal error")); diff --git a/objectstore/AgentRegister.hpp b/objectstore/AgentRegister.hpp index 49d746af53cae4ad3b79714e5cd95be223eff5df..2fe565d6a24f93f8053bc841aca99099c35d6959 100644 --- a/objectstore/AgentRegister.hpp +++ b/objectstore/AgentRegister.hpp @@ -34,7 +34,8 @@ public: AgentRegister(const std::string & name, Backend & os); void initialize(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; bool isEmpty(); void addAgent (std::string name); void removeAgent (const std::string & name); diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 70e30cdf6a12586247e0f9b0c22a487f86abb9de..10b3c57f55d9a1d6c035c9df1b4c5e59f5a8f3b7 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -98,7 +98,8 @@ bool ArchiveQueue::isEmpty() { return true; } -void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +void ArchiveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // If the agent is not anymore the owner of the object, then only the very // last operation of the tape pool creation failed. We have nothing to do. diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index 5ffd34b5a3e2a7a6c37a8d75c7488fa84bd55978..943add52387a1962b0dcad72b1714f93688048e4 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -100,7 +100,8 @@ public: CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); // Garbage collection - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; std::string dump(); }; diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 74a80be28f38bf14449fdc45c35e8463a5c35faf..26a284044604eaf92fd45db2cf5b01e22569b227 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -283,7 +283,11 @@ auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> { return ret; } -void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +//------------------------------------------------------------------------------ +// garbageCollect +//------------------------------------------------------------------------------ +void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // The behavior here depends on which job the agent is supposed to own. // We should first find this job (if any). This is for covering the case diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index da53d45a74925f87b5c1abd3e323762489980807..da067ef51c251cb3dfee3725e88011b8bd7fb435 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -112,7 +112,8 @@ public: }; std::list<JobDump> dumpJobs(); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; std::string dump(); }; diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index 08d344763593a529d49fed4bca261d49d67796d6..1918d973275b406c4eb331975a1a7eb002bcf761 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -114,7 +114,7 @@ target_link_libraries(cta-objectstore-dereference-removed-queues add_executable(cta-objectstore-collect-orphaned-object cta-objectstore-collect-orphaned-object.cpp) set_target_properties(cta-objectstore-collect-orphaned-object PROPERTIES INSTALL_RPATH ${PROTOBUF3_RPATH}) target_link_libraries(cta-objectstore-collect-orphaned-object - ${PROTOBUF3_LIBRARIES} ctaobjectstore ctacommon) + ${PROTOBUF3_LIBRARIES} ctaobjectstore ctacommon ctacatalogue) install(TARGETS cta-objectstore-initialize cta-objectstore-list cta-objectstore-dump-object cta-objectstore-unfollow-agent cta-objectstore-dereference-removed-queues cta-objectstore-collect-orphaned-object diff --git a/objectstore/DriveRegister.cpp b/objectstore/DriveRegister.cpp index 53177b75b44d8ab14e2dfd3e2ae4c260d9df38e2..9816f51ece2ffb563d962f242419e10f340d93b0 100644 --- a/objectstore/DriveRegister.cpp +++ b/objectstore/DriveRegister.cpp @@ -70,7 +70,8 @@ void DriveRegister::initialize() { //------------------------------------------------------------------------------ // DriveRegister::garbageCollect()) //------------------------------------------------------------------------------ -void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +void DriveRegister::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // If the agent is not anymore the owner of the object, then only the very // last operation of the drive register creation failed. We have nothing to do. diff --git a/objectstore/DriveRegister.hpp b/objectstore/DriveRegister.hpp index 8ad841ad9d3a1b2940642a5d72a521e4c8b87595..484424e7b8948d2bc9188ba6024d424bccfa52ef 100644 --- a/objectstore/DriveRegister.hpp +++ b/objectstore/DriveRegister.hpp @@ -38,7 +38,8 @@ public: DriveRegister(GenericObject & go); void initialize(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; bool isEmpty(); // Drives management ========================================================= diff --git a/objectstore/GarbageCollector.cpp b/objectstore/GarbageCollector.cpp index b94318edce3d5c483ec73c538fbca30f447ef30b..4278af11928527cee23dd5f7b6ff7f5ac37be704 100644 --- a/objectstore/GarbageCollector.cpp +++ b/objectstore/GarbageCollector.cpp @@ -24,8 +24,8 @@ namespace cta { namespace objectstore { const size_t GarbageCollector::c_maxWatchedAgentsPerGC = 5; -GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference): - m_objectStore(os), m_ourAgentReference(agentReference), m_agentRegister(os) { +GarbageCollector::GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue): + m_objectStore(os), m_catalogue(catalogue), m_ourAgentReference(agentReference), m_agentRegister(os) { RootEntry re(m_objectStore); ScopedSharedLock reLock(re); re.fetch(); @@ -182,7 +182,7 @@ void GarbageCollector::cleanupDeadAgent(const std::string & address, log::LogCon go.fetch(); // Call GenericOpbject's garbage collect method, which in turn will // delegate to the object type's garbage collector. - go.garbageCollectDispatcher(goLock, address, m_ourAgentReference); + go.garbageCollectDispatcher(goLock, address, m_ourAgentReference, lc, m_catalogue); lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): garbage collected owned object."); } else { lc.log(log::INFO, "In GarbageCollector::cleanupDeadAgent(): skipping garbage collection of now gone object."); diff --git a/objectstore/GarbageCollector.hpp b/objectstore/GarbageCollector.hpp index 9a737ecea9e83471997297afc3563f2d72cfd4b0..beea0ec4a7216f33665d32707b97a34f35207c07 100644 --- a/objectstore/GarbageCollector.hpp +++ b/objectstore/GarbageCollector.hpp @@ -36,7 +36,7 @@ namespace cta { namespace objectstore { class GarbageCollector { public: - GarbageCollector(Backend & os, AgentReference & agentReference); + GarbageCollector(Backend & os, AgentReference & agentReference, catalogue::Catalogue & catalogue); void runOnePass(log::LogContext & lc); @@ -51,6 +51,7 @@ public: void reinjectOwnedObject(log::LogContext & lc); private: Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; AgentReference & m_ourAgentReference; AgentRegister m_agentRegister; std::map<std::string, AgentWatchdog * > m_watchedAgents; diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index ea3819cc14b064bf04243b1f6c5fad40e2977682..62e56eb042462b80d7566259ff6bd4c881917f2b 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -34,12 +34,14 @@ #include "ArchiveRequest.hpp" #include "ArchiveQueue.hpp" #include "EntryLogSerDeser.hpp" +#include "catalogue/DummyCatalogue.hpp" namespace unitTests { TEST(ObjectStore, GarbageCollectorBasicFuctionnality) { // We will need a log object cta::log::DummyLogger dl("unitTest"); + cta::catalogue::DummyCatalogue catalogue(dl); cta::log::LogContext lc(dl); // Here we check for the ability to detect dead (but empty agents) // and clean them up. @@ -74,7 +76,7 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) { gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { - cta::objectstore::GarbageCollector gc(be, gcAgentRef); + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); gc.runOnePass(lc); gc.runOnePass(lc); } @@ -93,6 +95,7 @@ TEST(ObjectStore, GarbageCollectorRegister) { // We will need a log object cta::log::DummyLogger dl("unitTest"); cta::log::LogContext lc(dl); + cta::catalogue::DummyCatalogue catalogue(dl); // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); @@ -131,7 +134,7 @@ TEST(ObjectStore, GarbageCollectorRegister) { gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { - cta::objectstore::GarbageCollector gc(be, gcAgentRef); + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); gc.runOnePass(lc); gc.runOnePass(lc); } @@ -151,6 +154,8 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) { // We will need a log object cta::log::DummyLogger dl("unitTest"); cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue(dl); // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); @@ -189,7 +194,7 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) { gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { - cta::objectstore::GarbageCollector gc(be, gcAgentRef); + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); gc.runOnePass(lc); gc.runOnePass(lc); } @@ -209,6 +214,8 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { // We will need a log object cta::log::DummyLogger dl("unitTest"); cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue(dl); // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); @@ -247,7 +254,7 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { - cta::objectstore::GarbageCollector gc(be, gcAgentRef); + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); gc.runOnePass(lc); gc.runOnePass(lc); } @@ -271,6 +278,8 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { cta::log::DummyLogger dl("unitTest"); #endif cta::log::LogContext lc(dl); + // We need a dummy catalogue + cta::catalogue::DummyCatalogue catalogue(dl); // Here we check that can successfully call ArchiveRequests's garbage collector cta::objectstore::BackendVFS be; // Create the root entry @@ -409,7 +418,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { - cta::objectstore::GarbageCollector gc(be, gcAgentRef); + cta::objectstore::GarbageCollector gc(be, gcAgentRef, catalogue); gc.runOnePass(lc); gc.runOnePass(lc); } diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index d08469078e6044fefbe5ffa742816cc4daea5056..00763ee794577b6cb2e534898bf27478df4c8598 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -77,47 +77,50 @@ namespace { using cta::objectstore::ScopedExclusiveLock; template <class C> void garbageCollectWithType(GenericObject * gop, ScopedExclusiveLock& lock, - const std::string &presumedOwner, AgentReference & agentReference) { + const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { C typedObject(*gop); lock.transfer(typedObject); - typedObject.garbageCollect(presumedOwner, agentReference); + typedObject.garbageCollect(presumedOwner, agentReference, lc, catalogue); // Release the lock now as if we let the caller do, it will point // to the then-removed typedObject. lock.release(); } } -void GenericObject::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) { +void GenericObject::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { throw ForbiddenOperation("In GenericObject::garbageCollect(): GenericObject cannot be garbage collected"); } void GenericObject::garbageCollectDispatcher(ScopedExclusiveLock& lock, - const std::string &presumedOwner, AgentReference & agentReference) { + const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkHeaderWritable(); switch(m_header.type()) { case serializers::AgentRegister_t: - garbageCollectWithType<AgentRegister>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<AgentRegister>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::Agent_t: - garbageCollectWithType<Agent>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<Agent>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::DriveRegister_t: - garbageCollectWithType<DriveRegister>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<DriveRegister>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::SchedulerGlobalLock_t: - garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<SchedulerGlobalLock>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::ArchiveRequest_t: - garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<ArchiveRequest>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::RetrieveRequest_t: - garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<RetrieveRequest>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::ArchiveQueue_t: - garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<ArchiveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue); break; case serializers::RetrieveQueue_t: - garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference); + garbageCollectWithType<RetrieveQueue>(this, lock, presumedOwner, agentReference, lc, catalogue); break; default: { std::stringstream err; diff --git a/objectstore/GenericObject.hpp b/objectstore/GenericObject.hpp index ff0aae4445995e9f582232862bd45e2841a91b64..a3de1d82d627505387598138b80e0394cf45d490 100644 --- a/objectstore/GenericObject.hpp +++ b/objectstore/GenericObject.hpp @@ -51,7 +51,8 @@ public: /** Overload of ObjectOps's implementation: this operation is forbidden. Generic * Object cannot be garbage collected as-is */ - void garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; /** This dispatcher function will call the object's garbage collection function. * It also handles the passed lock and returns is unlocked. @@ -61,8 +62,12 @@ public: * @param lock reference to the generic object's lock * @param presumedOwner address of the agent which pointed to the object * @param agent reference object allowing creation of new objects when needed (at least for requeuing of requests) + * @params lc log context passed to garbage collection routines to log GC steps. + * @params catalogue reference to the catalogue, used by some garbage collection routines (specially for RetriveRequests + * which are tape status dependent. */ - void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner, AgentReference & agentReference); + void garbageCollectDispatcher(ScopedExclusiveLock & lock, const std::string &presumedOwner, AgentReference & agentReference, + log::LogContext & lc, cta::catalogue::Catalogue & catalogue); /** This dispatcher function will call the object's dump. * It also handles the passed lock. diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index f772d3be55ff2978e6376e494344b1ff5d9e3af9..94fc1e45558ebc4bd313c0116f0e94dbbef0b585 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -20,12 +20,16 @@ #include "Backend.hpp" #include "ArchiveQueue.hpp" #include "AgentReference.hpp" +#include "RetrieveQueue.hpp" #include "RootEntry.hpp" +#include "catalogue/Catalogue.hpp" +#include "common/exception/NonRetryableError.hpp" +#include <random> namespace cta { namespace objectstore { //------------------------------------------------------------------------------ -// OStoreDB::getLockedAndFetchedArchiveQueue() +// Helpers::getLockedAndFetchedArchiveQueue() //------------------------------------------------------------------------------ void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue, ScopedExclusiveLock& archiveQueueLock, AgentReference & agentReference, @@ -66,4 +70,152 @@ void Helpers::getLockedAndFetchedArchiveQueue(ArchiveQueue& archiveQueue, + tapePool); } +//------------------------------------------------------------------------------ +// Helpers::getLockedAndFetchedArchiveQueue() +//------------------------------------------------------------------------------ +std::string Helpers::selectBestRetrieveQueue(const std::set<std::string>& candidateVids, cta::catalogue::Catalogue & catalogue, + objectstore::Backend & objectstore) { + // We will build the retrieve stats of the non-disable candidate vids here + std::list<SchedulerDatabase::RetrieveQueueStatistics> candidateVidsStats; + // A promise we create so we can make users wait on it. + // Take the global lock + cta::threading::MutexLocker grqsmLock(g_retrieveQueueStatisticsMutex); + // Create a promise just in case + // Find the vids to be fetched (if any). + for (auto & v: candidateVids) { + try { + // Out of range or outdated will be updated the same way. + // If an update is in progress, we wait on it, and get the result after. + // We have to release the global lock while doing so. + if (g_retrieveQueueStatistics.at(v).updating) { + // Cache is updating, we wait on update. + auto updateFuture = g_retrieveQueueStatistics.at(v).updateFuture; + grqsmLock.unlock(); + updateFuture.wait(); + grqsmLock.lock(); + if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) { + candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats); + } + } else { + // We have a cache hit, check it's not stale. + if (g_retrieveQueueStatistics.at(v).updateTime + c_retrieveQueueCacheMaxAge > time(nullptr)) + throw std::out_of_range(""); + // We're lucky: cache hit (and not stale) + if (!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) + candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats); + } + } catch (std::out_of_range) { + // We need to update the entry in the cache (miss or stale, we handle the same way). + // We just update one vid at a time as doing several in parallel would be quite + // hairy lock-wise (but give a slight performance boost). + g_retrieveQueueStatistics[v].updating = true; + std::promise<void> updatePromise; + g_retrieveQueueStatistics[v].updateFuture = updatePromise.get_future(); + // Give other threads a chance to access the cache for other vids. + grqsmLock.unlock(); + // Get the informations (stages, so we don't access the global variable without the mutex. + auto tapeStatus=catalogue.getTapesByVid({v}); + // Build a minimal service retrieve file queue criteria to query queues. + common::dataStructures::RetrieveFileQueueCriteria rfqc; + rfqc.archiveFile.tapeFiles[1].vid=v; + auto queuesStats=Helpers::getRetrieveQueueStatistics(rfqc, {v}, objectstore); + // We now have the data we need. Update the cache. + grqsmLock.lock(); + g_retrieveQueueStatistics[v].updating=false; + g_retrieveQueueStatistics[v].updateFuture=std::shared_future<void>(); + // Check we got the expected vid (and size of stats). + if (queuesStats.size()!=1) + throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for queueStats."); + if (queuesStats.front().vid!=v) + throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in queueStats."); + if (tapeStatus.size()!=1) + throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected size for tapeStatus."); + if (tapeStatus.begin()->first!=v) + throw cta::exception::Exception("In Helpers::selectBestRetrieveQueue(): unexpected vid in tapeStatus."); + g_retrieveQueueStatistics[v].stats = queuesStats.front(); + g_retrieveQueueStatistics[v].tapeStatus = tapeStatus.at(v); + // Signal to potential waiters + updatePromise.set_value(); + // Update our own candidate list if needed. + if(!g_retrieveQueueStatistics.at(v).tapeStatus.disabled) + candidateVidsStats.emplace_back(g_retrieveQueueStatistics.at(v).stats); + } + } + // We now have all the candidates listed (if any). + if (candidateVidsStats.empty()) + throw exception::NonRetryableError("In Helpers::selectBestRetrieveQueue(): no tape available to recall from."); + // Sort the tapes. + candidateVidsStats.sort(SchedulerDatabase::RetrieveQueueStatistics::leftGreaterThanRight); + // Get a list of equivalent best tapes + std::set<std::string> shortlistVids; + for (auto & s: candidateVidsStats) { + if (!(s<candidateVidsStats.front()) && !(s>candidateVidsStats.front())) + shortlistVids.insert(s.vid); + } + // If there is only one best tape, we're done + if (shortlistVids.size()==1) return *shortlistVids.begin(); + // There are several equivalent entries, choose randomly among them. + // First element will always be selected. + // We need to get a random number [0, candidateVids.size() -1] + std::default_random_engine dre(std::chrono::system_clock::now().time_since_epoch().count()); + std::uniform_int_distribution<size_t> distribution(0, candidateVids.size() -1); + size_t index=distribution(dre); + auto it=candidateVids.cbegin(); + std::advance(it, index); + return *it; +} + +//------------------------------------------------------------------------------ +// Helpers::g_retrieveQueueStatistics +//------------------------------------------------------------------------------ +std::map<std::string, Helpers::RetrieveQueueStatisticsWithTime> Helpers::g_retrieveQueueStatistics; + +//------------------------------------------------------------------------------ +// Helpers::g_retrieveQueueStatisticsMutex +//------------------------------------------------------------------------------ +cta::threading::Mutex Helpers::g_retrieveQueueStatisticsMutex; + +//------------------------------------------------------------------------------ +// Helpers::getLockedAndFetchedArchiveQueue() +//------------------------------------------------------------------------------ + +std::list<SchedulerDatabase::RetrieveQueueStatistics> Helpers::getRetrieveQueueStatistics( + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider, + objectstore::Backend & objectstore) { + std::list<SchedulerDatabase::RetrieveQueueStatistics> ret; + // Find the retrieve queues for each vid if they exist (absence is possible). + RootEntry re(objectstore); + ScopedSharedLock rel(re); + re.fetch(); + rel.release(); + for (auto &tf:criteria.archiveFile.tapeFiles) { + if (!vidsToConsider.count(tf.second.vid)) + continue; + std::string rqAddr; + try { + std::string rqAddr = re.getRetrieveQueue(tf.second.vid); + } catch (cta::exception::Exception &) { + ret.push_back(SchedulerDatabase::RetrieveQueueStatistics()); + ret.back().vid=tf.second.vid; + ret.back().bytesQueued=0; + ret.back().currentPriority=0; + ret.back().filesQueued=0; + continue; + } + RetrieveQueue rq(rqAddr, objectstore); + ScopedSharedLock rql(rq); + rq.fetch(); + rql.release(); + if (rq.getVid() != tf.second.vid) + throw cta::exception::Exception("In OStoreDB::getRetrieveQueueStatistics(): unexpected vid for retrieve queue"); + ret.push_back(SchedulerDatabase::RetrieveQueueStatistics()); + ret.back().vid=rq.getVid(); + ret.back().currentPriority=rq.getJobsSummary().priority; + ret.back().bytesQueued=rq.getJobsSummary().bytes; + ret.back().filesQueued=rq.getJobsSummary().files; + } + return ret; +} + + }} // namespace cta::objectstore. \ No newline at end of file diff --git a/objectstore/Helpers.hpp b/objectstore/Helpers.hpp index a646d52001be8c5423de67f3653f688eea696dd8..72f2367e95a804e610a140200258e946d5a7d75d 100644 --- a/objectstore/Helpers.hpp +++ b/objectstore/Helpers.hpp @@ -18,7 +18,14 @@ #pragma once +#include "scheduler/SchedulerDatabase.hpp" +#include "common/threading/Mutex.hpp" +#include "common/threading/MutexLocker.hpp" +#include "catalogue/Catalogue.hpp" +#include "scheduler/OStoreDB/OStoreDB.hpp" #include <string> +#include <set> +#include <future> /** * A collection of helper functions for commonly used multi-object operations @@ -27,6 +34,7 @@ namespace cta { namespace objectstore { class ArchiveQueue; +class RetrieveQueue; class ScopedExclusiveLock; class AgentReference; @@ -40,12 +48,61 @@ public: * (ArchiveQueue and ScopedExclusiveLock objects are provided empty) * @param archiveQueue the ArchiveQueue object, empty * @param archiveQueueLock the lock, not initialized + * @param agentReference the agent reference that will be needed in case of object creation * @param tapePool the name of the needed tape pool */ static void getLockedAndFetchedArchiveQueue(ArchiveQueue & archiveQueue, ScopedExclusiveLock & archiveQueueLock, AgentReference & agentReference, const std::string & tapePool); + /** + * Find the most appropriate queue (bid) to add the retrieve request to. The potential + * VIDs (VIDs for non-failed copies) is provided by the caller. The status of the + * the tapes (disabled or not, and available queue size) are all cached to avoid + * frequent access to the object store. The caching create a small inefficiency + * to the algorithm, but will help performance drastically for a very similar result + */ + static std::string selectBestRetrieveQueue (const std::set<std::string> & candidateVids, cta::catalogue::Catalogue & catalogue, + objectstore::Backend & objectstore); + + /** + * Find or create a retrieve queue, and return it locked and fetched to the caller + * (RetrieveQueue and ScopedExclusiveLock objects are provided empty) + * @param archiveQueue the ArchiveQueue object, empty + * @param archiveQueueLock the lock, not initialized + * @param agentReference the agent reference that will be needed in case of object creation + * @param vid the name of the needed tape pool + */ + static void getLockedAndFetchedRetrieveQueue(RetrieveQueue & retrieveQueue, + ScopedExclusiveLock & retrieveQueueLock, AgentReference & agentReference, + const std::string & vid); + + /** + * Gets the retrieve queue statistics for a set of Vids (extracted from the OStoreDB + * so it can be used in the Helper context without passing the DB object. + */ + static std::list<SchedulerDatabase::RetrieveQueueStatistics> getRetrieveQueueStatistics( + const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, + const std::set<std::string> & vidsToConsider, + objectstore::Backend & objectstore); + +private: + /** Lock for the retrieve queues stats */ + static cta::threading::Mutex g_retrieveQueueStatisticsMutex; + /** A struct holding together RetrieveQueueStatistics, tape status and an update time. */ + struct RetrieveQueueStatisticsWithTime { + cta::SchedulerDatabase::RetrieveQueueStatistics stats; + cta::common::dataStructures::Tape tapeStatus; + bool updating; + /** The shared future will allow all updating safely an entry of the cache while + * releasing the global mutex to allow threads interested in other VIDs to carry on.*/ + std::shared_future<void> updateFuture; + time_t updateTime; + }; + /** The stats for the queues */ + static std::map<std::string, RetrieveQueueStatisticsWithTime> g_retrieveQueueStatistics; + /** Time between cache updates */ + static const time_t c_retrieveQueueCacheMaxAge = 10; }; }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 3ba163953361401c752eabf09f605098170961d6..bbb50d82ff55c84dde4cc2d7501dd7cfd7ca5c02 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -21,6 +21,8 @@ #include "Backend.hpp" #include "common/exception/Exception.hpp" #include "objectstore/cta.pb.h" +#include "common/log/LogContext.hpp" +#include "catalogue/Catalogue.hpp" #include <memory> #include <stdint.h> @@ -288,7 +290,8 @@ public: /** * This function should be overloaded in the inheriting classes */ - virtual void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) = 0; + virtual void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) = 0; protected: diff --git a/objectstore/RetrieveQueue.cpp b/objectstore/RetrieveQueue.cpp index c22e63f7449cdd27d9beedb1d2cf4f40ef996754..1c12c8d91966125b1b236be9dae7eac9b3b19c63 100644 --- a/objectstore/RetrieveQueue.cpp +++ b/objectstore/RetrieveQueue.cpp @@ -215,6 +215,7 @@ void cta::objectstore::RetrieveQueue::removeJob(const std::string& retriveToFile } while (found); } -void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +void cta::objectstore::RetrieveQueue::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { throw cta::exception::Exception("In RetrieveQueue::garbageCollect(): not implemented"); } diff --git a/objectstore/RetrieveQueue.hpp b/objectstore/RetrieveQueue.hpp index 8aedd43363645b5c171c91dca78157ed775fcec2..1c7ecdd14a69af508d63c8d40485f50835d9abca 100644 --- a/objectstore/RetrieveQueue.hpp +++ b/objectstore/RetrieveQueue.hpp @@ -35,7 +35,8 @@ public: RetrieveQueue(GenericObject & go); void initialize(const std::string & vid); void commit(); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; bool isEmpty(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); void removeIfEmpty(); diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 1c57232818d58d6a42ee49fbec173108d58e88e8..b474df9e320c36b233fed1c949d04865201a935f 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -23,14 +23,21 @@ #include "DiskFileInfoSerDeser.hpp" #include "ArchiveFileSerDeser.hpp" #include "objectstore/cta.pb.h" +#include "Helpers.hpp" #include <google/protobuf/util/json_util.h> namespace cta { namespace objectstore { +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ RetrieveRequest::RetrieveRequest( const std::string& address, Backend& os): ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>(os, address) { } +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ RetrieveRequest::RetrieveRequest(GenericObject& go): ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>(go.objectStore()) { // Here we transplant the generic object into the new object @@ -39,6 +46,9 @@ RetrieveRequest::RetrieveRequest(GenericObject& go): getPayloadFromHeader(); } +//------------------------------------------------------------------------------ +// initialize +//------------------------------------------------------------------------------ void RetrieveRequest::initialize() { // Setup underlying object ObjectOps<serializers::RetrieveRequest, serializers::RetrieveRequest_t>::initialize(); @@ -46,10 +56,44 @@ void RetrieveRequest::initialize() { m_payloadInterpreted = true; } -void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) { +//------------------------------------------------------------------------------ +// garbageCollect +//------------------------------------------------------------------------------ +void RetrieveRequest::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { + checkPayloadWritable(); + // Check the request is indeed owned by the right owner. + if (getOwner() != presumedOwner) return; + // The owner is indeed the right one. We should requeue the request if possible. + // Find the vids for active jobs in the request (pending ones). + using serializers::RetrieveJobStatus; + std::set<RetrieveJobStatus> validStates({RetrieveJobStatus::RJS_Pending, RetrieveJobStatus::RJS_Selected}); + std::set<std::string> candidateVids; + for (auto &j: m_payload.jobs()) { + if (validStates.count(j.status())) { + // Find the job details in tape file + for (auto &tf: m_payload.archivefile().tapefiles()) { + if (tf.copynb() == j.copynb()) { + candidateVids.insert(tf.vid()); + goto found; + } + } + { + std::stringstream err; + err << "In RetrieveRequest::garbageCollect(): could not find tapefile for copynb " << j.copynb(); + throw exception::Exception(err.str()); + } + found:; + } + } + // If we have to fetch the status of the tapes and queued for the non-disabled vids. + auto bestVid=Helpers::selectBestRetrieveQueue(candidateVids, catalogue, m_objectStore); throw cta::exception::Exception("In RetrieveRequest::garbageCollect(): not implemented."); } +//------------------------------------------------------------------------------ +// setJobSuccessful +//------------------------------------------------------------------------------ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) { checkPayloadWritable(); auto *tf = m_payload.add_jobs(); @@ -61,6 +105,9 @@ void RetrieveRequest::addJob(uint64_t copyNb, uint16_t maxRetiesWithinMount, uin tf->set_status(serializers::RetrieveJobStatus::RJS_Pending); } +//------------------------------------------------------------------------------ +// setJobSuccessful +//------------------------------------------------------------------------------ bool RetrieveRequest::setJobSuccessful(uint16_t copyNumber) { checkPayloadWritable(); auto * jl = m_payload.mutable_jobs(); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 782d51ef49ad308d583a3391076ffc41b9027b66..e5c0b1669b3a10ea58c9e8cef78fbacdc6e7e1f2 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -42,7 +42,8 @@ public: RetrieveRequest(const std::string & address, Backend & os); RetrieveRequest(GenericObject & go); void initialize(); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; // Job management ============================================================ void addJob(uint64_t copyNumber, uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries); void setJobSelected(uint16_t copyNumber, const std::string & owner); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 52f8c5221104668640357f7ff35ea4b12c6e0df6..6c0628ae6d50984be0b595d670ec4e8ab2341946 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -82,7 +82,8 @@ void RootEntry::removeIfEmpty() { remove(); } -void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference) { +void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { // The root entry cannot be garbage collected. throw ForbiddenOperation("In RootEntry::garbageCollect(): RootEntry cannot be garbage collected"); } diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 7ca3d78be5e84714e5b632eff90f8af2cdd09be6..70ef5964580e0de09b28ffe9501aa529efe85d82 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -55,7 +55,8 @@ public: void removeIfEmpty(); // Garbage collection (disallowed for root entry). - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; // ArchiveQueue handling ==================================================== CTA_GENERATE_EXCEPTION_CLASS(ArchivelQueueNotEmpty); diff --git a/objectstore/SchedulerGlobalLock.cpp b/objectstore/SchedulerGlobalLock.cpp index d4e0e79e58d074d462110dbc34d64661cd966311..5300cdcb22f32ffdf77df17ee8662fdc1d553a2e 100644 --- a/objectstore/SchedulerGlobalLock.cpp +++ b/objectstore/SchedulerGlobalLock.cpp @@ -43,7 +43,8 @@ void SchedulerGlobalLock::initialize() { m_payloadInterpreted = 1; } -void SchedulerGlobalLock::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) { +void SchedulerGlobalLock::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) { checkPayloadWritable(); // If the agent is not anymore the owner of the object, then only the very // last operation of the drive register creation failed. We have nothing to do. diff --git a/objectstore/SchedulerGlobalLock.hpp b/objectstore/SchedulerGlobalLock.hpp index 9781361a48517be412b8dafa60c75427a297406a..d6fdcecc643add6cb390d9c184eefb81b2066409 100644 --- a/objectstore/SchedulerGlobalLock.hpp +++ b/objectstore/SchedulerGlobalLock.hpp @@ -33,7 +33,8 @@ public: SchedulerGlobalLock(GenericObject & go); void initialize(); CTA_GENERATE_EXCEPTION_CLASS(NotEmpty); - void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference) override; + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, + cta::catalogue::Catalogue & catalogue) override; bool isEmpty(); // Mount id management ======================================================= diff --git a/objectstore/cta-objectstore-collect-orphaned-object.cpp b/objectstore/cta-objectstore-collect-orphaned-object.cpp index ed95792a15c9e5ade4040cd3a406567559cfadbf..5506f4f7376f820fe24c067ce868a117e3efc941 100644 --- a/objectstore/cta-objectstore-collect-orphaned-object.cpp +++ b/objectstore/cta-objectstore-collect-orphaned-object.cpp @@ -28,16 +28,26 @@ #include "RootEntry.hpp" #include "ArchiveRequest.hpp" #include "GenericObject.hpp" +#include "common/log/StringLogger.hpp" +#include "catalogue/CatalogueFactory.hpp" #include <iostream> #include <stdexcept> +#include <bits/unique_ptr.h> int main(int argc, char ** argv) { try { std::unique_ptr<cta::objectstore::Backend> be; - if (3 == argc) { + std::unique_ptr<cta::catalogue::Catalogue> catalogue; + cta::log::StringLogger sl("cta-objectstore-collect-orphaned", cta::log::DEBUG); + cta::log::LogContext lc(sl); + if (4 == argc) { be.reset(cta::objectstore::BackendFactory::createBackend(argv[1]).release()); + const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(argv[2]); + const uint64_t nbConns = 1; + const uint64_t nbArchiveFileListingConns = 0; + catalogue=std::move(cta::catalogue::CatalogueFactory::create(sl, catalogueLogin, nbConns, nbArchiveFileListingConns)); } else { - throw std::runtime_error("Wrong number of arguments: expected 2"); + throw std::runtime_error("Wrong number of arguments: expected 3: <objectstoreURL> <catalogue login file> <objectname>"); } // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. @@ -69,7 +79,7 @@ int main(int argc, char ** argv) { for (auto & j: ar.dumpJobs()) { if (!be->exists(j.owner)) { std::cout << "Owner " << j.owner << " for job " << j.copyNb << " does not exist." << std::endl; - ar.garbageCollect(j.owner, agr); + ar.garbageCollect(j.owner, agr, lc, *catalogue); someGcDone=true; goto gcpass; } diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index fa0f270e020e375e458c90116ddc809305ae3f4f..52d723620fe023d19f2c3805ee78b992a1dcf9ff 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -48,8 +48,8 @@ using namespace objectstore; //------------------------------------------------------------------------------ // OStoreDB::OStoreDB() //------------------------------------------------------------------------------ -OStoreDB::OStoreDB(objectstore::Backend& be): - m_objectStore(be) {} +OStoreDB::OStoreDB(objectstore::Backend& be, catalogue::Catalogue & catalogue, log::Logger &logger): + m_objectStore(be), m_catalogue(catalogue), m_logger(logger) {} //------------------------------------------------------------------------------ // OStoreDB::~OStoreDB() @@ -185,7 +185,7 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> //Allocate the getMountInfostructure to return. assertAgentAddressSet(); std::unique_ptr<OStoreDB::TapeMountDecisionInfo> privateRet (new OStoreDB::TapeMountDecisionInfo( - m_objectStore, *m_agentReference)); + m_objectStore, *m_agentReference, m_catalogue, m_logger)); TapeMountDecisionInfo & tmdi=*privateRet; // Get all the tape pools and tapes with queues (potential mounts) objectstore::RootEntry re(m_objectStore); @@ -527,7 +527,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> assertAgentAddressSet(); // Construct the return value immediately std::unique_ptr<cta::OStoreDB::ArchiveToFileRequestCancelation> - internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(*m_agentReference, m_objectStore)); + internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(*m_agentReference, m_objectStore, m_catalogue, m_logger)); cta::objectstore::ArchiveRequest & ar = internalRet->m_request; cta::objectstore::ScopedExclusiveLock & atfrl = internalRet->m_lock; // Attempt to find the request @@ -595,7 +595,8 @@ void OStoreDB::ArchiveToFileRequestCancelation::complete() { OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation() { if (!m_closed) { try { - m_request.garbageCollect(m_agentReference.getAgentAddress(), m_agentReference); + log::LogContext lc(m_logger); + m_request.garbageCollect(m_agentReference.getAgentAddress(), m_agentReference, lc, m_catalogue); m_agentReference.removeFromOwnership(m_request.getAddressIfSet(), m_objectStore); } catch (...) {} } @@ -760,39 +761,7 @@ std::map<std::string, std::list<common::dataStructures::ArchiveJob> > std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueueStatistics( const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string> & vidsToConsider) { - std::list<RetrieveQueueStatistics> ret; - // Find the retrieve queues for each vid if they exist (absence is possible). - RootEntry re(m_objectStore); - ScopedSharedLock rel(re); - re.fetch(); - rel.release(); - for (auto &tf:criteria.archiveFile.tapeFiles) { - if (!vidsToConsider.count(tf.second.vid)) - continue; - std::string rqAddr; - try { - std::string rqAddr = re.getRetrieveQueue(tf.second.vid); - } catch (cta::exception::Exception &) { - ret.push_back(RetrieveQueueStatistics()); - ret.back().vid=tf.second.vid; - ret.back().bytesQueued=0; - ret.back().currentPriority=0; - ret.back().filesQueued=0; - continue; - } - RetrieveQueue rq(rqAddr, m_objectStore); - ScopedSharedLock rql(rq); - rq.fetch(); - rql.release(); - if (rq.getVid() != tf.second.vid) - throw cta::exception::Exception("In OStoreDB::getRetrieveQueueStatistics(): unexpected vid for retrieve queue"); - ret.push_back(RetrieveQueueStatistics()); - ret.back().vid=rq.getVid(); - ret.back().currentPriority=rq.getJobsSummary().priority; - ret.back().bytesQueued=rq.getJobsSummary().bytes; - ret.back().filesQueued=rq.getJobsSummary().files; - } - return ret; + return Helpers::getRetrieveQueueStatistics(criteria, vidsToConsider, m_objectStore); } //------------------------------------------------------------------------------ @@ -1026,6 +995,9 @@ void OStoreDB::reportDriveStatus(const common::dataStructures::DriveInfo& driveI dr.commit(); } +//------------------------------------------------------------------------------ +// OStoreDB::updateDriveStatusInRegitry() +//------------------------------------------------------------------------------ void OStoreDB::updateDriveStatusInRegitry(objectstore::DriveRegister& dr, const common::dataStructures::DriveInfo& driveInfo, const ReportDriveStatusInputs& inputs) { using common::dataStructures::DriveStatus; @@ -1098,6 +1070,9 @@ void OStoreDB::updateDriveStatusInRegitry(objectstore::DriveRegister& dr, dr.setDriveState(driveState); } +//------------------------------------------------------------------------------ +// OStoreDB::updateDriveStatsInRegitry() +//------------------------------------------------------------------------------ void OStoreDB::updateDriveStatsInRegitry(objectstore::DriveRegister& dr, const common::dataStructures::DriveInfo& driveInfo, const ReportDriveStatsInputs& inputs) { using common::dataStructures::DriveStatus; @@ -1423,7 +1398,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> // Check we actually hold the scheduling lock // Set the drive status to up, and indicate which tape we use. std::unique_ptr<OStoreDB::ArchiveMount> privateRet( - new OStoreDB::ArchiveMount(m_objectStore, m_agentReference)); + new OStoreDB::ArchiveMount(m_objectStore, m_agentReference, m_catalogue, m_logger)); auto &am = *privateRet; // Check we hold the scheduling lock if (!m_lockTaken) @@ -1481,8 +1456,8 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> // OStoreDB::TapeMountDecisionInfo::TapeMountDecisionInfo() //------------------------------------------------------------------------------ OStoreDB::TapeMountDecisionInfo::TapeMountDecisionInfo( - objectstore::Backend& os, objectstore::AgentReference& a): - m_lockTaken(false), m_objectStore(os), m_agentReference(a) {} + objectstore::Backend& os, objectstore::AgentReference& a, catalogue::Catalogue & c, log::Logger &l): + m_lockTaken(false), m_objectStore(os), m_catalogue(c), m_logger(l), m_agentReference(a) {} //------------------------------------------------------------------------------ // OStoreDB::TapeMountDecisionInfo::createArchiveMount() @@ -1566,8 +1541,8 @@ OStoreDB::TapeMountDecisionInfo::~TapeMountDecisionInfo() { //------------------------------------------------------------------------------ // OStoreDB::ArchiveMount::ArchiveMount() //------------------------------------------------------------------------------ -OStoreDB::ArchiveMount::ArchiveMount(objectstore::Backend& os, objectstore::AgentReference& a): - m_objectStore(os), m_agentReference(a) {} +OStoreDB::ArchiveMount::ArchiveMount(objectstore::Backend& os, objectstore::AgentReference& a, catalogue::Catalogue & c, log::Logger & l): + m_objectStore(os), m_catalogue(c), m_logger(l), m_agentReference(a) {} //------------------------------------------------------------------------------ // OStoreDB::ArchiveMount::getMountInfo() @@ -1649,7 +1624,7 @@ auto OStoreDB::ArchiveMount::getNextJob(log::LogContext &logContext) -> std::uni // Prepare the return value auto job=aq.dumpJobs().front(); std::unique_ptr<OStoreDB::ArchiveJob> privateRet(new OStoreDB::ArchiveJob( - job.address, m_objectStore, m_agentReference, *this)); + job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this)); privateRet->tapeFile.copyNb = job.copyNb; objectstore::ScopedExclusiveLock arl; try { @@ -1866,7 +1841,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun candidateDumps.pop_front(); currentFiles++; currentBytes+=job.size; - candidateJobs.emplace_back(new OStoreDB::ArchiveJob(job.address, m_objectStore, m_agentReference, *this)); + candidateJobs.emplace_back(new OStoreDB::ArchiveJob(job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this)); candidateJobs.back()->tapeFile.copyNb = job.copyNb; } { @@ -2065,8 +2040,8 @@ void OStoreDB::ArchiveMount::complete(time_t completionTime) { // OStoreDB::ArchiveJob::ArchiveJob() //------------------------------------------------------------------------------ OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress, - objectstore::Backend& os, objectstore::AgentReference& ar, ArchiveMount & am): m_jobOwned(false), - m_objectStore(os), m_agentReference(ar), m_archiveRequest(jobAddress, os), m_archiveMount(am) {} + objectstore::Backend& os, catalogue::Catalogue & c, log::Logger & l, objectstore::AgentReference& ar, ArchiveMount & am): m_jobOwned(false), + m_objectStore(os), m_catalogue(c), m_logger(l), m_agentReference(ar), m_archiveRequest(jobAddress, os), m_archiveMount(am) {} //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::RetrieveMount() @@ -2413,10 +2388,11 @@ bool OStoreDB::ArchiveJob::succeed() { //------------------------------------------------------------------------------ OStoreDB::ArchiveJob::~ArchiveJob() { if (m_jobOwned) { + log::LogContext lc(m_logger); // Return the job to the pot if we failed to handle it. objectstore::ScopedExclusiveLock atfrl(m_archiveRequest); m_archiveRequest.fetch(); - m_archiveRequest.garbageCollect(m_agentReference.getAgentAddress(), m_agentReference); + m_archiveRequest.garbageCollect(m_agentReference.getAgentAddress(), m_agentReference, lc, m_catalogue); atfrl.release(); // Remove ownership from agent m_agentReference.removeFromOwnership(m_archiveRequest.getAddressIfSet(), m_objectStore); diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 6bd843bcb04dcae4f3164d670c2a2ef8cb3ee7a2..e3b9bebd88d61e7c368b96c9b5ceaa9053835b50 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -27,6 +27,8 @@ #include "objectstore/DriveRegister.hpp" #include "objectstore/RetrieveRequest.hpp" #include "objectstore/SchedulerGlobalLock.hpp" +#include "catalogue/Catalogue.hpp" +#include "common/log/Logger.hpp" namespace cta { @@ -43,7 +45,7 @@ namespace ostoredb { class OStoreDB: public SchedulerDatabase { friend class cta::ostoredb::MemArchiveQueue; public: - OStoreDB(objectstore::Backend & be); + OStoreDB(objectstore::Backend & be, catalogue::Catalogue & catalogue, log::Logger &logger); virtual ~OStoreDB() throw(); /* === Object store and agent handling ==================================== */ @@ -75,11 +77,13 @@ public: time_t startTime) override; virtual ~TapeMountDecisionInfo(); private: - TapeMountDecisionInfo (objectstore::Backend &, objectstore::AgentReference &); + TapeMountDecisionInfo (objectstore::Backend &, objectstore::AgentReference &, catalogue::Catalogue &, log::Logger &l); bool m_lockTaken; objectstore::ScopedExclusiveLock m_lockOnSchedulerGlobalLock; std::unique_ptr<objectstore::SchedulerGlobalLock> m_schedulerGlobalLock; objectstore::Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; + log::Logger & m_logger; objectstore::AgentReference & m_agentReference; }; @@ -112,8 +116,10 @@ public: class ArchiveMount: public SchedulerDatabase::ArchiveMount { friend class TapeMountDecisionInfo; private: - ArchiveMount(objectstore::Backend &, objectstore::AgentReference &); + ArchiveMount(objectstore::Backend &, objectstore::AgentReference &, catalogue::Catalogue &, log::Logger &); objectstore::Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; + log::Logger & m_logger; objectstore::AgentReference & m_agentReference; public: CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp); @@ -137,12 +143,14 @@ public: void bumpUpTapeFileCount(uint64_t newFileCount) override; ~ArchiveJob() override; private: - ArchiveJob(const std::string &, objectstore::Backend &, - objectstore::AgentReference &, ArchiveMount &); + ArchiveJob(const std::string &, objectstore::Backend &, catalogue::Catalogue &, + log::Logger &, objectstore::AgentReference &, ArchiveMount &); bool m_jobOwned; uint64_t m_mountId; std::string m_tapePool; objectstore::Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; + log::Logger & m_logger; objectstore::AgentReference & m_agentReference; objectstore::ArchiveRequest m_archiveRequest; ArchiveMount & m_archiveMount; @@ -198,14 +206,16 @@ public: public SchedulerDatabase::ArchiveToFileRequestCancelation { public: ArchiveToFileRequestCancelation(objectstore::AgentReference & agentReference, - objectstore::Backend & be): m_request(be), m_lock(), m_objectStore(be), - m_agentReference(agentReference), m_closed(false) {} + objectstore::Backend & be, catalogue::Catalogue & catalogue, log::Logger &logger): m_request(be), m_lock(), m_objectStore(be), + m_catalogue(catalogue), m_logger(logger), m_agentReference(agentReference), m_closed(false) {} virtual ~ArchiveToFileRequestCancelation(); void complete() override; private: objectstore::ArchiveRequest m_request; objectstore::ScopedExclusiveLock m_lock; objectstore::Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; + log::Logger & m_logger; objectstore::AgentReference &m_agentReference; bool m_closed; friend class OStoreDB; @@ -330,6 +340,8 @@ private: private: objectstore::Backend & m_objectStore; + catalogue::Catalogue & m_catalogue; + log::Logger & m_logger; objectstore::AgentReference *m_agentReference = nullptr; }; diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index 558f9e8cdf74b9c2612e621693c660391d0d6eec..0afddd3d6a9a01d47f28c3859799a4eace7cf137 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -30,6 +30,8 @@ #include "objectstore/BackendVFS.hpp" #include "objectstore/BackendRados.hpp" #include "objectstore/BackendFactory.hpp" +#include "common/log/DummyLogger.hpp" +#include "catalogue/DummyCatalogue.hpp" #include <memory> namespace cta { @@ -160,6 +162,8 @@ public: private: std::unique_ptr <cta::objectstore::Backend> m_backend; + std::unique_ptr <cta::log::Logger> m_logger; + std::unique_ptr <cta::catalogue::Catalogue> m_catalogue; cta::OStoreDB m_OStoreDB; objectstore::AgentReference m_agentReference; }; @@ -167,8 +171,9 @@ private: template <> OStoreDBWrapper<cta::objectstore::BackendVFS>::OStoreDBWrapper( const std::string &context, const std::string &URL) : -m_backend(new cta::objectstore::BackendVFS()), -m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") { +m_backend(new cta::objectstore::BackendVFS()), m_logger(new cta::log::DummyLogger("")), +m_catalogue(new cta::catalogue::DummyCatalogue(*m_logger)), +m_OStoreDB(*m_backend, *m_catalogue, *m_logger), m_agentReference("OStoreDBFactory") { // We need to populate the root entry before using. objectstore::RootEntry re(*m_backend); re.initialize(); @@ -192,8 +197,9 @@ m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") { template <> OStoreDBWrapper<cta::objectstore::BackendRados>::OStoreDBWrapper( const std::string &context, const std::string &URL) : -m_backend(cta::objectstore::BackendFactory::createBackend(URL).release()), -m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") { +m_backend(cta::objectstore::BackendFactory::createBackend(URL).release()), m_logger(new cta::log::DummyLogger("")), +m_catalogue(new cta::catalogue::DummyCatalogue(*m_logger)), +m_OStoreDB(*m_backend, *m_catalogue, *m_logger), m_agentReference("OStoreDBFactory") { // We need to first clean up possible left overs in the pool auto l = m_backend->list(); for (auto o=l.begin(); o!=l.end(); o++) { diff --git a/scheduler/OStoreDB/OStoreDBWithAgent.cpp b/scheduler/OStoreDB/OStoreDBWithAgent.cpp index de65c0bebccca6cdcf863905f976e20342a73529..fac457fde2aacc041db39d358d91648350fe4f16 100644 --- a/scheduler/OStoreDB/OStoreDBWithAgent.cpp +++ b/scheduler/OStoreDB/OStoreDBWithAgent.cpp @@ -21,7 +21,8 @@ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ -cta::OStoreDBWithAgent::OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar): cta::OStoreDB(be) { +cta::OStoreDBWithAgent::OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar, + catalogue::Catalogue & catalogue, log::Logger & logger): cta::OStoreDB(be, catalogue, logger) { cta::OStoreDB::setAgentReference(&ar); } diff --git a/scheduler/OStoreDB/OStoreDBWithAgent.hpp b/scheduler/OStoreDB/OStoreDBWithAgent.hpp index 411d56562915d6a4bdd89b47ab7b4c2807cd3d4a..4b84ddbf2cce40d6bbd3a0aeced42618e5cc5162 100644 --- a/scheduler/OStoreDB/OStoreDBWithAgent.hpp +++ b/scheduler/OStoreDB/OStoreDBWithAgent.hpp @@ -33,7 +33,8 @@ public: * @param be The objectstore backend * @param ag The agent */ - OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar); + OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar, + catalogue::Catalogue & catalogue, log::Logger & logger); /** * Destructor diff --git a/tapeserver/daemon/DriveHandler.cpp b/tapeserver/daemon/DriveHandler.cpp index 9fcc9b367e12e8c9180ed168c55022e74dfa4a5d..21c6891fb7fb162d20b5606e8d5bdaf8ec7d54f5 100644 --- a/tapeserver/daemon/DriveHandler.cpp +++ b/tapeserver/daemon/DriveHandler.cpp @@ -870,7 +870,6 @@ int DriveHandler::runChild() { std::string processName="DriveProcess-"; processName+=m_configLine.unitName; backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, processName)); - osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference())); } catch(cta::exception::Exception &ex) { log::ScopedParamContainer param(m_processManager.logContext()); param.add("errorMessage", ex.getMessageValue()); @@ -885,6 +884,7 @@ int DriveHandler::runChild() { const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 0; catalogue=cta::catalogue::CatalogueFactory::create(m_sessionEndContext.logger(), catalogueLogin, nbConns, nbArchiveFileListingConns); + osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger())); } catch(cta::exception::Exception &ex) { log::ScopedParamContainer param(m_processManager.logContext()); param.add("errorMessage", ex.getMessageValue()); diff --git a/tapeserver/daemon/GarbageCollectorHandler.cpp b/tapeserver/daemon/GarbageCollectorHandler.cpp index b65e90a9696e2dee1b5be60996ce4d4307b809cc..c008046e582ca973001e773207be3b3c62dc2054 100644 --- a/tapeserver/daemon/GarbageCollectorHandler.cpp +++ b/tapeserver/daemon/GarbageCollectorHandler.cpp @@ -268,7 +268,7 @@ int GarbageCollectorHandler::runChild() { std::unique_ptr<cta::Scheduler> scheduler; try { backendPopulator.reset(new cta::objectstore::BackendPopulator(*backend, "garbageCollector")); - osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference())); + osdb.reset(new cta::OStoreDBWithAgent(*backend, backendPopulator->getAgentReference(), *catalogue, m_processManager.logContext().logger())); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile(m_tapedConfig.fileCatalogConfigFile.value()); const uint64_t nbConns = 1; const uint64_t nbArchiveFileListingConns = 0; @@ -297,7 +297,7 @@ int GarbageCollectorHandler::runChild() { agentHeartbeat.startThread(); // Create the garbage collector itself - objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference()); + objectstore::GarbageCollector gc(*backend, backendPopulator->getAgentReference(), *catalogue); // Run the gc in a loop try { diff --git a/xroot_plugins/XrdCtaFilesystem.cpp b/xroot_plugins/XrdCtaFilesystem.cpp index e7b46e98def0a7ad4434079f03bbf3114145aa7c..dcc73f5346c46b709199b97e5eebbe1087832a4e 100644 --- a/xroot_plugins/XrdCtaFilesystem.cpp +++ b/xroot_plugins/XrdCtaFilesystem.cpp @@ -520,8 +520,7 @@ XrdCtaFilesystem::XrdCtaFilesystem(): m_xrdOucBuffPool(1024, 65536), // XrdOucBuffPool(minsz, maxsz) m_ctaConf("/etc/cta/cta-frontend.conf"), m_backend(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr)).release()), - m_backendPopulator(*m_backend, "Frontend"), - m_scheddb(*m_backend, m_backendPopulator.getAgentReference()) { + m_backendPopulator(*m_backend, "Frontend") { using namespace cta; // Try to instantiate the logging system API @@ -544,7 +543,8 @@ XrdCtaFilesystem::XrdCtaFilesystem(): const uint64_t nbConns = m_ctaConf.getConfEntInt<uint64_t>("Catalogue", "NumberOfConnections", nullptr); const uint64_t nbArchiveFileListingConns = 2; m_catalogue = catalogue::CatalogueFactory::create(*m_log, catalogueLogin, nbConns, nbArchiveFileListingConns); - m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, m_scheddb, 5, 2*1000*1000); + m_scheddb = cta::make_unique<cta::OStoreDBWithAgent>(*m_backend, m_backendPopulator.getAgentReference(), *m_catalogue, *m_log); + m_scheduler = cta::make_unique<cta::Scheduler>(*m_catalogue, *m_scheddb, 5, 2*1000*1000); // If the backend is a VFS, make sure we don't delete it on exit. // If not, nevermind. diff --git a/xroot_plugins/XrdCtaFilesystem.hpp b/xroot_plugins/XrdCtaFilesystem.hpp index d3e7162a209f178c670e7d368d8f5eec2f35e7c8..a06c55b3cbb138c9f3f447b00e8bf1b36c926e4b 100644 --- a/xroot_plugins/XrdCtaFilesystem.hpp +++ b/xroot_plugins/XrdCtaFilesystem.hpp @@ -113,7 +113,7 @@ protected: /** * The database or object store holding all CTA persistent objects */ - cta::OStoreDBWithAgent m_scheddb; + std::unique_ptr<cta::OStoreDBWithAgent> m_scheddb; /** * The CTA catalogue of tapes and tape files.