diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index c7b05915d13cb6840c90878ba28c4815c734649e..d8dcde3c260f37c7b08020a37d9f244c2ea73088 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -336,7 +336,9 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro auto vidToTapeMap = m_catalogue.getTapesByVid({rqp.vid}); if(vidToTapeMap.at(rqp.vid).disabled){ //Check if there are Repack Retrieve requests with forceDisabledTape flag in the queue - for(auto &job: rqueue.dumpJobs()){ + auto retrieveQueueJobs = rqueue.dumpJobs(); + uint64_t nbJobsNotExistInQueue = 0; + for(auto &job: retrieveQueueJobs){ cta::objectstore::RetrieveRequest rr(job.address,this->m_objectStore); try{ rr.fetchNoLock(); @@ -349,8 +351,15 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro } catch(const cta::objectstore::Backend::NoSuchObject & ex){ //In the case of a repack cancellation, the RetrieveRequest object is deleted, so we just ignore the exception //it will not be a potential mount. + nbJobsNotExistInQueue++; } } + if(!isPotentialMount && nbJobsNotExistInQueue == retrieveQueueJobs.size()){ + //The tape is disabled, there are only jobs that have been deleted, it is a potential mount as we want to flush the queue. + //If there is at least one job that is in the queue, and is not a repack with the --disabledtape flag, + //the jobs have to stay in the queue as long as the tape is disabled. + isPotentialMount = true; + } } else { isPotentialMount = true; } diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 7c357562ae8090f53e93b46fa512efd511c93c9c..057d92056fb4a8b0b3f1aab19cfc2329dce6db0f 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -3100,6 +3100,145 @@ TEST_P(SchedulerTest, noMountIsTriggeredWhenTapeIsDisabled) { ASSERT_EQ(11,jobBatch.size()); //1 user job + 10 Repack jobs = 11 jobs in the batch } +TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) { + using namespace cta; + using namespace cta::objectstore; + unitTests::TempDirectory tempDirectory; + auto &catalogue = getCatalogue(); + auto &scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + const bool disabledValue = false; + const bool fullValue = true; + const bool readOnlyValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + + //Create a logical library in the catalogue + const bool logicalLibraryIsDisabled = false; + catalogue.createLogicalLibrary(admin, s_libraryName, logicalLibraryIsDisabled, "Create logical library"); + + std::ostringstream ossVid; + ossVid << s_vid << "_" << 1; + std::string vid = ossVid.str(); + + { + catalogue::CreateTapeAttributes tape; + tape.vid = vid; + tape.mediaType = s_mediaType; + tape.vendor = s_vendor; + tape.logicalLibraryName = s_libraryName; + tape.tapePoolName = s_tapePoolName; + tape.full = fullValue; + tape.disabled = disabledValue; + tape.readOnly = readOnlyValue; + tape.comment = comment; + catalogue.createTape(s_adminOnAdminHost, tape); + } + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 2; + storageClass.comment = "Create storage class"; + + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000; + + //Simulate the writing of 10 files in 1 tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = s_diskInstance; + fileWritten.diskFileId = diskFileId.str(); + + fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID; + fileWritten.diskFileGid = PUBLIC_GID; + fileWritten.size = archiveFileSize; + fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234"); + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.size = archiveFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the queueing of the Retrieve Request and try to mount after having disabled the tape + scheduler.waitSchedulerDbSubthreadsComplete(); + uint64_t archiveFileId = 1; + std::string dstUrl = "dst_url"; + std::string diskInstance="disk_instance"; + { + cta::common::dataStructures::RetrieveRequest rReq; + rReq.archiveFileID=1; + rReq.requester.name = s_userName; + rReq.requester.group = "someGroup"; + rReq.dstURL = dstUrl; + scheduler.queueRetrieve(diskInstance, rReq, lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + } + //disabled the tape + catalogue.setTapeDisabled(admin,vid,true); + + //No mount should be returned by getNextMount + ASSERT_EQ(nullptr,scheduler.getNextMount(s_libraryName, "drive0", lc)); + + //abort the retrieve request + { + //Get the only retrieve job in the queue + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + std::string retrieveQueueAddr = re.getRetrieveQueueAddress(vid,JobQueueType::JobsToTransferForUser); + cta::objectstore::RetrieveQueue retrieveQueue(retrieveQueueAddr,backend); + retrieveQueue.fetchNoLock(); + std::string retrieveRequestAddr = retrieveQueue.dumpJobs().front().address; + common::dataStructures::CancelRetrieveRequest crr; + crr.retrieveRequestId = retrieveRequestAddr; + crr.archiveFileID = archiveFileId; + crr.dstURL = dstUrl; + scheduler.abortRetrieve(diskInstance,crr,lc); + } + + //A mount should be returned by getNextMount + auto retrieveMount = scheduler.getNextMount(s_libraryName,"drive0",lc); + ASSERT_NE(nullptr,retrieveMount); + +} + TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) { using namespace cta;