diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 3860f8600d2b3cd3e0d827566cc0cad28f274627..9fcbb39061d40996e6a125c92a7debae083f4d8e 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -43,6 +43,7 @@ struct RetrieveQueue; struct RetrieveQueueToTransfer; struct RetrieveQueueToReport; struct RetrieveQueueFailed; +struct RetrieveQueueToReportToRepackForSuccess; struct RepackQueue; struct RepackQueuePending; struct RepackQueueToExpand; @@ -58,6 +59,7 @@ class ObjectOpsBase { friend ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>; friend ContainerTraits<RetrieveQueue,RetrieveQueueToReport>; friend ContainerTraits<RetrieveQueue,RetrieveQueueFailed>; + friend ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>; friend ContainerTraits<RepackQueue,RepackQueuePending>; friend ContainerTraits<RepackQueue,RepackQueueToExpand>; protected: diff --git a/objectstore/RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp b/objectstore/RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp index 207f113e94f947b7584903ec8c99b4d4554cf7b6..56a796b201a2eef63306f9a1610100241eb2908d 100644 --- a/objectstore/RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp +++ b/objectstore/RetrieveQueueToReportToRepackForSuccessAlgorithms.cpp @@ -23,4 +23,26 @@ namespace cta { namespace objectstore { template<> const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>::c_identifierType = "tapeVid"; -}} \ No newline at end of file + + template<> + auto ContainerTraits<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess>:: + getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, ElementsToSkipSet &elemtsToSkip, + log::LogContext &lc) -> PoppedElementsBatch + { + PoppedElementsBatch ret; + + auto candidateJobsFromQueue = cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip); + for(auto &cjfq : candidateJobsFromQueue.candidates) { + ret.elements.emplace_back(PoppedElement{ + cta::make_unique<RetrieveRequest>(cjfq.address, cont.m_objectStore), + cjfq.copyNb, + cjfq.size, + common::dataStructures::ArchiveFile(), + common::dataStructures::RetrieveRequest() + }); + ret.summary.files++; + } + return ret; + } +} +} \ No newline at end of file diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index bfd518e9b01ab243189fb55259a0f7e9174b2d47..b49d7df4bd8835cf06e5bd55499a0c5984ecae76 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -3158,7 +3158,31 @@ void OStoreDB::RetrieveJob::checkReportSucceedForRepack(){ std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> OStoreDB::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) { std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> ret; - //TODO : Use Algorithms to retrieve the jobs from the RetrieveQueueToReportToRepackForSuccess - return ret; + typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> Carqtrtrfs; + Carqtrtrfs algo(this->m_objectStore, *m_agentReference); + // Decide from which queue we are going to pop. + RootEntry re(m_objectStore); + re.fetchNoLock(); + while(true) { + auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReportToRepackForSuccess); + if (queueList.empty()) return ret; + + // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round. + Carqtrtrfs::PopCriteria criteria; + criteria.files = filesRequested; + auto jobs = algo.popNextBatch(queueList.front().vid, criteria, lc); + if(jobs.elements.empty()) continue; + for(auto &j : jobs.elements) + { + std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), *this, nullptr)); + rj->archiveFile = j.archiveFile; + rj->retrieveRequest = j.rr; + rj->selectedCopyNb = j.copyNb; + rj->setJobOwned(); + ret.emplace_back(std::move(rj)); + } + return ret; + } } + } // namespace cta diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index fc7d80d3b60327ae7ba077b2c630dafd481bda96..5ef120f6e5007512c7d047209c5351561d41b6bf 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1233,7 +1233,8 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Now, we need to simulate a retrieve for each file { // Emulate a tape server by asking for nbTapesForTest mount and then all files - uint64_t archiveFileId = 1; + uint64_t archiveFileId1 = 1; + uint64_t archiveFileId2 = 1; for(uint64_t i = 1; i<= nbTapesForTest ;++i){ std::unique_ptr<cta::TapeMount> mount; mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); @@ -1273,67 +1274,104 @@ TEST_P(SchedulerTest, expandRepackRequest) { //After the jobs reported as completed, we will test that all jobs have been put in //the RetrieveQueueToReportToRepackForSuccess and that they have the status RJS_Succeeded - - cta::objectstore::RootEntry re(schedulerDB.getBackend()); - cta::objectstore::ScopedExclusiveLock sel(re); - re.fetch(); - - //Get the retrieveQueueToReportToRepackForSuccess - std::string retrieveQueueToReportToRepackForSuccessAddress = re.getRetrieveQueueAddress(allVid.at(i-1),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); - cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForSuccessAddress,schedulerDB.getBackend()); - - //Fetch the queue so that we can get the retrieveRequests from it - cta::objectstore::ScopedExclusiveLock rql(rq); - rq.fetch(); - - //There should be nbArchiveFiles jobs in the retrieve queue - ASSERT_EQ(rq.dumpJobs().size(),nbArchiveFiles); - - int j = 1; - for (auto &job: rq.dumpJobs()) { - //Create the retrieve request from the address of the job and the current backend - cta::objectstore::RetrieveRequest retrieveRequest(job.address,schedulerDB.getBackend()); - retrieveRequest.fetchNoLock(); - uint64_t copyNb = job.copyNb; - common::dataStructures::TapeFile tapeFile = retrieveRequest.getArchiveFile().tapeFiles[copyNb]; - common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest.getSchedulerRequest(); - common::dataStructures::ArchiveFile archiveFile = retrieveRequest.getArchiveFile(); - - //Testing tape file - ASSERT_EQ(tapeFile.vid,allVid.at(i-1)); - ASSERT_EQ(tapeFile.blockId,j * 100); - ASSERT_EQ(tapeFile.fSeq,j); - ASSERT_EQ(tapeFile.checksumType, checksumType); - ASSERT_EQ(tapeFile.checksumValue,checksumValue); - ASSERT_EQ(tapeFile.compressedSize, compressedFileSize); - - //Testing scheduler retrieve request - ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++); - std::stringstream ss; - ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; - ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str()); - ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); - std::ostringstream diskFilePath; - diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j; - ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str()); - //Testing the retrieve request - ASSERT_EQ(retrieveRequest.isRepack(),true); - ASSERT_EQ(retrieveRequest.getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); - ASSERT_EQ(retrieveRequest.getRetrieveFileQueueCriteria().mountPolicy,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); - ASSERT_EQ(retrieveRequest.getActiveCopyNumber(),1); - ASSERT_EQ(retrieveRequest.getJobStatus(job.copyNb),cta::objectstore::serializers::RetrieveJobStatus::RJS_Succeeded); - ASSERT_EQ(retrieveRequest.getJobs().size(),1); - - //Testing the archive file associated to the retrieve request - ASSERT_EQ(archiveFile.storageClass,storageClass.name); - ASSERT_EQ(archiveFile.diskInstance,storageClass.diskInstance); - ++j; + { + cta::objectstore::RootEntry re(schedulerDB.getBackend()); + cta::objectstore::ScopedExclusiveLock sel(re); + re.fetch(); + + //Get the retrieveQueueToReportToRepackForSuccess + std::string retrieveQueueToReportToRepackForSuccessAddress = re.getRetrieveQueueAddress(allVid.at(i-1),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); + cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForSuccessAddress,schedulerDB.getBackend()); + + //Fetch the queue so that we can get the retrieveRequests from it + cta::objectstore::ScopedExclusiveLock rql(rq); + rq.fetch(); + + //There should be nbArchiveFiles jobs in the retrieve queue + ASSERT_EQ(rq.dumpJobs().size(),nbArchiveFiles); + int j = 1; + for (auto &job: rq.dumpJobs()) { + //Create the retrieve request from the address of the job and the current backend + cta::objectstore::RetrieveRequest retrieveRequest(job.address,schedulerDB.getBackend()); + retrieveRequest.fetchNoLock(); + uint64_t copyNb = job.copyNb; + common::dataStructures::TapeFile tapeFile = retrieveRequest.getArchiveFile().tapeFiles[copyNb]; + common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest.getSchedulerRequest(); + common::dataStructures::ArchiveFile archiveFile = retrieveRequest.getArchiveFile(); + + //Testing tape file + ASSERT_EQ(tapeFile.vid,allVid.at(i-1)); + ASSERT_EQ(tapeFile.blockId,j * 100); + ASSERT_EQ(tapeFile.fSeq,j); + ASSERT_EQ(tapeFile.checksumType, checksumType); + ASSERT_EQ(tapeFile.checksumValue,checksumValue); + ASSERT_EQ(tapeFile.compressedSize, compressedFileSize); + + //Testing scheduler retrieve request + ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId1++); + std::stringstream ss; + ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str()); + ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j; + ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str()); + //Testing the retrieve request + ASSERT_EQ(retrieveRequest.isRepack(),true); + ASSERT_EQ(retrieveRequest.getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); + ASSERT_EQ(retrieveRequest.getRetrieveFileQueueCriteria().mountPolicy,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); + ASSERT_EQ(retrieveRequest.getActiveCopyNumber(),1); + ASSERT_EQ(retrieveRequest.getJobStatus(job.copyNb),cta::objectstore::serializers::RetrieveJobStatus::RJS_Succeeded); + ASSERT_EQ(retrieveRequest.getJobs().size(),1); + + //Testing the archive file associated to the retrieve request + ASSERT_EQ(archiveFile.storageClass,storageClass.name); + ASSERT_EQ(archiveFile.diskInstance,storageClass.diskInstance); + ++j; + } + } + //We will now test the getNextSucceededRetrieveRequestForRepackBatch method that + //pop all the RetrieveRequest from the RetrieveQueueToReportToRepackForSuccess queue + { + auto listSucceededRetrieveRequests = scheduler.getNextSucceededRetrieveRequestForRepackBatch(15,lc); + ASSERT_EQ(listSucceededRetrieveRequests.size(),nbArchiveFiles); + int j = 1; + for (auto &retrieveRequest: listSucceededRetrieveRequests) { + //Create the retrieve request from the address of the job and the current backend + uint64_t copyNb = retrieveRequest->selectedCopyNb; + common::dataStructures::TapeFile tapeFile = retrieveRequest->archiveFile.tapeFiles[copyNb]; + common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest->retrieveRequest; + common::dataStructures::ArchiveFile archiveFile = retrieveRequest->archiveFile; + + //Testing tape file + ASSERT_EQ(tapeFile.vid,allVid.at(i-1)); + ASSERT_EQ(tapeFile.blockId,j * 100); + ASSERT_EQ(tapeFile.fSeq,j); + ASSERT_EQ(tapeFile.checksumType, checksumType); + ASSERT_EQ(tapeFile.checksumValue,checksumValue); + ASSERT_EQ(tapeFile.compressedSize, compressedFileSize); + + //Testing scheduler retrieve request + ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId2++); + std::stringstream ss; + ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str()); + ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j; + ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str()); + //Testing the retrieve request + ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + + //Testing the archive file associated to the retrieve request + ASSERT_EQ(archiveFile.storageClass,storageClass.name); + ASSERT_EQ(archiveFile.diskInstance,storageClass.diskInstance); + ++j; + } } } + ASSERT_EQ(scheduler.getNextSucceededRetrieveRequestForRepackBatch(10,lc).size(),0); } - scheduler.waitSchedulerDbSubthreadsComplete(); - scheduler.getNextSucceededRetrieveRequestForRepackBatch(5,lc); - //scheduler.transformRetrieveRequestsToArchiveForRepack(lc); } #undef TEST_MOCK_DB