Skip to content
Snippets Groups Projects
Commit 4e3433e5 authored by Cedric CAFFY
Browse files

Making the failure of Archive success reporting more robust

parent 9cee3f2b
No related branches found
No related tags found
No related merge requests found
...@@ -162,6 +162,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -162,6 +162,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> validatedSuccessfulDBArchiveJobs; std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> validatedSuccessfulDBArchiveJobs;
std::unique_ptr<cta::ArchiveJob> job; std::unique_ptr<cta::ArchiveJob> job;
std::string failedValidationJobReportURL;
try{ try{
uint64_t files=0; uint64_t files=0;
uint64_t bytes=0; uint64_t bytes=0;
...@@ -184,6 +185,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -184,6 +185,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
} catch (const cta::exception::Exception &ex){ } catch (const cta::exception::Exception &ex){
//We put the not validated job into this list in order to insert the job //We put the not validated job into this list in order to insert the job
//into the failedToReportArchiveJobs list in the exception catching block //into the failedToReportArchiveJobs list in the exception catching block
failedValidationJobReportURL = job->reportURL();
validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob)); validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob));
throw ex; throw ex;
} }
...@@ -199,7 +201,14 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -199,7 +201,14 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
tapeItemsWritten.emplace(tiwup.release()); tapeItemsWritten.emplace(tiwup.release());
} }
utils::Timer t; utils::Timer t;
// Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
// Now get the db mount to mark the jobs as successful.
// Extract the db jobs from the scheduler jobs.
for (auto &schJob: validatedSuccessfulArchiveJobs) {
validatedSuccessfulDBArchiveJobs.emplace_back(std::move(schJob->m_dbJob));
}
validatedSuccessfulArchiveJobs.clear();
updateCatalogueWithTapeFilesWritten(tapeItemsWritten); updateCatalogueWithTapeFilesWritten(tapeItemsWritten);
catalogueTime=t.secs(utils::Timer::resetCounter); catalogueTime=t.secs(utils::Timer::resetCounter);
{ {
...@@ -211,13 +220,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -211,13 +220,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
logContext.log(cta::log::INFO, "Catalog updated for batch of jobs"); logContext.log(cta::log::INFO, "Catalog updated for batch of jobs");
} }
// Now get the db mount to mark the jobs as successful. // We can now pass the validatedSuccessfulArchiveJobs list for the dbMount to process. We are done at that point.
// Extract the db jobs from the scheduler jobs.
for (auto &schJob: validatedSuccessfulArchiveJobs) {
validatedSuccessfulDBArchiveJobs.emplace_back(std::move(schJob->m_dbJob));
}
// We can now pass this list for the dbMount to process. We are done at that point.
// Reporting to client will be queued if needed and done in another process. // Reporting to client will be queued if needed and done in another process.
m_dbMount->setJobBatchTransferred(validatedSuccessfulDBArchiveJobs, logContext); m_dbMount->setJobBatchTransferred(validatedSuccessfulDBArchiveJobs, logContext);
schedulerDbTime=t.secs(utils::Timer::resetCounter); schedulerDbTime=t.secs(utils::Timer::resetCounter);
...@@ -236,7 +239,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -236,7 +239,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
.add("diskInstance", job->archiveFile.diskInstance) .add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId) .add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL()); .add("reportURL", failedValidationJobReportURL);
} }
const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore."; const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore.";
logContext.log(cta::log::WARNING, msg_error); logContext.log(cta::log::WARNING, msg_error);
...@@ -248,10 +251,17 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct ...@@ -248,10 +251,17 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct
.add("diskInstance", job->archiveFile.diskInstance) .add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId) .add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
.add("reportURL", job->reportURL()); .add("reportURL", failedValidationJobReportURL);
} }
const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception";
logContext.log(cta::log::ERR, msg_error); logContext.log(cta::log::ERR, msg_error);
//If validatedSuccessfulArchiveJobs still has jobs in it, it means that
//the validation job->validateAndGetTapeFileWritten() failed for one job and
//threw an exception. We will then have to fail all the others.
for(auto &ctaJob: validatedSuccessfulArchiveJobs){
if(ctaJob.get())
validatedSuccessfulDBArchiveJobs.emplace_back(std::move(ctaJob->m_dbJob));
}
for(auto &aj: validatedSuccessfulDBArchiveJobs){ for(auto &aj: validatedSuccessfulDBArchiveJobs){
if(aj.get()) if(aj.get())
failedToReportArchiveJobs.push(std::move(aj)); failedToReportArchiveJobs.push(std::move(aj));
......
...@@ -258,7 +258,11 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa ...@@ -258,7 +258,11 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa
} catch(const cta::ArchiveMount::FailedMigrationRecallResult &ex){ } catch(const cta::ArchiveMount::FailedMigrationRecallResult &ex){
while(!failedToReportArchiveJobs.empty()){ while(!failedToReportArchiveJobs.empty()){
auto archiveJob = std::move(failedToReportArchiveJobs.front()); auto archiveJob = std::move(failedToReportArchiveJobs.front());
archiveJob->failTransfer(ex.getMessageValue(),reportPacker.m_lc); try{
archiveJob->failTransfer(ex.getMessageValue(),reportPacker.m_lc);
} catch(const cta::exception::Exception &ex2) {
//If the failTransfer method fails, we can't do anything about it
}
failedToReportArchiveJobs.pop(); failedToReportArchiveJobs.pop();
} }
throw ex; throw ex;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment