From 4164f2cdb8adf9ffa64993d14345b83262469088 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Mon, 15 Aug 2016 15:00:23 +0200 Subject: [PATCH] Split the Agent class into AgentReference and Agent. Agent becomes a usual class, like any other in the object store. Object name generation (agent's and others) is now handled in the separate AgentReference. The previous situation created race conditions. --- objectstore/Agent.cpp | 34 --- objectstore/Agent.hpp | 5 - objectstore/BackendPopulator.cpp | 23 +- objectstore/BackendPopulator.hpp | 5 +- objectstore/CMakeLists.txt | 1 + objectstore/GarbageCollectorTest.cpp | 82 +++---- objectstore/RetrieveQueueTest.cpp | 7 +- objectstore/RootEntry.cpp | 25 +- objectstore/RootEntry.hpp | 12 +- objectstore/RootEntryTest.cpp | 42 ++-- objectstore/makeMinimalVFS.cpp | 11 +- scheduler/OStoreDB/OStoreDB.cpp | 213 ++++++++++-------- scheduler/OStoreDB/OStoreDB.hpp | 33 +-- scheduler/OStoreDB/OStoreDBFactory.hpp | 35 +-- scheduler/OStoreDB/OStoreDBWithAgent.cpp | 6 +- scheduler/OStoreDB/OStoreDBWithAgent.hpp | 2 +- .../tape/tapeserver/daemon/ProcessForker.cpp | 2 +- xroot_plugins/XrdCtaFilesystem.cpp | 2 +- 18 files changed, 264 insertions(+), 276 deletions(-) diff --git a/objectstore/Agent.cpp b/objectstore/Agent.cpp index f1f20789c9..186c214c39 100644 --- a/objectstore/Agent.cpp +++ b/objectstore/Agent.cpp @@ -24,16 +24,10 @@ #include "ProtocolBuffersAlgorithms.hpp" #include <string> #include <sstream> -#include <iomanip> -#include <sys/syscall.h> #include <ctime> #include <cxxabi.h> -#include <unistd.h> #include <json-c/json.h> -cta::objectstore::Agent::Agent(Backend & os): - ObjectOps<serializers::Agent, serializers::Agent_t>(os), m_nextId(0) {} - cta::objectstore::Agent::Agent(GenericObject& go): ObjectOps<serializers::Agent, serializers::Agent_t>(go.objectStore()) { // Here we transplant the generic object into the new object @@ -53,28 +47,6 @@ void cta::objectstore::Agent::initialize() { m_payloadInterpreted = true; } -void cta::objectstore::Agent::generateName(const std::string & typeName) { - std::stringstream aid; - // Get time - time_t now = time(0); - struct tm localNow; - localtime_r(&now, &localNow); - // Get hostname - char host[200]; - cta::exception::Errnum::throwOnMinusOne(gethostname(host, sizeof(host)), - "In AgentId::AgentId: failed to gethostname"); - // gettid is a safe system call (never fails) - aid << typeName << "-" << host << "-" << syscall(SYS_gettid) << "-" - << 1900 + localNow.tm_year - << std::setfill('0') << std::setw(2) - << 1 + localNow.tm_mon - << std::setw(2) << localNow.tm_mday << "-" - << std::setw(2) << localNow.tm_hour << ":" - << std::setw(2) << localNow.tm_min << ":" - << std::setw(2) << localNow.tm_sec; - setAddress(aid.str()); -} - void cta::objectstore::Agent::insertAndRegisterSelf() { // We suppose initialize was already called, and that the agent name // is set. @@ -149,12 +121,6 @@ bool cta::objectstore::Agent::isEmpty() { m_creationDone = true; }*/ -std::string cta::objectstore::Agent::nextId(const std::string & childType) { - std::stringstream id; - id << childType << "-" << getAddressIfSet() << "-" << m_nextId++; - return id.str(); -} - void cta::objectstore::Agent::addToOwnership(std::string name) { checkPayloadWritable(); std::string * owned = m_payload.mutable_ownedobjects()->Add(); diff --git a/objectstore/Agent.hpp b/objectstore/Agent.hpp index 09580c0414..4c6390ef42 100644 --- a/objectstore/Agent.hpp +++ b/objectstore/Agent.hpp @@ -40,17 +40,12 @@ class GenericObject; class Agent: public ObjectOps<serializers::Agent, serializers::Agent_t> { public: CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects); - Agent(Backend & os); Agent(GenericObject & go); Agent(const std::string & name, Backend & os); void initialize(); - void generateName(const std::string & typeName); - - std::string nextId(const std::string & childType); - void insertAndRegisterSelf(); void removeAndUnregisterSelf(); diff --git a/objectstore/BackendPopulator.cpp b/objectstore/BackendPopulator.cpp index 9e8c68afdd..c615d04e73 100644 --- a/objectstore/BackendPopulator.cpp +++ b/objectstore/BackendPopulator.cpp @@ -25,18 +25,18 @@ namespace cta { namespace objectstore { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ -BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be), m_agent(m_backend) { +BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be), m_agentReference("OStoreDBFactory") { cta::objectstore::RootEntry re(m_backend); cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); - m_agent.generateName("OStoreDBFactory"); - m_agent.initialize(); + Agent agent(m_agentReference.getAgentAddress(), m_backend); + agent.initialize(); cta::objectstore::EntryLogSerDeser cl("user0", "systemhost", time(NULL)); - re.addOrGetAgentRegisterPointerAndCommit(m_agent,cl); + re.addOrGetAgentRegisterPointerAndCommit(m_agentReference, cl); rel.release(); - m_agent.insertAndRegisterSelf(); + agent.insertAndRegisterSelf(); rel.lock(re); - re.addOrGetDriveRegisterPointerAndCommit(m_agent, cl); + re.addOrGetDriveRegisterPointerAndCommit(m_agentReference, cl); rel.release(); } @@ -44,16 +44,17 @@ BackendPopulator::BackendPopulator(cta::objectstore::Backend & be): m_backend(be // Destructor //------------------------------------------------------------------------------ BackendPopulator::~BackendPopulator() throw() { - cta::objectstore::ScopedExclusiveLock agl(m_agent); - m_agent.fetch(); - m_agent.removeAndUnregisterSelf(); + Agent agent(m_agentReference.getAgentAddress(), m_backend); + cta::objectstore::ScopedExclusiveLock agl(agent); + agent.fetch(); + agent.removeAndUnregisterSelf(); } //------------------------------------------------------------------------------ // getAgent //------------------------------------------------------------------------------ -cta::objectstore::Agent & BackendPopulator::getAgent() { - return m_agent; +cta::objectstore::AgentReference & BackendPopulator::getAgentReference() { + return m_agentReference; } }} \ No newline at end of file diff --git a/objectstore/BackendPopulator.hpp b/objectstore/BackendPopulator.hpp index 374deef672..81b255a5b1 100644 --- a/objectstore/BackendPopulator.hpp +++ b/objectstore/BackendPopulator.hpp @@ -19,6 +19,7 @@ #pragma once #include "objectstore/Agent.hpp" +#include "objectstore/AgentReference.hpp" #include "objectstore/Backend.hpp" namespace cta { namespace objectstore { @@ -43,7 +44,7 @@ public: * * @return the agent */ - cta::objectstore::Agent & getAgent(); + cta::objectstore::AgentReference & getAgentReference(); private: /** @@ -54,7 +55,7 @@ private: /** * The agent */ - cta::objectstore::Agent m_agent; + cta::objectstore::AgentReference m_agentReference; }; }} \ No newline at end of file diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt index fb1dbc8667..4ce195caf3 100644 --- a/objectstore/CMakeLists.txt +++ b/objectstore/CMakeLists.txt @@ -51,6 +51,7 @@ add_library (ctaobjectstore SHARED ${CTAProtoSources} RootEntry.cpp Agent.cpp + AgentReference.cpp AgentRegister.cpp AgentWatchdog.cpp ArchiveQueue.cpp diff --git a/objectstore/GarbageCollectorTest.cpp b/objectstore/GarbageCollectorTest.cpp index 3ecf3cda89..9e73f02fbc 100644 --- a/objectstore/GarbageCollectorTest.cpp +++ b/objectstore/GarbageCollectorTest.cpp @@ -23,6 +23,7 @@ #include "GarbageCollector.hpp" #include "RootEntry.hpp" #include "Agent.hpp" +#include "AgentReference.hpp" #include "AgentRegister.hpp" #include "DriveRegister.hpp" #include "ArchiveRequest.hpp" @@ -35,8 +36,8 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) { // Here we check for the ability to detect dead (but empty agents) // and clean them up. cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTestGarbageCollector"); + cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); // Create the root entry cta::objectstore::RootEntry re(be); re.initialize(); @@ -45,24 +46,23 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) { cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); rel.release(); // Create 2 agents, A and B and register them // The agents are set with a timeout of 0, so they will be delclared // dead immediately. - cta::objectstore::Agent agA(be), agB(be); + cta::objectstore::AgentReference agrA("unitTestAgentA"), agrB("unitTestAgentB"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be), agB(agrB.getAgentAddress(), be); agA.initialize(); - agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); agB.initialize(); - agB.generateName("unitTestAgentB"); agB.setTimeout_us(0); agB.insertAndRegisterSelf(); // Create the garbage colletor and run it twice. - cta::objectstore::Agent gcAgent(be); + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); gcAgent.initialize(); - gcAgent.generateName("unitTestGarbageCollector"); gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { @@ -83,8 +83,8 @@ TEST(ObjectStore, GarbageCollectorBasicFuctionnality) { TEST(ObjectStore, GarbageCollectorRegister) { // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTestGarbageCollector"); + cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); // Create the root entry cta::objectstore::RootEntry re(be); re.initialize(); @@ -93,19 +93,19 @@ TEST(ObjectStore, GarbageCollectorRegister) { cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); rel.release(); // Create an agent and add and agent register to it as an owned object - cta::objectstore::Agent agA(be); + cta::objectstore::AgentReference agrA("unitTestAgentA"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be); agA.initialize(); - agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); // Create a new agent register, owned by agA (by hand as it is not an usual // situation) std::string arName; { - arName = agA.nextId("AgentRegister"); + arName = agrA.nextId("AgentRegister"); cta::objectstore::AgentRegister ar(arName, be); ar.initialize(); ar.setOwner(agA.getAddressIfSet()); @@ -116,9 +116,9 @@ TEST(ObjectStore, GarbageCollectorRegister) { ar.insert(); } // Create the garbage colletor and run it twice. - cta::objectstore::Agent gcAgent(be); + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); gcAgent.initialize(); - gcAgent.generateName("unitTestGarbageCollector"); gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { @@ -140,8 +140,8 @@ TEST(ObjectStore, GarbageCollectorRegister) { TEST(ObjectStore, GarbageCollectorArchiveQueue) { // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTestGarbageCollector"); + cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); // Create the root entry cta::objectstore::RootEntry re(be); re.initialize(); @@ -150,19 +150,19 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) { cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); rel.release(); // Create an agent and add and agent register to it as an owned object - cta::objectstore::Agent agA(be); + cta::objectstore::AgentReference agrA("unitTestAgentA"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be); agA.initialize(); - agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); // Create a new agent register, owned by agA (by hand as it is not an usual // situation) std::string tpName; { - tpName = agA.nextId("ArchiveQueue"); + tpName = agrA.nextId("ArchiveQueue"); cta::objectstore::ArchiveQueue aq(tpName, be); aq.initialize("SomeTP"); aq.setOwner(agA.getAddressIfSet()); @@ -173,9 +173,9 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) { aq.insert(); } // Create the garbage colletor and run it twice. - cta::objectstore::Agent gcAgent(be); + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); gcAgent.initialize(); - gcAgent.generateName("unitTestGarbageCollector"); gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { @@ -197,8 +197,8 @@ TEST(ObjectStore, GarbageCollectorArchiveQueue) { TEST(ObjectStore, GarbageCollectorDriveRegister) { // Here we check that can successfully call agentRegister's garbage collector cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTestGarbageCollector"); + cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); // Create the root entry cta::objectstore::RootEntry re(be); re.initialize(); @@ -207,19 +207,19 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); rel.release(); // Create an agent and add the drive register to it as an owned object - cta::objectstore::Agent agA(be); + cta::objectstore::AgentReference agrA("unitTestAgentA"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be); agA.initialize(); - agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); // Create a new drive register, owned by agA (by hand as it is not an usual // situation) std::string tpName; { - tpName = agA.nextId("ArchiveQueue"); + tpName = agrA.nextId("ArchiveQueue"); cta::objectstore::DriveRegister dr(tpName, be); dr.initialize(); dr.setOwner(agA.getAddressIfSet()); @@ -230,9 +230,9 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { dr.insert(); } // Create the garbage colletor and run it twice. - cta::objectstore::Agent gcAgent(be); + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); gcAgent.initialize(); - gcAgent.generateName("unitTestGarbageCollector"); gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { @@ -254,8 +254,8 @@ TEST(ObjectStore, GarbageCollectorDriveRegister) { TEST(ObjectStore, GarbageCollectorArchiveRequest) { // Here we check that can successfully call ArchiveRequests's garbage collector cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTestGarbageCollector"); + cta::objectstore::AgentReference agentRef("unitTestGarbageCollector"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); // Create the root entry cta::objectstore::RootEntry re(be); re.initialize(); @@ -264,12 +264,12 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); rel.release(); // Create an agent - cta::objectstore::Agent agA(be); + cta::objectstore::AgentReference agrA("unitTestAgentA"); + cta::objectstore::Agent agA(agrA.getAgentAddress(), be); agA.initialize(); - agA.generateName("unitTestAgentA"); agA.setTimeout_us(0); agA.insertAndRegisterSelf(); // Several use cases are present for the ArchiveRequests: @@ -286,7 +286,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { { std::stringstream aqid; aqid << "ArchiveQueue" << i; - tpAddr[i] = agent.nextId(aqid.str()); + tpAddr[i] = agentRef.nextId(aqid.str()); cta::objectstore::ArchiveQueue aq(tpAddr[i], be); aq.initialize(aqid.str()); aq.setOwner(""); @@ -297,7 +297,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { while (true) { // -just referenced - std::string atfrAddr = agA.nextId("ArchiveRequest"); + std::string atfrAddr = agrA.nextId("ArchiveRequest"); cta::objectstore::ScopedExclusiveLock agl(agA); agA.fetch(); agA.addToOwnership(atfrAddr); @@ -383,9 +383,9 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) { break; } // Create the garbage collector and run it twice. - cta::objectstore::Agent gcAgent(be); + cta::objectstore::AgentReference gcAgentRef("unitTestGarbageCollector"); + cta::objectstore::Agent gcAgent(gcAgentRef.getAgentAddress(), be); gcAgent.initialize(); - gcAgent.generateName("unitTestGarbageCollector"); gcAgent.setTimeout_us(0); gcAgent.insertAndRegisterSelf(); { diff --git a/objectstore/RetrieveQueueTest.cpp b/objectstore/RetrieveQueueTest.cpp index 402711ea63..964c35acdf 100644 --- a/objectstore/RetrieveQueueTest.cpp +++ b/objectstore/RetrieveQueueTest.cpp @@ -19,15 +19,14 @@ #include <gtest/gtest.h> #include "RetrieveQueue.hpp" #include "BackendVFS.hpp" -#include "Agent.hpp" +#include "AgentReference.hpp" namespace unitTests { TEST(ObjectStore, RetrieveQueueBasicAccess) { cta::objectstore::BackendVFS be; - cta::objectstore::Agent agent(be); - agent.generateName("unitTest"); - std::string retrieveQueueAddress = agent.nextId("RetrieveQueue"); + cta::objectstore::AgentReference agentRef("unitTest"); + std::string retrieveQueueAddress = agentRef.nextId("RetrieveQueue"); { // Try to create the tape entry cta::objectstore::RetrieveQueue rq(retrieveQueueAddress, be); diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 0c75a35fed..42cbcaacb1 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -19,6 +19,7 @@ #include "RootEntry.hpp" #include "AgentRegister.hpp" #include "Agent.hpp" +#include "AgentReference.hpp" #include "ArchiveQueue.hpp" #include "RetrieveQueue.hpp" #include "DriveRegister.hpp" @@ -93,7 +94,7 @@ namespace { } } -std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, Agent& agent) { +std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef) { checkPayloadWritable(); // Check the archive queue does not already exist try { @@ -101,8 +102,9 @@ std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool } catch (serializers::NotFound &) {} // Insert the archive queue, then its pointer, with agent intent log update // First generate the intent. We expect the agent to be passed locked. - std::string archiveQueueAddress = agent.nextId("archiveQueue"); + std::string archiveQueueAddress = agentRef.nextId("archiveQueue"); // TODO Do we expect the agent to be passed locked or not: to be clarified. + Agent agent(agentRef.getAgentAddress(), m_objectStore); ScopedExclusiveLock agl(agent); agent.fetch(); agent.addToOwnership(archiveQueueAddress); @@ -213,7 +215,7 @@ namespace { } } -std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Agent& agent) { +std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef) { checkPayloadWritable(); // Check the retrieve queue does not already exist try { @@ -221,8 +223,9 @@ std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, Ag } catch (serializers::NotFound &) {} // Insert the retrieve queue, then its pointer, with agent intent log update // First generate the intent. We expect the agent to be passed locked. - std::string retrieveQueueAddress = agent.nextId("retriveQueue"); + std::string retrieveQueueAddress = agentRef.nextId("retriveQueue"); // TODO Do we expect the agent to be passed locked or not: to be clarified. + Agent agent(agentRef.getAgentAddress(), m_objectStore); ScopedExclusiveLock agl(agent); agent.fetch(); agent.addToOwnership(retrieveQueueAddress); @@ -324,7 +327,7 @@ auto RootEntry::dumpRetrieveQueues() -> std::list<RetrieveQueueDump> { // ============================================================================= std::string RootEntry::addOrGetDriveRegisterPointerAndCommit( - Agent & agent, const EntryLogSerDeser & log) { + AgentReference& agentRef, const EntryLogSerDeser & log) { checkPayloadWritable(); // Check if the drive register exists try { @@ -332,7 +335,8 @@ std::string RootEntry::addOrGetDriveRegisterPointerAndCommit( } catch (NotAllocated &) { // decide on the object's name and add to agent's intent. We expect the // agent to be passed locked. - std::string drAddress (agent.nextId("driveRegister")); + std::string drAddress (agentRef.nextId("driveRegister")); + Agent agent(agentRef.getAgentAddress(), m_objectStore); ScopedExclusiveLock agl(agent); agent.fetch(); agent.addToOwnership(drAddress); @@ -408,7 +412,7 @@ std::string RootEntry::getAgentRegisterAddress() { } // Get the name of a (possibly freshly created) agent register -std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(Agent & agent, +std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(AgentReference& agentRef, const EntryLogSerDeser & log) { // Check if the agent register exists try { @@ -424,7 +428,7 @@ std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(Agent & agent, return m_payload.agentregisterpointer().address(); } // decide on the object's name - std::string arAddress (agent.nextId("agentRegister")); + std::string arAddress (agentRef.nextId("agentRegister")); // Record the agent registry in our own intent addIntendedAgentRegistry(arAddress); commit(); @@ -535,7 +539,7 @@ std::string RootEntry::getSchedulerGlobalLock() { } // Get the name of a (possibly freshly created) scheduler global lock -std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent, +std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(AgentReference& agentRef, const EntryLogSerDeser & log) { checkPayloadWritable(); // Check if the drive register exists @@ -544,7 +548,8 @@ std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(Agent & agent, } catch (NotAllocated &) { // decide on the object's name and add to agent's intent. We expect the // agent to be passed locked. - std::string sglAddress (agent.nextId("schedulerGlobalLock")); + std::string sglAddress (agentRef.nextId("schedulerGlobalLock")); + Agent agent(agentRef.getAgentAddress(), m_objectStore); ScopedExclusiveLock agl(agent); agent.fetch(); agent.addToOwnership(sglAddress); diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp index 6dfb1f1592..df22fe32f6 100644 --- a/objectstore/RootEntry.hpp +++ b/objectstore/RootEntry.hpp @@ -29,7 +29,7 @@ namespace cta { namespace objectstore { -class Agent; +class AgentReference; class GenericObject; class RootEntry: public ObjectOps<serializers::RootEntry, serializers::RootEntry_t> { @@ -55,7 +55,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(WrongArchiveQueue); /** This function implicitly creates the archive queue structure and updates * the pointer to it. It will implicitly commit the object to the store. */ - std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, Agent & agent); + std::string addOrGetArchiveQueueAndCommit(const std::string & tapePool, AgentReference & agentRef); /** This function implicitly deletes the tape pool structure. * Fails if it not empty*/ CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveQueue); @@ -72,7 +72,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(RetrieveQueueNotEmpty); /** This function implicitly creates the retrieve queue structure and updates * the pointer to it. It will implicitly commit the object to the store. */ - std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, Agent & agent); + std::string addOrGetRetrieveQueueAndCommit(const std::string & vid, AgentReference & agentRef); CTA_GENERATE_EXCEPTION_CLASS(NoSuchRetrieveQueue); void removeRetrieveQueueAndCommit(const std::string & vid); std::string getRetrieveQueue(const std::string & vid); @@ -85,7 +85,7 @@ public: // Drive register manipulations ============================================== CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty); std::string getDriveRegisterAddress(); - std::string addOrGetDriveRegisterPointerAndCommit(Agent & agent, const EntryLogSerDeser & log); + std::string addOrGetDriveRegisterPointerAndCommit(AgentReference & agentRef, const EntryLogSerDeser & log); void removeDriveRegisterAndCommit(); // Agent register manipulations ============================================== @@ -95,13 +95,13 @@ public: * is used to generate the object name. We have the dedicated agent intent * log for tracking objects being created. We already use an agent here for * object name generation, but not yet tracking. */ - std::string addOrGetAgentRegisterPointerAndCommit(Agent & agent, + std::string addOrGetAgentRegisterPointerAndCommit(AgentReference & agentRef, const EntryLogSerDeser & log); void removeAgentRegisterAndCommit(); // Agent register manipulations ============================================== std::string getSchedulerGlobalLock(); - std::string addOrGetSchedulerGlobalLockAndCommit(Agent & agent, const EntryLogSerDeser & log); + std::string addOrGetSchedulerGlobalLockAndCommit(AgentReference & agentRef, const EntryLogSerDeser & log); void removeSchedulerGlobalLockAndCommit(); private: diff --git a/objectstore/RootEntryTest.cpp b/objectstore/RootEntryTest.cpp index bc22c77d75..736e83d59b 100644 --- a/objectstore/RootEntryTest.cpp +++ b/objectstore/RootEntryTest.cpp @@ -22,6 +22,7 @@ #include "objectstore/SerializersExceptions.hpp" #include "RootEntry.hpp" #include "Agent.hpp" +#include "AgentReference.hpp" #include "AgentRegister.hpp" #include "ArchiveQueue.hpp" @@ -49,12 +50,12 @@ TEST(ObjectStore, RootEntryBasicAccess) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock (re); // Create an agent - cta::objectstore::Agent agent(be); - agent.generateName("unitTest"); + cta::objectstore::AgentReference agentRef("unitTest"); + cta::objectstore::Agent agent(agentRef.getAgentAddress(), be); re.fetch(); cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); - re.addOrGetAgentRegisterPointerAndCommit(agent, el); + re.addOrGetAgentRegisterPointerAndCommit(agentRef, el); ASSERT_NO_THROW(re.getAgentRegisterAddress()); re.commit(); //agent.registerSelf(); @@ -72,16 +73,16 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::BackendVFS be; cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); - cta::objectstore::Agent ag(be); + cta::objectstore::AgentReference agr("UnitTests"); + cta::objectstore::Agent ag(agr.getAgentAddress(), be); ag.initialize(); - ag.generateName("UnitTests"); { // Try to create the root entry and allocate the agent register cta::objectstore::RootEntry re(be); re.initialize(); re.insert(); cta::objectstore::ScopedExclusiveLock rel(re); - re.addOrGetAgentRegisterPointerAndCommit(ag, el); + re.addOrGetAgentRegisterPointerAndCommit(agr, el); } ag.insertAndRegisterSelf(); std::string tpAddr1, tpAddr2; @@ -92,7 +93,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { re.fetch(); ASSERT_THROW(re.getArchiveQueueAddress("tapePool1"), cta::objectstore::RootEntry::NoSuchArchiveQueue); - tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", ag); + tpAddr1 = re.addOrGetArchiveQueueAndCommit("tapePool1", agr); // Check that we car read it cta::objectstore::ArchiveQueue aq(tpAddr1, be); cta::objectstore::ScopedSharedLock aql(aq); @@ -103,7 +104,7 @@ TEST (ObjectStore, RootEntryArchiveQueues) { cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock lock(re); re.fetch(); - tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", ag); + tpAddr2 = re.addOrGetArchiveQueueAndCommit("tapePool2", agr); ASSERT_TRUE(be.exists(tpAddr2)); } { @@ -138,15 +139,15 @@ TEST (ObjectStore, RootEntryDriveRegister) { } cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); - cta::objectstore::Agent ag(be); + cta::objectstore::AgentReference agr("UnitTests"); + cta::objectstore::Agent ag(agr.getAgentAddress(), be); ag.initialize(); - ag.generateName("UnitTests"); { // Try to create the root entry and allocate the agent register cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); - re.addOrGetAgentRegisterPointerAndCommit(ag, el); + re.addOrGetAgentRegisterPointerAndCommit(agr, el); } ag.insertAndRegisterSelf(); std::string driveRegisterAddress; @@ -158,7 +159,7 @@ TEST (ObjectStore, RootEntryDriveRegister) { ASSERT_THROW(re.getDriveRegisterAddress(), cta::objectstore::RootEntry::NotAllocated); ASSERT_NO_THROW( - driveRegisterAddress = re.addOrGetDriveRegisterPointerAndCommit(ag, el)); + driveRegisterAddress = re.addOrGetDriveRegisterPointerAndCommit(agr, el)); ASSERT_TRUE(be.exists(driveRegisterAddress)); } { @@ -192,8 +193,9 @@ TEST(ObjectStore, RootEntryAgentRegister) { } cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); - cta::objectstore::Agent ag(be); - ag.generateName("UnitTests"); + cta::objectstore::AgentReference agr("UnitTests"); + cta::objectstore::Agent ag(agr.getAgentAddress(), be); + ag.initialize(); std::string arAddr; { // Create the agent register @@ -202,7 +204,7 @@ TEST(ObjectStore, RootEntryAgentRegister) { re.fetch(); ASSERT_THROW(re.getAgentRegisterAddress(), cta::objectstore::RootEntry::NotAllocated); - arAddr = re.addOrGetAgentRegisterPointerAndCommit(ag, el); + arAddr = re.addOrGetAgentRegisterPointerAndCommit(agr, el); // Check that we car read it cta::objectstore::AgentRegister ar(arAddr, be); cta::objectstore::ScopedSharedLock arl(ar); @@ -215,7 +217,7 @@ TEST(ObjectStore, RootEntryAgentRegister) { re.fetch(); // Check that we still get the same agent register ASSERT_EQ(arAddr, re.getAgentRegisterAddress()); - ASSERT_EQ(arAddr, re.addOrGetAgentRegisterPointerAndCommit(ag, el)); + ASSERT_EQ(arAddr, re.addOrGetAgentRegisterPointerAndCommit(agr, el)); // Remove it ASSERT_NO_THROW(re.removeAgentRegisterAndCommit()); // Check that the object is gone @@ -239,15 +241,15 @@ TEST (ObjectStore, RootEntrySchedulerGlobalLock) { } cta::objectstore::EntryLogSerDeser el("user0", "unittesthost", time(NULL)); - cta::objectstore::Agent ag(be); + cta::objectstore::AgentReference agr("UnitTests"); + cta::objectstore::Agent ag(agr.getAgentAddress(), be); ag.initialize(); - ag.generateName("UnitTests"); { // Try to create the root entry and allocate the agent register cta::objectstore::RootEntry re(be); cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); - re.addOrGetAgentRegisterPointerAndCommit(ag, el); + re.addOrGetAgentRegisterPointerAndCommit(agr, el); } ag.insertAndRegisterSelf(); std::string schedulerGlobalLockAddress; @@ -259,7 +261,7 @@ TEST (ObjectStore, RootEntrySchedulerGlobalLock) { ASSERT_THROW(re.getDriveRegisterAddress(), cta::objectstore::RootEntry::NotAllocated); ASSERT_NO_THROW( - schedulerGlobalLockAddress = re.addOrGetSchedulerGlobalLockAndCommit(ag, el)); + schedulerGlobalLockAddress = re.addOrGetSchedulerGlobalLockAndCommit(agr, el)); ASSERT_TRUE(be.exists(schedulerGlobalLockAddress)); } { diff --git a/objectstore/makeMinimalVFS.cpp b/objectstore/makeMinimalVFS.cpp index 4c076c49f2..1537f6884d 100644 --- a/objectstore/makeMinimalVFS.cpp +++ b/objectstore/makeMinimalVFS.cpp @@ -26,6 +26,7 @@ #include "BackendFactory.hpp" #include "RootEntry.hpp" #include "Agent.hpp" +#include "AgentReference.hpp" #include <iostream> #include <stdexcept> @@ -50,16 +51,16 @@ int main(int argc, char ** argv) { re.insert(); cta::objectstore::ScopedExclusiveLock rel(re); re.fetch(); - cta::objectstore::Agent ag(*be); - ag.generateName("makeMinimalVFS"); + cta::objectstore::AgentReference agr("makeMinimalVFS"); + cta::objectstore::Agent ag(agr.getAgentAddress(), *be); ag.initialize(); cta::objectstore::EntryLogSerDeser el("user0", "systemhost", time(NULL)); - re.addOrGetAgentRegisterPointerAndCommit(ag,el); + re.addOrGetAgentRegisterPointerAndCommit(agr,el); rel.release(); ag.insertAndRegisterSelf(); rel.lock(re); - re.addOrGetDriveRegisterPointerAndCommit(ag, el); - re.addOrGetSchedulerGlobalLockAndCommit(ag,el); + re.addOrGetDriveRegisterPointerAndCommit(agr, el); + re.addOrGetSchedulerGlobalLockAndCommit(agr,el); rel.release(); std::cout << "New object store path: " << be->getParams()->toURL() << std::endl; } catch (std::exception & e) { diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 308770b39f..b378e602c4 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -47,26 +47,26 @@ namespace cta { using namespace objectstore; OStoreDB::OStoreDB(objectstore::Backend& be): - m_objectStore(be), m_agent(NULL) {} + m_objectStore(be) {} OStoreDB::~OStoreDB() throw() {} -void OStoreDB::setAgent(objectstore::Agent& agent) { - m_agent = & agent; - } +void OStoreDB::setAgentReference(objectstore::AgentReference *agentReference) { + m_agentReference = agentReference; +} -void OStoreDB::assertAgentSet() { - if (!m_agent) - throw AgentNotSet("In OStoreDB::assertAgentSet: Agent pointer not set"); +void OStoreDB::assertAgentAddressSet() { + if (!m_agentReference) + throw AgentNotSet("In OStoreDB::assertAgentSet: Agent address not set"); } std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> OStoreDB::getMountInfo() { //Allocate the getMountInfostructure to return. - assertAgentSet(); + assertAgentAddressSet(); std::unique_ptr<OStoreDB::TapeMountDecisionInfo> privateRet (new OStoreDB::TapeMountDecisionInfo( - m_objectStore, *m_agent)); + m_objectStore, *m_agentReference)); TapeMountDecisionInfo & tmdi=*privateRet; // Get all the tape pools and tapes with queues (potential mounts) objectstore::RootEntry re(m_objectStore); @@ -256,9 +256,9 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, const cta::common::dataStructures::ArchiveFileQueueCriteria &criteria) { - assertAgentSet(); + assertAgentAddressSet(); // Construct the return value immediately - cta::objectstore::ArchiveRequest aReq(m_agent->nextId("ArchiveRequest"), m_objectStore); + cta::objectstore::ArchiveRequest aReq(m_agentReference->nextId("ArchiveRequest"), m_objectStore); aReq.initialize(); // Summarize all as an archiveFile cta::common::dataStructures::ArchiveFile aFile; @@ -287,7 +287,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: re.fetch(); std::list<cta::objectstore::ArchiveRequest::JobDump> jl; for (auto & copy:criteria.copyToPoolMap) { - std::string aqaddr = re.addOrGetArchiveQueueAndCommit(copy.second, *m_agent); + std::string aqaddr = re.addOrGetArchiveQueueAndCommit(copy.second, *m_agentReference); const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; aReq.addJob(copy.first, copy.second, aqaddr, hardcodedRetriesWithinMount, hardcodedTotalRetries); @@ -301,12 +301,13 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: } // We create the object here { - objectstore::ScopedExclusiveLock al(*m_agent); - m_agent->fetch(); - m_agent->addToOwnership(aReq.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock agl(ag); + ag.fetch(); + ag.addToOwnership(aReq.getAddressIfSet()); + ag.commit(); } - aReq.setOwner(m_agent->getAddressIfSet()); + aReq.setOwner(m_agentReference->getAgentAddress()); aReq.insert(); ScopedExclusiveLock arl(aReq); // We can now enqueue the requests @@ -347,10 +348,11 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common:: arl.release(); // And remove reference from the agent { - objectstore::ScopedExclusiveLock al(*m_agent); - m_agent->fetch(); - m_agent->removeFromOwnership(aReq.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(aReq.getAddressIfSet()); + ag.commit(); } } @@ -378,10 +380,11 @@ void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName, // Upgrade the lock to an exclusive one. arl.release(); ScopedExclusiveLock arxl(ar); - objectstore::ScopedExclusiveLock al(*m_agent); - m_agent->fetch(); - m_agent->addToOwnership(ar.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock agl(ag); + ag.fetch(); + ag.addToOwnership(ar.getAddressIfSet()); + ag.commit(); ar.fetch(); ar.setAllJobsFailed(); for (auto j:ar.dumpJobs()) { @@ -395,11 +398,11 @@ void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName, aq2.removeJob(ar.getAddressIfSet()); aq2.commit(); } catch (...) {} - ar.setJobOwner(j.copyNb, m_agent->getAddressIfSet()); + ar.setJobOwner(j.copyNb, ag.getAddressIfSet()); } ar.remove(); - m_agent->removeFromOwnership(ar.getAddressIfSet()); - m_agent->commit(); + ag.removeFromOwnership(ar.getAddressIfSet()); + ag.commit(); // We found and deleted the job: return. return; } @@ -411,10 +414,10 @@ void OStoreDB::deleteArchiveRequest(const std::string &diskInstanceName, std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> OStoreDB::markArchiveRequestForDeletion(const common::dataStructures::SecurityIdentity& requester, uint64_t fileId) { - assertAgentSet(); + assertAgentAddressSet(); // Construct the return value immediately std::unique_ptr<cta::OStoreDB::ArchiveToFileRequestCancelation> - internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(m_agent, m_objectStore)); + internalRet(new cta::OStoreDB::ArchiveToFileRequestCancelation(*m_agentReference, m_objectStore)); cta::objectstore::ArchiveRequest & ar = internalRet->m_request; cta::objectstore::ScopedExclusiveLock & atfrl = internalRet->m_lock; // Attempt to find the request @@ -436,10 +439,11 @@ std::unique_ptr<SchedulerDatabase::ArchiveToFileRequestCancelation> tar.fetch(); if (tar.getArchiveFile().archiveFileID == fileId) { // Point the agent to the request - ScopedExclusiveLock agl(*m_agent); - m_agent->fetch(); - m_agent->addToOwnership(arp->address); - m_agent->commit(); + cta::objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + ScopedExclusiveLock agl(ag); + ag.fetch(); + ag.addToOwnership(arp->address); + ag.commit(); agl.release(); // Mark all jobs are being pending NS deletion (for being deleted them selves) tatfrl.release(); @@ -473,21 +477,23 @@ void OStoreDB::ArchiveToFileRequestCancelation::complete() { throw ArchiveRequestAlreadyDeleted("OStoreDB::ArchiveToFileRequestCancelation::complete(): called twice"); // We just need to delete the object and forget it m_request.remove(); - objectstore::ScopedExclusiveLock al (*m_agent); - m_agent->fetch(); - m_agent->removeFromOwnership(m_request.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al (ag); + ag.fetch(); + ag.removeFromOwnership(m_request.getAddressIfSet()); + ag.commit(); m_closed = true; } OStoreDB::ArchiveToFileRequestCancelation::~ArchiveToFileRequestCancelation() { if (!m_closed) { try { - m_request.garbageCollect(m_agent->getAddressIfSet()); - objectstore::ScopedExclusiveLock al (*m_agent); - m_agent->fetch(); - m_agent->removeFromOwnership(m_request.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + m_request.garbageCollect(ag.getAddressIfSet()); + objectstore::ScopedExclusiveLock al (ag); + ag.fetch(); + ag.removeFromOwnership(m_request.getAddressIfSet()); + ag.commit(); } catch (...) {} } } @@ -683,23 +689,24 @@ std::list<SchedulerDatabase::RetrieveQueueStatistics> OStoreDB::getRetrieveQueue void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::string &vid) { - assertAgentSet(); + assertAgentAddressSet(); // Check that the requested retrieve job (for the provided vid) exists. if (!std::count_if(criteria.archiveFile.tapeFiles.cbegin(), criteria.archiveFile.tapeFiles.end(), [vid](decltype(*criteria.archiveFile.tapeFiles.cbegin()) & tf){ return tf.second.vid == vid; })) throw RetrieveRequestHasNoCopies("In OStoreDB::queueRetrieve(): no tape file for requested vid."); // In order to post the job, construct it first in memory. - objectstore::RetrieveRequest rReq(m_agent->nextId("RetrieveToFileRequest"), m_objectStore); + objectstore::RetrieveRequest rReq(m_agentReference->nextId("RetrieveToFileRequest"), m_objectStore); rReq.initialize(); rReq.setSchedulerRequest(rqst); rReq.setRetrieveFileQueueCriteria(criteria); // Point to the request in the agent { - ScopedExclusiveLock agl(*m_agent); - m_agent->fetch(); - m_agent->addToOwnership(rReq.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + ScopedExclusiveLock agl(ag); + ag.fetch(); + ag.addToOwnership(rReq.getAddressIfSet()); + ag.commit(); } // Set an arbitrary copy number so we can serialize. Garbage collection we re-choose // the tape file number and override it in case of problem (and we will set it further). @@ -710,7 +717,7 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& RootEntry re(m_objectStore); ScopedExclusiveLock rel(re); re.fetch(); - auto rqAddr=re.addOrGetRetrieveQueueAndCommit(vid, *m_agent); + auto rqAddr=re.addOrGetRetrieveQueueAndCommit(vid, *m_agentReference); // Create the request. rel.release(); RetrieveQueue rq(rqAddr, m_objectStore); @@ -736,10 +743,11 @@ void OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveRequest& rrl.release(); // And relinquish ownership form agent { - ScopedExclusiveLock agl(*m_agent); - m_agent->fetch(); - m_agent->removeFromOwnership(rReq.getAddressIfSet()); - m_agent->commit(); + objectstore::Agent ag(m_agentReference->getAgentAddress(), m_objectStore); + ScopedExclusiveLock agl(ag); + ag.fetch(); + ag.removeFromOwnership(rReq.getAddressIfSet()); + ag.commit(); } } @@ -858,7 +866,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> // Check we actually hold the scheduling lock // Set the drive status to up, and indicate which tape we use. std::unique_ptr<OStoreDB::ArchiveMount> privateRet( - new OStoreDB::ArchiveMount(m_objectStore, m_agent)); + new OStoreDB::ArchiveMount(m_objectStore, m_agentReference)); auto &am = *privateRet; // Check we hold the scheduling lock if (!m_lockTaken) @@ -901,8 +909,8 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> } OStoreDB::TapeMountDecisionInfo::TapeMountDecisionInfo( - objectstore::Backend& os, objectstore::Agent& a): - m_lockTaken(false), m_objectStore(os), m_agent(a) {} + objectstore::Backend& os, objectstore::AgentReference& a): + m_lockTaken(false), m_objectStore(os), m_agentReference(a) {} std::unique_ptr<SchedulerDatabase::RetrieveMount> @@ -918,7 +926,7 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> // latest known state of the drive (and its absence of updating if needed) // Prepare the return value std::unique_ptr<OStoreDB::RetrieveMount> privateRet( - new OStoreDB::RetrieveMount(m_objectStore, m_agent)); + new OStoreDB::RetrieveMount(m_objectStore, m_agentReference)); auto &rm = *privateRet; // Check we hold the scheduling lock if (!m_lockTaken) @@ -969,8 +977,8 @@ OStoreDB::TapeMountDecisionInfo::~TapeMountDecisionInfo() { m_schedulerGlobalLock.reset(NULL); } -OStoreDB::ArchiveMount::ArchiveMount(objectstore::Backend& os, objectstore::Agent& a): - m_objectStore(os), m_agent(a) {} +OStoreDB::ArchiveMount::ArchiveMount(objectstore::Backend& os, objectstore::AgentReference& a): + m_objectStore(os), m_agentReference(a) {} const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMountInfo() { return mountInfo; @@ -1024,7 +1032,7 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: // Prepare the return value auto job=aq.dumpJobs().front(); std::unique_ptr<OStoreDB::ArchiveJob> privateRet(new OStoreDB::ArchiveJob( - job.address, m_objectStore, m_agent, *this)); + job.address, m_objectStore, m_agentReference, *this)); privateRet->tapeFile.copyNb = job.copyNb; objectstore::ScopedExclusiveLock arl; try { @@ -1045,13 +1053,14 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase:: } // Take ownership of the job // Add to ownership - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.addToOwnership(privateRet->m_archiveRequest.getAddressIfSet()); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.addToOwnership(privateRet->m_archiveRequest.getAddressIfSet()); + ag.commit(); al.release(); // Make the ownership official (for this job within the request) - privateRet->m_archiveRequest.setJobOwner(job.copyNb, m_agent.getAddressIfSet()); + privateRet->m_archiveRequest.setJobOwner(job.copyNb, ag.getAddressIfSet()); privateRet->m_archiveRequest.commit(); // Remove the job from the archive queue aq.removeJob(privateRet->m_archiveRequest.getAddressIfSet()); @@ -1158,11 +1167,11 @@ void OStoreDB::ArchiveMount::complete(time_t completionTime) { } OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress, - objectstore::Backend& os, objectstore::Agent& ag, ArchiveMount & am): m_jobOwned(false), - m_objectStore(os), m_agent(ag), m_archiveRequest(jobAddress, os), m_archiveMount(am) {} + objectstore::Backend& os, objectstore::AgentReference& ar, ArchiveMount & am): m_jobOwned(false), + m_objectStore(os), m_agentReference(ar), m_archiveRequest(jobAddress, os), m_archiveMount(am) {} -OStoreDB::RetrieveMount::RetrieveMount(objectstore::Backend& os, objectstore::Agent& a): - m_objectStore(os), m_agent(a) { } +OStoreDB::RetrieveMount::RetrieveMount(objectstore::Backend& os, objectstore::AgentReference& ar): + m_objectStore(os), m_agentReference(ar) { } const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo() { return mountInfo; @@ -1216,7 +1225,7 @@ auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase: // Prepare the return value auto job=rq.dumpJobs().front(); std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob( - job.address, m_objectStore, m_agent, *this)); + job.address, m_objectStore, m_agentReference, *this)); privateRet->selectedCopyNb = job.copyNb; objectstore::ScopedExclusiveLock rrl; try { @@ -1236,13 +1245,14 @@ auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase: } // Take ownership of the job // Add to ownership - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.addToOwnership(privateRet->m_retrieveRequest.getAddressIfSet()); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.addToOwnership(privateRet->m_retrieveRequest.getAddressIfSet()); + ag.commit(); al.release(); // Make the ownership official - privateRet->m_retrieveRequest.setOwner(m_agent.getAddressIfSet()); + privateRet->m_retrieveRequest.setOwner(ag.getAddressIfSet()); privateRet->m_retrieveRequest.commit(); // Remove the job from the archive queue rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet()); @@ -1326,10 +1336,11 @@ void OStoreDB::ArchiveJob::fail() { // The job will not be retried. Either another jobs for the same request is // queued and keeps the request referenced or the request has been deleted. // In any case, we can forget it. - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.removeFromOwnership(m_archiveRequest.getAddressIfSet()); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(m_archiveRequest.getAddressIfSet()); + ag.commit(); m_jobOwned = false; return; } @@ -1357,10 +1368,11 @@ void OStoreDB::ArchiveJob::fail() { m_archiveRequest.commit(); arl.release(); // We just have to remove the ownership from the agent and we're done. - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.removeFromOwnership(m_archiveRequest.getAddressIfSet()); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(ag.getAddressIfSet()); + ag.commit(); m_jobOwned = false; return; } @@ -1390,10 +1402,11 @@ void OStoreDB::ArchiveJob::succeed() { // We no more own the job (which could be gone) m_jobOwned = false; // Remove ownership from agent - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.removeFromOwnership(atfrAddress); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(atfrAddress); + ag.commit(); } OStoreDB::ArchiveJob::~ArchiveJob() { @@ -1401,20 +1414,21 @@ OStoreDB::ArchiveJob::~ArchiveJob() { // Return the job to the pot if we failed to handle it. objectstore::ScopedExclusiveLock atfrl(m_archiveRequest); m_archiveRequest.fetch(); - m_archiveRequest.garbageCollect(m_agent.getAddressIfSet()); + m_archiveRequest.garbageCollect(m_agentReference.getAgentAddress()); atfrl.release(); // Remove ownership from agent - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.removeFromOwnership(m_archiveRequest.getAddressIfSet()); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(m_archiveRequest.getAddressIfSet()); + ag.commit(); } } OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, - objectstore::Backend& os, objectstore::Agent& ag, + objectstore::Backend& os, objectstore::AgentReference& ar, OStoreDB::RetrieveMount& rm): m_jobOwned(false), - m_objectStore(os), m_agent(ag), m_retrieveRequest(jobAddress, os), + m_objectStore(os), m_agentReference(ar), m_retrieveRequest(jobAddress, os), m_retrieveMount(rm) { } void OStoreDB::RetrieveJob::fail() { @@ -1505,10 +1519,11 @@ void OStoreDB::RetrieveJob::succeed() { // We no more own the job (which could be gone) m_jobOwned = false; // Remove ownership form the agent - objectstore::ScopedExclusiveLock al(m_agent); - m_agent.fetch(); - m_agent.removeFromOwnership(rtfrAddress); - m_agent.commit(); + objectstore::Agent ag(m_agentReference.getAgentAddress(), m_objectStore); + objectstore::ScopedExclusiveLock al(ag); + ag.fetch(); + ag.removeFromOwnership(rtfrAddress); + ag.commit(); } diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index a0e50c2d88..ef775a680d 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -20,6 +20,7 @@ #include "scheduler/SchedulerDatabase.hpp" #include "objectstore/Agent.hpp" +#include "objectstore/AgentReference.hpp" #include "objectstore/ArchiveRequest.hpp" #include "objectstore/ArchiveRequest.hpp" #include "objectstore/DriveRegister.hpp" @@ -39,10 +40,10 @@ public: virtual ~OStoreDB() throw(); /* === Object store and agent handling ==================================== */ - void setAgent(objectstore::Agent &agent); + void setAgentReference(objectstore::AgentReference *agentReference); CTA_GENERATE_EXCEPTION_CLASS(AgentNotSet); private: - void assertAgentSet(); + void assertAgentAddressSet(); public: CTA_GENERATE_EXCEPTION_CLASS(NotImplemented); @@ -64,12 +65,12 @@ public: time_t startTime) override; virtual ~TapeMountDecisionInfo(); private: - TapeMountDecisionInfo (objectstore::Backend &, objectstore::Agent &); + TapeMountDecisionInfo (objectstore::Backend &, objectstore::AgentReference &); bool m_lockTaken; objectstore::ScopedExclusiveLock m_lockOnSchedulerGlobalLock; std::unique_ptr<objectstore::SchedulerGlobalLock> m_schedulerGlobalLock; objectstore::Backend & m_objectStore; - objectstore::Agent & m_agent; + objectstore::AgentReference & m_agentReference; }; std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> getMountInfo() override; @@ -78,9 +79,9 @@ public: class ArchiveMount: public SchedulerDatabase::ArchiveMount { friend class TapeMountDecisionInfo; private: - ArchiveMount(objectstore::Backend &, objectstore::Agent &); + ArchiveMount(objectstore::Backend &, objectstore::AgentReference &); objectstore::Backend & m_objectStore; - objectstore::Agent & m_agent; + objectstore::AgentReference & m_agentReference; public: CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp); const MountInfo & getMountInfo() override; @@ -101,12 +102,12 @@ public: ~ArchiveJob() override; private: ArchiveJob(const std::string &, objectstore::Backend &, - objectstore::Agent &, ArchiveMount &); + objectstore::AgentReference &, ArchiveMount &); bool m_jobOwned; uint64_t m_mountId; std::string m_tapePool; objectstore::Backend & m_objectStore; - objectstore::Agent & m_agent; + objectstore::AgentReference & m_agentReference; objectstore::ArchiveRequest m_archiveRequest; ArchiveMount & m_archiveMount; }; @@ -115,9 +116,9 @@ public: class RetrieveMount: public SchedulerDatabase::RetrieveMount { friend class TapeMountDecisionInfo; private: - RetrieveMount(objectstore::Backend &, objectstore::Agent &); + RetrieveMount(objectstore::Backend &, objectstore::AgentReference &); objectstore::Backend & m_objectStore; - objectstore::Agent & m_agent; + objectstore::AgentReference & m_agentReference; public: const MountInfo & getMountInfo() override; std::unique_ptr<RetrieveJob> getNextJob() override; @@ -136,11 +137,11 @@ public: virtual ~RetrieveJob() override; private: RetrieveJob(const std::string &, objectstore::Backend &, - objectstore::Agent &, RetrieveMount &); + objectstore::AgentReference &, RetrieveMount &); bool m_jobOwned; uint64_t m_mountId; objectstore::Backend & m_objectStore; - objectstore::Agent & m_agent; + objectstore::AgentReference & m_agentReference; objectstore::RetrieveRequest m_retrieveRequest; RetrieveMount & m_retrieveMount; }; @@ -159,16 +160,16 @@ public: class ArchiveToFileRequestCancelation: public SchedulerDatabase::ArchiveToFileRequestCancelation { public: - ArchiveToFileRequestCancelation(objectstore::Agent * agent, + ArchiveToFileRequestCancelation(objectstore::AgentReference & agentReference, objectstore::Backend & be): m_request(be), m_lock(), m_objectStore(be), - m_agent(agent), m_closed(false) {} + m_agentReference(agentReference), m_closed(false) {} virtual ~ArchiveToFileRequestCancelation(); void complete() override; private: objectstore::ArchiveRequest m_request; objectstore::ScopedExclusiveLock m_lock; objectstore::Backend & m_objectStore; - objectstore::Agent * m_agent; + objectstore::AgentReference &m_agentReference; bool m_closed; friend class OStoreDB; }; @@ -205,7 +206,7 @@ public: std::list<cta::common::DriveState> getDriveStates() const override; private: objectstore::Backend & m_objectStore; - objectstore::Agent * m_agent; + objectstore::AgentReference *m_agentReference = nullptr; }; } diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp index fff9a29a2e..f0a7c6c062 100644 --- a/scheduler/OStoreDB/OStoreDBFactory.hpp +++ b/scheduler/OStoreDB/OStoreDBFactory.hpp @@ -26,6 +26,7 @@ #include "scheduler/RetrieveRequestDump.hpp" #include "objectstore/RootEntry.hpp" #include "objectstore/Agent.hpp" +#include "objectstore/AgentReference.hpp" #include "objectstore/BackendVFS.hpp" #include "objectstore/BackendRados.hpp" #include "objectstore/BackendFactory.hpp" @@ -119,38 +120,38 @@ public: private: std::unique_ptr <cta::objectstore::Backend> m_backend; cta::OStoreDB m_OStoreDB; - objectstore::Agent m_agent; + objectstore::AgentReference m_agentReference; }; template <> OStoreDBWrapper<cta::objectstore::BackendVFS>::OStoreDBWrapper( const std::string &context, const std::string &URL) : m_backend(new cta::objectstore::BackendVFS()), -m_OStoreDB(*m_backend), m_agent(*m_backend) { +m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") { // We need to populate the root entry before using. objectstore::RootEntry re(*m_backend); re.initialize(); re.insert(); objectstore::ScopedExclusiveLock rel(re); re.fetch(); - m_agent.generateName("OStoreDBFactory"); - m_agent.initialize(); + objectstore::Agent agent(m_agentReference.getAgentAddress(), *m_backend); + agent.initialize(); objectstore::EntryLogSerDeser cl("user0", "systemhost", time(NULL)); - re.addOrGetAgentRegisterPointerAndCommit(m_agent, cl); + re.addOrGetAgentRegisterPointerAndCommit(m_agentReference, cl); rel.release(); - m_agent.insertAndRegisterSelf(); + agent.insertAndRegisterSelf(); rel.lock(re); - re.addOrGetDriveRegisterPointerAndCommit(m_agent, cl); - re.addOrGetSchedulerGlobalLockAndCommit(m_agent, cl); + re.addOrGetDriveRegisterPointerAndCommit(m_agentReference, cl); + re.addOrGetSchedulerGlobalLockAndCommit(m_agentReference, cl); rel.release(); - m_OStoreDB.setAgent(m_agent); + m_OStoreDB.setAgentReference(&m_agentReference); } template <> OStoreDBWrapper<cta::objectstore::BackendRados>::OStoreDBWrapper( const std::string &context, const std::string &URL) : m_backend(cta::objectstore::BackendFactory::createBackend(URL).release()), -m_OStoreDB(*m_backend), m_agent(*m_backend) { +m_OStoreDB(*m_backend), m_agentReference("OStoreDBFactory") { // We need to first clean up possible left overs in the pool auto l = m_backend->list(); for (auto o=l.begin(); o!=l.end(); o++) { @@ -164,17 +165,17 @@ m_OStoreDB(*m_backend), m_agent(*m_backend) { re.insert(); objectstore::ScopedExclusiveLock rel(re); re.fetch(); - m_agent.generateName("OStoreDBFactory"); - m_agent.initialize(); + objectstore::Agent agent(m_agentReference.getAgentAddress(), *m_backend); + agent.initialize(); objectstore::EntryLogSerDeser cl("user0", "systemhost", time(NULL)); - re.addOrGetAgentRegisterPointerAndCommit(m_agent, cl); + re.addOrGetAgentRegisterPointerAndCommit(m_agentReference, cl); rel.release(); - m_agent.insertAndRegisterSelf(); + agent.insertAndRegisterSelf(); rel.lock(re); - re.addOrGetDriveRegisterPointerAndCommit(m_agent, cl); - re.addOrGetSchedulerGlobalLockAndCommit(m_agent, cl); + re.addOrGetDriveRegisterPointerAndCommit(m_agentReference, cl); + re.addOrGetSchedulerGlobalLockAndCommit(m_agentReference, cl); rel.release(); - m_OStoreDB.setAgent(m_agent); + m_OStoreDB.setAgentReference(&m_agentReference); } } diff --git a/scheduler/OStoreDB/OStoreDBWithAgent.cpp b/scheduler/OStoreDB/OStoreDBWithAgent.cpp index ff900d2af3..de65c0bebc 100644 --- a/scheduler/OStoreDB/OStoreDBWithAgent.cpp +++ b/scheduler/OStoreDB/OStoreDBWithAgent.cpp @@ -21,13 +21,13 @@ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ -cta::OStoreDBWithAgent::OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::Agent & ag): cta::OStoreDB(be) { - cta::OStoreDB::setAgent(ag); +cta::OStoreDBWithAgent::OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar): cta::OStoreDB(be) { + cta::OStoreDB::setAgentReference(&ar); } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ cta::OStoreDBWithAgent::~OStoreDBWithAgent() throw () { - cta::OStoreDB::setAgent(*((cta::objectstore::Agent *)NULL)); + cta::OStoreDB::setAgentReference(nullptr); } \ No newline at end of file diff --git a/scheduler/OStoreDB/OStoreDBWithAgent.hpp b/scheduler/OStoreDB/OStoreDBWithAgent.hpp index 7ce0fa8664..411d565629 100644 --- a/scheduler/OStoreDB/OStoreDBWithAgent.hpp +++ b/scheduler/OStoreDB/OStoreDBWithAgent.hpp @@ -33,7 +33,7 @@ public: * @param be The objectstore backend * @param ag The agent */ - OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::Agent & ag); + OStoreDBWithAgent(cta::objectstore::Backend & be, cta::objectstore::AgentReference & ar); /** * Destructor diff --git a/tapeserver/castor/tape/tapeserver/daemon/ProcessForker.cpp b/tapeserver/castor/tape/tapeserver/daemon/ProcessForker.cpp index 26225cb7f2..45fafdbec1 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/ProcessForker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/ProcessForker.cpp @@ -568,7 +568,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction dynamic_cast<cta::objectstore::BackendVFS &>(*backend).noDeleteOnExit(); } catch (std::bad_cast &){} cta::objectstore::BackendPopulator backendPopulator(*backend); - cta::OStoreDBWithAgent osdb(*backend, backendPopulator.getAgent()); + cta::OStoreDBWithAgent osdb(*backend, backendPopulator.getAgentReference()); const cta::rdbms::Login catalogueLogin = cta::rdbms::Login::parseFile("/etc/cta/cta_catalogue_db.conf"); const uint64_t nbConns = 1; std::unique_ptr<cta::catalogue::Catalogue> catalogue(cta::catalogue::CatalogueFactory::create(catalogueLogin, nbConns)); diff --git a/xroot_plugins/XrdCtaFilesystem.cpp b/xroot_plugins/XrdCtaFilesystem.cpp index 9d654616a6..8d1ad832e3 100644 --- a/xroot_plugins/XrdCtaFilesystem.cpp +++ b/xroot_plugins/XrdCtaFilesystem.cpp @@ -258,7 +258,7 @@ XrdCtaFilesystem::XrdCtaFilesystem(): m_ctaConf("/etc/cta/cta-frontend.conf"), m_backend(cta::objectstore::BackendFactory::createBackend(m_ctaConf.getConfEntString("ObjectStore", "BackendPath", nullptr)).release()), m_backendPopulator(*m_backend), - m_scheddb(*m_backend, m_backendPopulator.getAgent()) { + m_scheddb(*m_backend, m_backendPopulator.getAgentReference()) { using namespace cta; // Try to instantiate the logging system API -- GitLab