From d6f97e34c3430c8396d3a0e2348bb5ea99b717ab Mon Sep 17 00:00:00 2001 From: Cedric CAFFY <cedric.caffy@cern.ch> Date: Wed, 29 Jan 2020 11:52:35 +0100 Subject: [PATCH] Scheduler getNextMount() does not return an ArchiveForRepack mount on a tape that is being repacked. Scenario: The operator launches a Repack Request in a tape V01001 (full) During the expansion, he sets the full flag to false. The tape could be mounted for Archival. Worse, the tape could repack in itself ! This is not possible anymore. --- scheduler/Scheduler.cpp | 11 +++ scheduler/SchedulerTest.cpp | 180 ++++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+) diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 78a3739206..f79acc67e7 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -1098,6 +1098,17 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(), [](decltype(*mountInfo->potentialMounts.cbegin())& m){ return common::dataStructures::getMountBasicType(m.type) == common::dataStructures::MountType::ArchiveAllTypes; } )) { tapeList = m_catalogue.getTapesForWriting(logicalLibraryName); + + //filter the tapes that are currently used by repack so that we do not mount them for Archival + //This solves the scenario when a user set a currently-repacking tape full flag to false + //(will trigger an archive mount on it) + std::list<cta::common::dataStructures::RepackInfo> repacks = this->getRepacks(); + tapeList.remove_if([&repacks](const cta::catalogue::TapeForWriting& tapeForWriting){ + return std::find_if(repacks.begin(),repacks.end(),[&tapeForWriting](const cta::common::dataStructures::RepackInfo& repackReq){ + return tapeForWriting.vid == repackReq.vid; + }) != repacks.end(); + }); + getTapeForWriteTime = timer.secs(utils::Timer::resetCounter); } diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index b799b74a03..23c7894e9c 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -2194,6 +2194,186 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { } } +TEST_P(SchedulerTest, expandRepackRequestDoNotArchiveOnCurrentlyRepackingTapeSetNotFull){ + using namespace cta; + using namespace cta::objectstore; + unitTests::TempDirectory tempDirectory; + auto &catalogue = getCatalogue(); + auto &scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); + +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = false; + const bool fullValue = true; + const bool readOnlyValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + + //Create a logical library in the catalogue + const bool libraryIsDisabled = false; + catalogue.createLogicalLibrary(admin, s_libraryName, libraryIsDisabled, "Create logical library"); + + std::ostringstream ossVid; + ossVid << s_vid << "_" << 1; + std::string vid = ossVid.str(); + catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, fullValue, readOnlyValue, comment); + //Create a repack destination tape + std::string vidDestination = "vidDestination"; + catalogue.createTape(s_adminOnAdminHost,vidDestination, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, false, readOnlyValue, comment); + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = s_diskInstance; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 2; + storageClass.comment = "Create storage class"; + + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000; + + //Simulate the writing of 10 files per tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = storageClass.diskInstance; + fileWritten.diskFileId = diskFileId.str(); + fileWritten.diskFilePath = diskFilePath.str(); + fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID; + fileWritten.diskFileGid = PUBLIC_GID; + fileWritten.size = archiveFileSize; + fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234"); + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.size = archiveFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the expandRepackRequest method + scheduler.waitSchedulerDbSubthreadsComplete(); + + { + scheduler.queueRepack(admin,vid,"file://"+tempDirectory.path(),common::dataStructures::RepackInfo::Type::MoveOnly,common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, s_defaultRepackDisabledTapeFlag,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + //scheduler.waitSchedulerDbSubthreadsComplete(); + + log::TimingList tl; + utils::Timer t; + + //The promoteRepackRequestsToToExpand will only promote 2 RepackRequests to ToExpand status at a time. + scheduler.promoteRepackRequestsToToExpand(lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + //If we have expanded 2 repack requests, the getNextRepackRequestToExpand will return null as it is not possible + //to promote more than 2 repack requests at a time. So we break here. + + scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + } + { + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j) + { + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + executedJobs.push_back(std::move(retrieveJob)); + } + //Now, report the retrieve jobs to be completed + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + //Report all jobs as succeeded + for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) + { + rrp.reportCompletedJob(std::move(*it)); + } + + rrp.setDiskDone(); + rrp.setTapeDone(); + + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + + ASSERT_TRUE(rrp.allThreadsDone()); + } + { + //Do the reporting of RetrieveJobs, will transform the Retrieve request in Archive requests + while (true) { + auto rep = schedulerDB.getNextRepackReportBatch(lc); + if (nullptr == rep) break; + rep->report(lc); + } + } + scheduler.waitSchedulerDbSubthreadsComplete(); + { + //Test that if we set the repacking-tape full flag to false, + //and the destiniation full flag to true, no mount will be returned by the scheduler + catalogue.setTapeFull(admin,vidDestination,true); + catalogue.setTapeFull(admin,vid,false); + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_EQ(nullptr, mount.get()); + + //Set the destination tape full flag as false + catalogue.setTapeFull(admin,vidDestination,false); + //We should have a not null mount now + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack, mount.get()->getMountType()); + } +} + TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { using namespace cta; using namespace cta::objectstore; -- GitLab