diff --git a/disk/DiskFile.cpp b/disk/DiskFile.cpp index 18667be23094059e3ae3f8897a1ca488bb44979a..32a31aa32cdb2587498b23504f0b97150e1f2de0 100644 --- a/disk/DiskFile.cpp +++ b/disk/DiskFile.cpp @@ -774,11 +774,7 @@ bool XRootdDirectory::exist() { if(statStatus.errNo == XErrorCode::kXR_NotFound){ return false; } - cta::exception::XrootCl::throwOnError(statStatus,"In XrootdDirectory::exist(): fail to determine if directory exists."); - if(statInfo->GetSize() != 0){ - return true; - } - return false; + return true; } std::set<std::string> XRootdDirectory::getFilesName(){ diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 7e52328a14b50974d5342f9c8f34d677335ffa83..3d42dd084c322636e8384dc92caf87ca90c8bac1 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -65,6 +65,20 @@ void ArchiveRequest::initialize() { m_payloadInterpreted = true; } +void ArchiveRequest::commit(){ + checkPayloadWritable(); + checkPayloadReadable(); + for(auto & job: m_payload.jobs()){ + int nbTapepool = std::count_if(m_payload.jobs().begin(),m_payload.jobs().end(),[&job](const cta::objectstore::serializers::ArchiveJob & archiveJob){ + return archiveJob.tapepool() == job.tapepool(); + }); + if(nbTapepool != 1){ + throw cta::exception::Exception("In ArchiveRequest::commit(), cannot insert an ArchiveRequest containing archive jobs with the same destination tapepool"); + } + } + ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>::commit(); +} + //------------------------------------------------------------------------------ // ArchiveRequest::addJob() //------------------------------------------------------------------------------ diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index 6936f967a1c55b695804566d571279b364bfc9e8..8d925e8150ec27b0a8633a900bc913bf6a0ec149 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -44,6 +44,7 @@ public: ArchiveRequest(Backend 
& os); ArchiveRequest(GenericObject & go); void initialize(); + void commit(); // Ownership of archive requests is managed per job. Object level owner has no meaning. std::string getOwner() = delete; void setOwner(const std::string &) = delete; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index ee799a1592f350bb314acf1b264df21452d5cf91..45101ef3b5ad15cb8861402d422dd53366116922 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -489,6 +489,14 @@ void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStruct ArchiveFileSerDeser(criteria.archiveFile).serialize(*m_payload.mutable_archivefile()); for (auto &tf: criteria.archiveFile.tapeFiles) { MountPolicySerDeser(criteria.mountPolicy).serialize(*m_payload.mutable_mountpolicy()); + /* + * Explanation about these hardcoded retries : + * The hardcoded RetriesWithinMount will ensure that we will try to retrieve the file 3 times + * in the same mount. + * The hardcoded TotalRetries ensures that we will never try more than 6 times to retrieve a file. + * As totalretries = 6 and retrieswithinmount = 3, this will ensure that the file will be retried by maximum 2 mounts. 
+ * (2 mounts * 3 retrieswithinmount = 6 totalretries) + */ const uint32_t hardcodedRetriesWithinMount = 3; const uint32_t hardcodedTotalRetries = 6; const uint32_t hardcodedReportRetries = 2; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index f074a177c48d1632bb578f97d6b2fcf32f33cc61..419984bb62ca3646ccd40b107249576169f6e132 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2237,6 +2237,15 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> for (auto & ar: archiveRoutesMap.at(std::make_tuple(rsr.archiveFile.diskInstance, rsr.archiveFile.storageClass))) { rRRepackInfo.archiveRouteMap[ar.second.copyNb] = ar.second.tapePoolName; } + //Check that we do not have the same destination tapepool for two different copyNb + for(auto & currentCopyNbTapePool: rRRepackInfo.archiveRouteMap){ + int nbTapepool = std::count_if(rRRepackInfo.archiveRouteMap.begin(),rRRepackInfo.archiveRouteMap.end(),[&currentCopyNbTapePool](const std::pair<uint64_t,std::string> & copyNbTapepool){ + return copyNbTapepool.second == currentCopyNbTapePool.second; + }); + if(nbTapepool != 1){ + throw cta::ExpandRepackRequestException("In OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(), found the same destination tapepool for multiple copyNb."); + } + } } catch (std::out_of_range &) { notCreatedSubrequests.emplace_back(rsr); failedCreationStats.files++; @@ -4299,10 +4308,12 @@ void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){ objectstore::ArchiveRequest & ar = *sri.subrequest; if (moreJobsToDo) { try { - jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> ( - ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(), - newStatus)), - sri}); + if(ar.exists()){ + jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> ( + 
ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(), + newStatus)), + sri}); + } } catch (cta::exception::Exception & ex) { // Log the error log::ScopedParamContainer params(lc); diff --git a/scheduler/RepackRequestManager.cpp b/scheduler/RepackRequestManager.cpp index d2c385ac3eb0ea5f2f0617c93e2c8fc37d162fa8..7f7146793d46699bcea90dc380e284149840d772 100644 --- a/scheduler/RepackRequestManager.cpp +++ b/scheduler/RepackRequestManager.cpp @@ -41,10 +41,7 @@ void RepackRequestManager::runOnePass(log::LogContext& lc) { //We have a RepackRequest that has the status ToExpand, expand it try{ m_scheduler.expandRepackRequest(repackRequest,timingList,t,lc); - } catch (const NoArchiveRoute& ex){ - lc.log(log::ERR,ex.what()); - repackRequest->fail(); - } catch (const NoStorageClass &ex){ + } catch (const ExpandRepackRequestException& ex){ lc.log(log::ERR,ex.what()); repackRequest->fail(); } catch (const cta::exception::Exception &e){ diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 691988c67bc84435799cb8a4ec1b70e4734307f6..eddebd0e81a624ea4a94bb62770876123f986789 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -450,13 +450,6 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques auto repackInfo = repackRequest->getRepackInfo(); typedef cta::common::dataStructures::RepackInfo::Type RepackType; - /*if (repackInfo.type != RepackType::MoveOnly && repackInfo.type != RepackType::AddCopiesOnly) { - log::ScopedParamContainer params(lc); - params.add("tapeVid", repackInfo.vid); - lc.log(log::ERR, "In Scheduler::expandRepackRequest(): failing repack request with unsupported (yet) type."); - repackRequest->fail(); - return; - }*/ //We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the //tape where the file is is located @@ -479,10 +472,10 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques std::stringstream 
dirBufferURL; dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/"; std::set<std::string> filesInDirectory; + std::unique_ptr<cta::disk::Directory> dir; if(archiveFilesForCatalogue.hasMore()){ //We only create the folder if there are some files to Repack cta::disk::DirectoryFactory dirFactory; - std::unique_ptr<cta::disk::Directory> dir; dir.reset(dirFactory.createDirectory(dirBufferURL.str())); if(dir->exist()){ filesInDirectory = dir->getFilesName(); @@ -544,34 +537,35 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques common::dataStructures::StorageClass sc = *storageClassItor; uint64_t nbFilesAlreadyArchived = archiveFile.tapeFiles.size(); uint64_t nbCopiesInStorageClass = sc.nbCopies; - if(nbFilesAlreadyArchived < nbCopiesInStorageClass){ - uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived; + uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived; + if(filesToArchive > 0){ totalStatsFile.totalFilesToArchive += filesToArchive; totalStatsFile.totalBytesToArchive += (filesToArchive * archiveFile.fileSize); - if(filesToArchive != 0){ - std::set<uint64_t> copyNbsAlreadyInCTA; - for (auto & tc: archiveFile.tapeFiles) { - copyNbsAlreadyInCTA.insert(tc.copyNb); - if (tc.vid == repackInfo.vid) { - // We make the (reasonable) assumption that the archive file only has one copy on this tape. - // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape. - // This will prevent double subrequest creation (we already have such a mechanism in case of crash and - // restart of expansion. - //We found the copy of the file we want to retrieve and archive - //retrieveSubRequest.fSeq = tc.fSeq; + std::set<uint64_t> copyNbsAlreadyInCTA; + for (auto & tc: archiveFile.tapeFiles) { + copyNbsAlreadyInCTA.insert(tc.copyNb); + if (tc.vid == repackInfo.vid) { + // We make the (reasonable) assumption that the archive file only has one copy on this tape. 
+ // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape. + // This will prevent double subrequest creation (we already have such a mechanism in case of crash and + // restart of expansion. + //We found the copy of the file we want to retrieve and archive + //retrieveSubRequest.fSeq = tc.fSeq; + if(repackInfo.type == RepackType::AddCopiesOnly) retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq); - } - } - for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){ - if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){ - //We need to archive the missing copy - retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first); - } } - if(retrieveSubRequest.copyNbsToRearchive.size() != filesToArchive){ - throw NoArchiveRoute("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files"); + } + for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){ + if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){ + //We need to archive the missing copy + retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first); } - } else { + } + if(retrieveSubRequest.copyNbsToRearchive.size() != filesToArchive){ + throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files"); + } + } else { + if(repackInfo.type == RepackType::AddCopiesOnly){ //Nothing to Archive so nothing to Retrieve as well retrieveSubrequests.pop_back(); continue; @@ -579,7 +573,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques } } else { //No storage 
class have been found for the current tapefile throw an exception - throw NoStorageClass("In Scheduler::expandRepackRequest(): No storage class have been found for the file to add copies"); + throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): No storage class have been found for the file to add copies"); } } @@ -649,6 +643,12 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques lc.log(log::INFO,"Expansion time reached, Repack Request requeued in ToExpand queue."); } } else { + if(totalStatsFile.totalFilesToRetrieve == 0){ + //If no files have been retrieved, the repack buffer will have to be deleted + if(dir != nullptr && dir->exist()){ + dir->rmdir(); + } + } repackRequest->m_dbReq->expandDone(); lc.log(log::INFO,"In Scheduler::expandRepackRequest(), repack request expanded"); } diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 3de3649479ae49a204c806bc859d4c29563eec56..dc4cbb1d135309de26f32486b25526c8a444ffae 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -74,8 +74,7 @@ class RetrieveJob; * The scheduler is the unique entry point to the central storage for taped. 
It is * */ -CTA_GENERATE_EXCEPTION_CLASS(NoArchiveRoute); -CTA_GENERATE_EXCEPTION_CLASS(NoStorageClass); +CTA_GENERATE_EXCEPTION_CLASS(ExpandRepackRequestException); class Scheduler { diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 1c251816c6d95397fe2896155175da04b4ca7f96..94df1ce61aa30b0fd07de47ff348b1ce102e9c08 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -596,13 +596,19 @@ public: return false; /** * For the tests, we try to have the priority by - * alphabetical order : vid1 should be treated before vid2, + * alphabetical order : vid1 / tapepool1 should be treated before vid2/tapepool2, * so if this->vid < other.vid : then this > other.vid, so return false */ if(vid < other.vid) return false; if(vid > other.vid) return true; + + if(tapePool < other.tapePool) + return false; + if(tapePool > other.tapePool) + return true; + return false; } }; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 37709a3f0a4e6c61a8bc503434940f8abbfa003d..4fd01500cf0fa29e30ed11d6b031aa331e80af34 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -3050,7 +3050,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { { scheduler.waitSchedulerDbSubthreadsComplete(); { - //The first mount given by the scheduler should be the vidDestination2 + //The first mount given by the scheduler should be the vidDestination1 that belongs to the tapepool1 std::unique_ptr<cta::TapeMount> mount; mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); ASSERT_NE(nullptr, mount.get()); @@ -3063,11 +3063,12 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { { auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); ASSERT_EQ(10,jobBatch.size()); - ASSERT_EQ(vidDestination2,archiveMount->getVid()); + ASSERT_EQ(vidDestination1,archiveMount->getVid()); } } { + //Second mount should be the vidDestination2 that belongs to the 
tapepool2 std::unique_ptr<cta::TapeMount> mount; mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); ASSERT_NE(nullptr, mount.get()); @@ -3080,7 +3081,7 @@ TEST_P(SchedulerTest, expandRepackRequestAddCopiesOnly) { { auto jobBatch = archiveMount->getNextJobBatch(20,20 * archiveFileSize,lc); ASSERT_EQ(10,jobBatch.size()); - ASSERT_EQ(vidDestination1,archiveMount->getVid()); + ASSERT_EQ(vidDestination2,archiveMount->getVid()); } } }