Commit 0380c989 authored by Cedric CAFFY

Corrected the bug that allowed operators to repack the same tape twice

parent ce3161c0
@@ -62,7 +62,14 @@ namespace {
     tapeFile.copyNb = rset.columnUint64("COPY_NB");
     tapeFile.creationTime = rset.columnUint64("TAPE_FILE_CREATION_TIME");
     tapeFile.checksumBlob = archiveFile.checksumBlob; // Duplicated for convenience
+    cta::optional<std::string> supersededByVid = rset.columnOptionalString("SUPERSEDED_BY_VID");
+    if(supersededByVid){
+      tapeFile.supersededByVid = supersededByVid.value();
+    }
+    cta::optional<uint64_t> supersededByFSeq = rset.columnOptionalUint64("SUPERSEDED_BY_FSEQ");
+    if(supersededByFSeq){
+      tapeFile.supersededByFSeq = supersededByFSeq.value();
+    }
     archiveFile.tapeFiles.push_back(tapeFile);
   }
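The two markers read here are what the rest of the commit keys on: a tape file with a non-empty supersededByVid has already been migrated by an earlier repack. A minimal sketch of how a consumer can use them, assuming only the fields the diff touches (isActive is a hypothetical helper, not part of the commit):

```cpp
#include <cstdint>
#include <string>

// Field names follow the diff; the real TapeFile data structure
// carries more members than shown here.
struct TapeFile {
  std::string vid;
  uint64_t fSeq = 0;
  std::string supersededByVid;   // empty while this copy is still the active one
  uint64_t supersededByFSeq = 0; // fSeq of the superseding copy, when any
};

// Hypothetical helper: a copy is "active" (still worth repacking) only if no
// later repack has superseded it. This mirrors the tc.supersededByVid.empty()
// test used in Scheduler::expandRepackRequest() below.
bool isActive(const TapeFile &tf) {
  return tf.supersededByVid.empty();
}
```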
@@ -104,6 +111,8 @@ RdbmsCatalogueGetArchiveFilesForRepackItor::RdbmsCatalogueGetArchiveFilesForRepa
       "TAPE_COPY.LOGICAL_SIZE_IN_BYTES AS LOGICAL_SIZE_IN_BYTES,"
       "TAPE_COPY.COPY_NB AS COPY_NB,"
       "TAPE_COPY.CREATION_TIME AS TAPE_FILE_CREATION_TIME, "
+      "TAPE_COPY.SUPERSEDED_BY_VID AS SUPERSEDED_BY_VID, "
+      "TAPE_COPY.SUPERSEDED_BY_FSEQ AS SUPERSEDED_BY_FSEQ, "
       "TAPE.TAPE_POOL_NAME AS TAPE_POOL_NAME "
     "FROM "
       "TAPE_FILE REPACK_TAPE "
@@ -119,6 +128,10 @@ RdbmsCatalogueGetArchiveFilesForRepackItor::RdbmsCatalogueGetArchiveFilesForRepa
       "REPACK_TAPE.VID = :VID "
     "AND "
       "REPACK_TAPE.FSEQ >= :START_FSEQ "
+    "AND "
+      "REPACK_TAPE.SUPERSEDED_BY_VID IS NULL "
+    "AND "
+      "REPACK_TAPE.SUPERSEDED_BY_FSEQ IS NULL "
     "ORDER BY REPACK_TAPE.FSEQ";
   m_conn = connPool.getConn();
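Taken together with the hunk above, this is the actual guard against repacking the same tape twice: files whose copies were already superseded by a previous repack never reach the expansion loop. For readability, the effective predicate reassembled from the string fragments, exactly as they concatenate (a reading aid, not new code):

```cpp
// The WHERE clause as one piece; :VID and :START_FSEQ are the bind
// parameters of the prepared statement.
const char *const whereClause =
  "WHERE "
    "REPACK_TAPE.VID = :VID "
  "AND "
    "REPACK_TAPE.FSEQ >= :START_FSEQ "
  "AND "
    "REPACK_TAPE.SUPERSEDED_BY_VID IS NULL "
  "AND "
    "REPACK_TAPE.SUPERSEDED_BY_FSEQ IS NULL "
  "ORDER BY REPACK_TAPE.FSEQ";
```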
......
@@ -503,6 +503,15 @@ auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> {
   return ret;
 }
+//------------------------------------------------------------------------------
+// RepackRequest::reportRetrieveCreationFailures()
+//------------------------------------------------------------------------------
+void RepackRequest::reportRetrieveCreationFailures(const StatsValues& failedRetrieveCreation){
+  checkPayloadWritable();
+  m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + failedRetrieveCreation.bytes);
+  m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + failedRetrieveCreation.files);
+  setStatus();
+}
 //------------------------------------------------------------------------------
 // RepackRequest::garbageCollect()
......
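For reference, the aggregate this new method consumes only needs the two counters touched here. A minimal sketch of the logic, with the protobuf payload replaced by plain fields (the real StatsValues and setStatus() live in the objectstore code and may do more than shown):

```cpp
#include <cstdint>

// Sketch of the StatsValues aggregate as used by this commit; the real type
// is declared in the repack request header and may carry more members.
struct StatsValues {
  uint64_t files = 0; // files whose retrieve subrequest could not be created
  uint64_t bytes = 0; // total size of those files
};

// The reporting is pure accumulation followed by a status recomputation;
// the protobuf payload is replaced by plain counters here for clarity.
struct RepackStatsSketch {
  uint64_t failedToRetrieveFiles = 0;
  uint64_t failedToRetrieveBytes = 0;

  void reportRetrieveCreationFailures(const StatsValues &failed) {
    failedToRetrieveFiles += failed.files;
    failedToRetrieveBytes += failed.bytes;
    // the real method then calls setStatus() to re-derive the request state
  }
};
```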
@@ -123,6 +123,8 @@ public:
   };
   std::map<StatsType, StatsValues> getStats();
+  void reportRetrieveCreationFailures(const StatsValues &failedRetrieveCreation);
   void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
     cta::catalogue::Catalogue & catalogue) override;
......
@@ -2185,7 +2185,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
   for (auto rsr: repackSubrequests) fSeqs.insert(rsr.fSeq);
   auto subrequestsNames = m_repackRequest.getOrPrepareSubrequestInfo(fSeqs, *m_oStoreDB.m_agentReference);
   m_repackRequest.setTotalStats(totalStatsFiles);
-  uint64_t fSeq = std::max(maxFSeqLowBound+1, maxAddedFSeq + 1);
+  uint64_t fSeq = std::max(maxFSeqLowBound + 1, maxAddedFSeq + 1);
   m_repackRequest.setLastExpandedFSeq(fSeq);
   // We make sure the references to subrequests exist persistently before creating them.
   m_repackRequest.commit();
@@ -2197,6 +2197,7 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
   // Try to create the retrieve subrequests (owned by this process, to be queued in a second step)
   // subrequests can already fail at that point if we cannot find a copy on a valid tape.
   std::list<uint64_t> failedFSeqs;
+  objectstore::RepackRequest::StatsValues failedCreationStats;
   uint64_t failedFiles = 0;
   uint64_t failedBytes = 0;
   // First loop: we will issue the async insertions of the subrequests.
@@ -2231,8 +2232,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
       }
     } catch (std::out_of_range &) {
       failedFSeqs.emplace_back(rsr.fSeq);
-      failedFiles++;
-      failedBytes += rsr.archiveFile.fileSize;
+      failedCreationStats.files++;
+      failedCreationStats.bytes += rsr.archiveFile.fileSize;
       log::ScopedParamContainer params(lc);
       params.add("fileID", rsr.archiveFile.archiveFileID)
             .add("diskInstance", rsr.archiveFile.diskInstance)
@@ -2302,8 +2303,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
     {
       // Count the failure for this subrequest.
       failedFSeqs.emplace_back(rsr.fSeq);
-      failedFiles++;
-      failedBytes += rsr.archiveFile.fileSize;
+      failedCreationStats.files++;
+      failedCreationStats.bytes += rsr.archiveFile.fileSize;
       log::ScopedParamContainer params(lc);
       params.add("fileId", rsr.archiveFile.archiveFileID)
             .add("repackVid", repackInfo.vid)
@@ -2325,8 +2326,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
       // We can fail to serialize here...
       // Count the failure for this subrequest.
       failedFSeqs.emplace_back(rsr.fSeq);
-      failedFiles++;
-      failedBytes += rsr.archiveFile.fileSize;
+      failedCreationStats.files++;
+      failedCreationStats.bytes += rsr.archiveFile.fileSize;
       failedFSeqs.emplace_back(rsr.fSeq);
       log::ScopedParamContainer params(lc);
       params.add("fileId", rsr.archiveFile)
@@ -2363,8 +2364,8 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
     } catch (exception::Exception & ex) {
       // Count the failure for this subrequest.
       failedFSeqs.emplace_back(aii.rsr.fSeq);
-      failedFiles++;
-      failedBytes += aii.rsr.archiveFile.fileSize;
+      failedCreationStats.files++;
+      failedCreationStats.bytes += aii.rsr.archiveFile.fileSize;
       log::ScopedParamContainer params(lc);
       params.add("fileId", aii.rsr.archiveFile)
             .add("repackVid", repackInfo.vid)
@@ -2375,6 +2376,14 @@ void OStoreDB::RepackRequest::addSubrequestsAndUpdateStats(std::list<Subrequest>
         "In OStoreDB::RepackRequest::addSubrequests(): could not asyncInsert the subrequest.");
     }
   }
+  if(failedFSeqs.size()){
+    log::ScopedParamContainer params(lc);
+    params.add("files", failedCreationStats.files);
+    params.add("bytes", failedCreationStats.bytes);
+    m_repackRequest.reportRetrieveCreationFailures(failedCreationStats);
+    m_repackRequest.commit();
+    lc.log(log::ERR, "In OStoreDB::RepackRequest::addSubrequests(), reported the failed creation of Retrieve Requests to the Repack request");
+  }
   // We now have created the subrequests. Time to enqueue.
   {
     objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue);
......
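Condensed, the OStoreDB changes above follow one pattern: every creation-failure path feeds the same StatsValues, and the totals are pushed to the persistent repack request exactly once, after the insertion loop. A self-contained sketch under those assumptions (the types are stand-ins, and createRetrieveSubrequest is a hypothetical placeholder for the async-insert machinery):

```cpp
#include <cstdint>
#include <list>

// Stand-in types so the pattern compiles on its own; the real ones come
// from the CTA objectstore and scheduler database code.
struct ArchiveFileSketch { uint64_t fileSize = 0; };
struct SubrequestSketch { uint64_t fSeq = 0; ArchiveFileSketch archiveFile; };
struct StatsValuesSketch { uint64_t files = 0; uint64_t bytes = 0; };

// Hypothetical stand-in for the async subrequest insertion; throws on failure.
void createRetrieveSubrequest(const SubrequestSketch &rsr);

StatsValuesSketch accountCreationFailures(const std::list<SubrequestSketch> &subrequests) {
  StatsValuesSketch failedCreationStats;
  std::list<uint64_t> failedFSeqs; // the real code also uses this to skip queueing
  for (const auto &rsr : subrequests) {
    try {
      createRetrieveSubrequest(rsr);
    } catch (...) {
      // Count each failure once, whatever the failure path.
      failedFSeqs.emplace_back(rsr.fSeq);
      failedCreationStats.files++;
      failedCreationStats.bytes += rsr.archiveFile.fileSize;
    }
  }
  // The caller then reports the totals once (reportRetrieveCreationFailures)
  // and commits, so the repack request reflects files that will never
  // produce a retrieve subrequest.
  return failedCreationStats;
}
```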
@@ -474,24 +474,28 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
   timingList.insertAndReset("fillTotalStatsFileBeforeExpandTime",t);
   cta::catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq);
   timingList.insertAndReset("catalogueGetArchiveFilesForRepackItorTime",t);
   std::stringstream dirBufferURL;
   dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/";
-  cta::disk::DirectoryFactory dirFactory;
-  std::unique_ptr<cta::disk::Directory> dir;
-  dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
   std::set<std::string> filesInDirectory;
-  if(dir->exist()){
-    filesInDirectory = dir->getFilesName();
-  } else {
-    dir->mkdir();
+  if(archiveFilesForCatalogue.hasMore()){
+    //We only create the folder if there are some files to Repack
+    cta::disk::DirectoryFactory dirFactory;
+    std::unique_ptr<cta::disk::Directory> dir;
+    dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
+    if(dir->exist()){
+      filesInDirectory = dir->getFilesName();
+    } else {
+      dir->mkdir();
+    }
   }
   double elapsedTime = 0;
   bool stopExpansion = false;
+  repackRequest->m_dbReq->setExpandStartedAndChangeStatus();
   while(archiveFilesForCatalogue.hasMore() && !stopExpansion) {
     size_t filesCount = 0;
     uint64_t maxAddedFSeq = 0;
     std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests;
-    repackRequest->m_dbReq->setExpandStartedAndChangeStatus();
     while(filesCount < c_defaultMaxNbFilesForRepack && !stopExpansion && archiveFilesForCatalogue.hasMore())
     {
       filesCount++;
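The relocated directory logic above is part of the same fix: when the iterator returns nothing (every file already superseded), no buffer directory is created for a second repack of the same tape. A sketch of the lazy-creation pattern, with a hypothetical minimal interface mirroring the cta::disk calls named in the diff (createDirectory, exist, getFilesName, mkdir):

```cpp
#include <memory>
#include <set>
#include <string>

// Hypothetical minimal interface mirroring the cta::disk calls in the diff.
struct Directory {
  virtual ~Directory() = default;
  virtual bool exist() = 0;
  virtual std::set<std::string> getFilesName() = 0;
  virtual void mkdir() = 0;
};
struct DirectoryFactory {
  std::unique_ptr<Directory> createDirectory(const std::string &url);
};

// Create the repack buffer directory lazily: only when the catalogue
// iterator actually returned files to repack.
std::set<std::string> prepareBufferDir(DirectoryFactory &factory,
                                       const std::string &dirBufferURL,
                                       bool hasFilesToRepack) {
  std::set<std::string> filesInDirectory;
  if (hasFilesToRepack) {
    auto dir = factory.createDirectory(dirBufferURL);
    if (dir->exist()) {
      filesInDirectory = dir->getFilesName();
    } else {
      dir->mkdir();
    }
  }
  return filesInDirectory;
}
```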
@@ -507,14 +511,19 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
       if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) {
         // determine which fSeq(s) (normally only one) lives on this tape.
         for (auto & tc: archiveFile.tapeFiles) if (tc.vid == repackInfo.vid) {
-          retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb);
           // We make the (reasonable) assumption that the archive file only has one copy on this tape.
           // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
           // This will prevent double subrequest creation (we already have such a mechanism in case of crash and
           // restart of expansion).
-          retrieveSubRequest.fSeq = std::min(tc.fSeq, retrieveSubRequest.fSeq);
-          totalStatsFile.totalFilesToArchive += 1;
-          totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize;
+          if(tc.supersededByVid.empty()){
+            //We want to Archive the "active" copies on the tape, thus the copies that are not superseded by another
+            //we want to Retrieve the "active" fSeq
+            totalStatsFile.totalFilesToArchive += 1;
+            totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize;
+            retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb);
+            retrieveSubRequest.fSeq = tc.fSeq;
+          }
+          //retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq);
         }
       }
       std::stringstream fileName;
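Distilled, the new selection rule above is: of the copies an archive file has on the repacked tape, only those not yet superseded are counted, rearchived, and retrieved. A self-contained sketch of that rule (field and member names follow the diff; the types are stand-ins, not the real data structures):

```cpp
#include <cstdint>
#include <list>
#include <set>
#include <string>

// Stand-in types mirroring only the fields the diff touches.
struct TapeFileSketch {
  std::string vid;
  uint64_t fSeq = 0;
  uint32_t copyNb = 0;
  std::string supersededByVid; // empty => this copy is still active
};

struct SelectionResult {
  std::set<uint32_t> copyNbsToRearchive;
  uint64_t fSeqToRetrieve = 0;
  uint64_t filesToArchive = 0;
};

// Keep only the active (non-superseded) copies living on the repacked tape.
SelectionResult selectActiveCopies(const std::list<TapeFileSketch> &tapeFiles,
                                   const std::string &repackVid) {
  SelectionResult result;
  for (const auto &tc : tapeFiles) {
    if (tc.vid != repackVid) continue;          // only copies on this tape
    if (!tc.supersededByVid.empty()) continue;  // skip superseded copies
    result.copyNbsToRearchive.insert(tc.copyNb);
    result.fSeqToRetrieve = tc.fSeq;            // retrieve the active copy
    result.filesToArchive += 1;
  }
  return result;
}
```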
@@ -560,7 +569,7 @@ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackReques
       // We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted).
       // We pass this information to the db for recording in the repack request. This will allow restarting from the right
       // value in case of crash.
-      repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq - 1, maxAddedFSeq, totalStatsFile, lc);
+      repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, lc);
       timingList.insertAndReset("addSubrequestsAndUpdateStatsTime",t);
       {
         if(!stopExpansion && archiveFilesForCatalogue.hasMore()){
......