diff --git a/catalogue/RdbmsCatalogueGetArchiveFilesForRepackItor.cpp b/catalogue/RdbmsCatalogueGetArchiveFilesForRepackItor.cpp index 488655edb6d0f21c8bd62a497bb0c001f8b945e4..5857695308921701d8d93b6ab39a5e7382573176 100644 --- a/catalogue/RdbmsCatalogueGetArchiveFilesForRepackItor.cpp +++ b/catalogue/RdbmsCatalogueGetArchiveFilesForRepackItor.cpp @@ -62,7 +62,14 @@ namespace { tapeFile.copyNb = rset.columnUint64("COPY_NB"); tapeFile.creationTime = rset.columnUint64("TAPE_FILE_CREATION_TIME"); tapeFile.checksumBlob = archiveFile.checksumBlob; // Duplicated for convenience - + cta::optional<std::string> supersededByVid = rset.columnOptionalString("SUPERSEDED_BY_VID"); + if(supersededByVid){ + tapeFile.supersededByVid = supersededByVid.value(); + } + cta::optional<uint64_t> supersededByFSeq = rset.columnOptionalUint64("SUPERSEDED_BY_FSEQ"); + if(supersededByFSeq){ + tapeFile.supersededByFSeq = supersededByFSeq.value(); + } archiveFile.tapeFiles.push_back(tapeFile); } @@ -104,6 +111,8 @@ RdbmsCatalogueGetArchiveFilesForRepackItor::RdbmsCatalogueGetArchiveFilesForRepa "TAPE_COPY.LOGICAL_SIZE_IN_BYTES AS LOGICAL_SIZE_IN_BYTES," "TAPE_COPY.COPY_NB AS COPY_NB," "TAPE_COPY.CREATION_TIME AS TAPE_FILE_CREATION_TIME, " + "TAPE_COPY.SUPERSEDED_BY_VID AS SUPERSEDED_BY_VID, " + "TAPE_COPY.SUPERSEDED_BY_FSEQ AS SUPERSEDED_BY_FSEQ, " "TAPE.TAPE_POOL_NAME AS TAPE_POOL_NAME " "FROM " "TAPE_FILE REPACK_TAPE " @@ -119,6 +128,10 @@ RdbmsCatalogueGetArchiveFilesForRepackItor::RdbmsCatalogueGetArchiveFilesForRepa "REPACK_TAPE.VID = :VID " "AND " "REPACK_TAPE.FSEQ >= :START_FSEQ " + "AND " + "REPACK_TAPE.SUPERSEDED_BY_VID IS NULL " + "AND " + "REPACK_TAPE.SUPERSEDED_BY_FSEQ IS NULL " "ORDER BY REPACK_TAPE.FSEQ"; m_conn = connPool.getConn(); diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index a61e58e3097ccd4fb06f8c7ac80acb7be77caa31..fa481405e9745f80a6954fa79bfabe126d456d00 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ 
-503,6 +503,15 @@ auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> { return ret; } +//------------------------------------------------------------------------------ +// RepackRequest::reportRetrieveCreationFailures() +//------------------------------------------------------------------------------ +void RepackRequest::reportRetrieveCreationFailures(const StatsValues& failedRetrieveCreation){ + checkPayloadWritable(); + m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + failedRetrieveCreation.bytes); + m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + failedRetrieveCreation.files); + setStatus(); +} //------------------------------------------------------------------------------ // RepackRequest::garbageCollect() diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index 29b1d618aed8fc04ec7c88f5341134d9f348585f..f1ae27b6aea5a4ebd98316fe9e0d813f1f549c2f 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -123,6 +123,8 @@ public: }; std::map<StatsType, StatsValues> getStats(); + void reportRetrieveCreationFailures(const StatsValues &failedRetrieveCreation); + void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index ff904950d12a0f6895706cd3444ba3a2a0122f80..10364dcb100933a5d688bfada64da5f9e9974086 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2185,7 +2185,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> for (auto rsr: repackSubrequests) fSeqs.insert(rsr.fSeq); auto subrequestsNames = m_repackRequest.getOrPrepareSubrequestInfo(fSeqs, *m_oStoreDB.m_agentReference); m_repackRequest.setTotalStats(totalStatsFiles); - uint64_t fSeq = std::max(maxFSeqLowBound+1, maxAddedFSeq + 1); + uint64_t fSeq =
std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1); m_repackRequest.setLastExpandedFSeq(fSeq); // We make sure the references to subrequests exist persistently before creating them. m_repackRequest.commit(); @@ -2197,6 +2197,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> // Try to create the retrieve subrequests (owned by this process, to be queued in a second step) // subrequests can already fail at that point if we cannot find a copy on a valid tape. std::list<uint64_t> failedFSeqs; + objectstore::RepackRequest::StatsValues failedCreationStats; uint64_t failedFiles = 0; uint64_t failedBytes = 0; // First loop: we will issue the async insertions of the subrequests. @@ -2231,8 +2232,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> } } catch (std::out_of_range &) { failedFSeqs.emplace_back(rsr.fSeq); - failedFiles++; - failedBytes += rsr.archiveFile.fileSize; + failedCreationStats.files++; + failedCreationStats.bytes += rsr.archiveFile.fileSize; log::ScopedParamContainer params(lc); params.add("fileID", rsr.archiveFile.archiveFileID) .add("diskInstance", rsr.archiveFile.diskInstance) @@ -2302,8 +2303,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> { // Count the failure for this subrequest. failedFSeqs.emplace_back(rsr.fSeq); - failedFiles++; - failedBytes += rsr.archiveFile.fileSize; + failedCreationStats.files++; + failedCreationStats.bytes += rsr.archiveFile.fileSize; log::ScopedParamContainer params(lc); params.add("fileId", rsr.archiveFile.archiveFileID) .add("repackVid", repackInfo.vid) @@ -2325,8 +2326,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> // We can fail to serialize here... // Count the failure for this subrequest. 
failedFSeqs.emplace_back(rsr.fSeq); - failedFiles++; - failedBytes += rsr.archiveFile.fileSize; + failedCreationStats.files++; + failedCreationStats.bytes += rsr.archiveFile.fileSize; failedFSeqs.emplace_back(rsr.fSeq); log::ScopedParamContainer params(lc); params.add("fileId", rsr.archiveFile) @@ -2363,8 +2364,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> } catch (exception::Exception & ex) { // Count the failure for this subrequest. failedFSeqs.emplace_back(aii.rsr.fSeq); - failedFiles++; - failedBytes += aii.rsr.archiveFile.fileSize; + failedCreationStats.files++; + failedCreationStats.bytes += aii.rsr.archiveFile.fileSize; log::ScopedParamContainer params(lc); params.add("fileId", aii.rsr.archiveFile) .add("repackVid", repackInfo.vid) @@ -2375,6 +2376,14 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest> "In OStoreDB::RepackRequest::addSubrequests(): could not asyncInsert the subrequest."); } } + if(failedFSeqs.size()){ + log::ScopedParamContainer params(lc); + params.add("files", failedCreationStats.files); + params.add("bytes", failedCreationStats.bytes); + m_repackRequest.reportRetrieveCreationFailures(failedCreationStats); + m_repackRequest.commit(); + lc.log(log::ERR, "In OStoreDB::RepackRequest::addSubrequests(), reported the failed creation of Retrieve Requests to the Repack request"); + } // We now have created the subrequests. Time to enqueue.
{ objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue); diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index eb40949d9d05deb89c5b7a9816c604ab1cbefbc9..671ac6500a3195a88ff776f4e95f2f2adcb75998 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -474,24 +474,28 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques timingList.insertAndReset("fillTotalStatsFileBeforeExpandTime",t); cta::catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq); timingList.insertAndReset("catalogueGetArchiveFilesForRepackItorTime",t); + std::stringstream dirBufferURL; dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/"; - cta::disk::DirectoryFactory dirFactory; - std::unique_ptr<cta::disk::Directory> dir; - dir.reset(dirFactory.createDirectory(dirBufferURL.str())); std::set<std::string> filesInDirectory; - if(dir->exist()){ - filesInDirectory = dir->getFilesName(); - } else { - dir->mkdir(); + if(archiveFilesForCatalogue.hasMore()){ + //We only create the folder if there are some files to Repack + cta::disk::DirectoryFactory dirFactory; + std::unique_ptr<cta::disk::Directory> dir; + dir.reset(dirFactory.createDirectory(dirBufferURL.str())); + if(dir->exist()){ + filesInDirectory = dir->getFilesName(); + } else { + dir->mkdir(); + } } double elapsedTime = 0; bool stopExpansion = false; + repackRequest->m_dbReq->setExpandStartedAndChangeStatus(); while(archiveFilesForCatalogue.hasMore() && !stopExpansion) { size_t filesCount = 0; uint64_t maxAddedFSeq = 0; std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests; - repackRequest->m_dbReq->setExpandStartedAndChangeStatus(); while(filesCount < c_defaultMaxNbFilesForRepack && !stopExpansion && archiveFilesForCatalogue.hasMore()) { filesCount++; @@ -507,14 +511,19 @@ void 
Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) { // determine which fSeq(s) (normally only one) lives on this tape. for (auto & tc: archiveFile.tapeFiles) if (tc.vid == repackInfo.vid) { - retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb); // We make the (reasonable) assumption that the archive file only has one copy on this tape. // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape. // This will prevent double subrequest creation (we already have such a mechanism in case of crash and // restart of expansion. - retrieveSubRequest.fSeq = std::min(tc.fSeq, retrieveSubRequest.fSeq); - totalStatsFile.totalFilesToArchive += 1; - totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize; + if(tc.supersededByVid.empty()){ + //We want to Archive the "active" copies on the tape, thus the copies that are not superseded by another + //we want to Retrieve the "active" fSeq + totalStatsFile.totalFilesToArchive += 1; + totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize; + retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb); + retrieveSubRequest.fSeq = tc.fSeq; + } + //retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq); } } std::stringstream fileName; @@ -560,7 +569,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques // We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted). // We pass this information to the db for recording in the repack request. This will allow restarting from the right // value in case of crash. 
- repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq - 1, maxAddedFSeq, totalStatsFile, lc); + repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc); timingList.insertAndReset("addSubrequestsAndUpdateStatsTime",t); { if(!stopExpansion && archiveFilesForCatalogue.hasMore()){