diff --git a/scheduler/OStoreDB/OStoreDBFactory.hpp b/scheduler/OStoreDB/OStoreDBFactory.hpp
index 38dff789d6f36c8d7dcf42cc281ed57b37fbb494..2f488e51c39edcf940eb8abd9af2088a60b46780 100644
--- a/scheduler/OStoreDB/OStoreDBFactory.hpp
+++ b/scheduler/OStoreDB/OStoreDBFactory.hpp
@@ -50,6 +50,9 @@ public:
   virtual objectstore::Backend & getBackend() = 0;
   virtual objectstore::AgentReference & getAgentReference() = 0;
   virtual cta::OStoreDB & getOstoreDB() = 0;
+
+  //! Create a new agent to allow tests to continue after garbage collection
+  virtual void replaceAgent(objectstore::AgentReference & agentReference) = 0;
 };
 
 }
@@ -72,6 +75,25 @@ public:
   objectstore::Backend& getBackend() override { return *m_backend; }
 
   objectstore::AgentReference& getAgentReference() override { return m_agentReference; }
+
+  void replaceAgent(objectstore::AgentReference & agentReference) override {
+    objectstore::RootEntry re(*m_backend);
+    objectstore::ScopedExclusiveLock rel(re);
+    re.fetch();
+    objectstore::Agent agent(agentReference.getAgentAddress(), *m_backend);
+    agent.initialize();
+    objectstore::EntryLogSerDeser cl("user0", "systemhost", time(NULL));
+    log::LogContext lc(*m_logger);
+    re.addOrGetAgentRegisterPointerAndCommit(agentReference, cl, lc);
+    rel.release();
+    agent.insertAndRegisterSelf(lc);
+    rel.lock(re);
+    re.fetch();
+    re.addOrGetDriveRegisterPointerAndCommit(agentReference, cl);
+    re.addOrGetSchedulerGlobalLockAndCommit(agentReference, cl);
+    rel.release();
+    m_OStoreDB.setAgentReference(&agentReference);
+  }
 
   cta::OStoreDB& getOstoreDB() override { return m_OStoreDB; }
 
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 1e26d7b04750c3eebc94e277575f4e83fc701e6d..779253f39868401a9db68ac2ebfee4696e565ba7 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -111,8 +111,6 @@ public:
 
     // We know the cast will not fail, so we can safely do it (otherwise we could leak memory)
     m_db.reset(dynamic_cast<cta::objectstore::OStoreDBWrapperInterface*>(osdb.release()));
-    //const SchedulerTestParam &param = GetParam();
-    //m_db = param.dbFactory.create();
     const uint64_t nbConns = 1;
     const uint64_t nbArchiveFileListingConns = 1;
     const uint32_t maxTriesToConnect = 1;
@@ -134,7 +132,7 @@ public:
     }
     return *ptr;
   }
-  
+
   cta::Scheduler &getScheduler() {
     cta::Scheduler *const ptr = m_scheduler.get();
     if(NULL == ptr) {
@@ -711,30 +709,34 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) {
     ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size());
   }
 
-  {
-    cta::common::dataStructures::EntryLog creationLog;
-    creationLog.host="host2";
-    creationLog.time=0;
-    creationLog.username="admin1";
-    cta::common::dataStructures::DiskFileInfo diskFileInfo;
-    diskFileInfo.recoveryBlob="blob";
-    diskFileInfo.group="group2";
-    diskFileInfo.owner="cms_user";
-    diskFileInfo.path="path/to/file";
-    cta::common::dataStructures::RetrieveRequest request;
-    request.archiveFileID = archiveFileId;
-    request.creationLog = creationLog;
-    request.diskFileInfo = diskFileInfo;
-    request.dstURL = "dstURL";
-    request.requester.name = s_userName;
-    request.requester.group = "userGroup";
-    scheduler.queueRetrieve("disk_instance", request, lc);
-    scheduler.waitSchedulerDbSubthreadsComplete();
-  }
+  // Create a new agent to replace the stale agent reference in the DB
+  objectstore::AgentReference agentRef("OStoreDBFactory2", dl);
 
   // Try mounting the tape twice
   for(int j = 0; j < 2; ++j) {
 std::cerr << "Pass " << j << std::endl;
+    Scheduler scheduler(getCatalogue(), getSchedulerDB(), 5, 2*1000*1000);
+
+    {
+      cta::common::dataStructures::EntryLog creationLog;
+      creationLog.host="host2";
+      creationLog.time=0;
+      creationLog.username="admin1";
+      cta::common::dataStructures::DiskFileInfo diskFileInfo;
+      diskFileInfo.recoveryBlob="blob";
+      diskFileInfo.group="group2";
+      diskFileInfo.owner="cms_user";
+      diskFileInfo.path="path/to/file";
+      cta::common::dataStructures::RetrieveRequest request;
+      request.archiveFileID = archiveFileId;
+      request.creationLog = creationLog;
+      request.diskFileInfo = diskFileInfo;
+      request.dstURL = "dstURL";
+      request.requester.name = s_userName;
+      request.requester.group = "userGroup";
+      scheduler.queueRetrieve("disk_instance", request, lc);
+      scheduler.waitSchedulerDbSubthreadsComplete();
+    }
 
     // Check that the retrieve request is queued
     {
@@ -742,7 +744,7 @@ std::cerr << "Pass " << j << std::endl;
       // We expect 1 tape with queued jobs
       ASSERT_EQ(1, rqsts.size());
       // We expect the queue to contain 1 job
-      ASSERT_EQ(1, rqsts.cbegin()->second.size());
+      ASSERT_EQ(j+1, rqsts.cbegin()->second.size());
       // We expect the job to be single copy
       auto & job = rqsts.cbegin()->second.back();
       ASSERT_EQ(1, job.tapeCopies.size());
@@ -759,7 +761,7 @@ std::cerr << "Pass " << j << std::endl;
       auto vid = rqsts.begin()->second.back().tapeCopies.begin()->first;
       auto rqsts_vid = scheduler.getPendingRetrieveJobs(vid, lc);
       // same tests as above
-      ASSERT_EQ(1, rqsts_vid.size());
+      ASSERT_EQ(j+1, rqsts_vid.size());
       auto &job_vid = rqsts_vid.back();
       ASSERT_EQ(1, job_vid.tapeCopies.size());
       ASSERT_TRUE(s_vid == job_vid.tapeCopies.cbegin()->first);
@@ -810,12 +812,17 @@ std::cerr << "Attempt " << i << std::endl;
         cta::objectstore::GarbageCollector gc(getDb().getBackend(), gcAgentRef, catalogue);
         gc.runOnePass(lc);
       }
-    }
-  } // end for
+      // Assign a new agent to replace the stale agent reference in the DB
+      getDb().replaceAgent(agentRef);
+    } // end of retries
+  } // end of pass
 
-  // and the failure should be reported on the jobs to report queue
-  auto retrieveJobToReportList = scheduler.getNextRetrieveJobsToReportBatch(1,lc);
-  ASSERT_EQ(1, retrieveJobToReportList.size());
+  {
+    Scheduler &scheduler = getScheduler();
+    // and the failure should be reported on the jobs to report queue
+    auto retrieveJobToReportList = scheduler.getNextRetrieveJobsToReportBatch(1,lc);
+    ASSERT_EQ(1, retrieveJobToReportList.size());
+  }
 }
 
 TEST_P(SchedulerTest, retry_archive_until_max_reached) {