diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 26b393f958207c041584927ea31c8b6acf349abf..6685632923aabac7c77ba057b1aa99ee1ea4f685 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -11,6 +11,7 @@ ### Bug fixes - cta/CTA#1059 - Clean up warnings reported by static analysis - cta/CTA#1062 - cta-admin tf rm should store the diskFilePath when deleting the tape file copy +- cta/CTA#1073 - Retry failed reporting for archive jobs # v4.3-3 diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index b888968936184d1b68ffc108777e57bf7a2af724..a053f1f268d7cfba3e933ff3e39d380ed5d928bd 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -750,7 +750,9 @@ void BackendRados::lockBackoff(const std::string& name, uint64_t timeout_us, Loc } if (-EBUSY != rc) break; if (timeout_us && (timeoutTimer.usecs() > (int64_t)timeout_us)) { - throw exception::Exception("In BackendRados::lockBackoff(): timeout : timeout set = "+std::to_string(timeout_us)+" usec, time to lock the object : "+std::to_string(timeoutTimer.usecs())+" usec, number of tries to lock = "+std::to_string(nbTriesToLock)); + throw exception::Exception( + "In BackendRados::lockBackoff(): timeout : timeout set = " + std::to_string(timeout_us) + " usec, time to lock the object : " + + std::to_string(timeoutTimer.usecs()) + " usec, number of tries to lock = " + std::to_string(nbTriesToLock) + " object: " + name); } timespec ts; auto latencyUsecs=t.usecs(); diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 94a689fb1c2cc97835b1ab0fa314427a1c904098..0f7d5741583ecf5a403822450c5bdab8152cce3d 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -32,10 +32,10 @@ cta::ArchiveJob::~ArchiveJob() throw() { // constructor //------------------------------------------------------------------------------ cta::ArchiveJob::ArchiveJob(ArchiveMount *mount, - catalogue::Catalogue & catalogue, - const common::dataStructures::ArchiveFile &archiveFile, - 
const std::string &srcURL, - const common::dataStructures::TapeFile &tapeFile): + catalogue::Catalogue& catalogue, + const common::dataStructures::ArchiveFile& archiveFile, + const std::string& srcURL, + const common::dataStructures::TapeFile& tapeFile) : m_mount(mount), m_catalogue(catalogue), archiveFile(archiveFile), srcURL(srcURL), @@ -54,7 +54,7 @@ double cta::ArchiveJob::reportTime() { cta::catalogue::TapeItemWrittenPointer cta::ArchiveJob::validateAndGetTapeFileWritten() { validate(); auto fileReportUP = cta::make_unique<catalogue::TapeFileWritten>(); - auto & fileReport = *fileReportUP; + auto& fileReport = *fileReportUP; fileReport.archiveFileId = archiveFile.archiveFileID; fileReport.blockId = tapeFile.blockId; fileReport.checksumBlob = tapeFile.checksumBlob; @@ -74,10 +74,9 @@ cta::catalogue::TapeItemWrittenPointer cta::ArchiveJob::validateAndGetTapeFileWr //------------------------------------------------------------------------------ // ArchiveJob::validate //------------------------------------------------------------------------------ -void cta::ArchiveJob::validate(){ +void cta::ArchiveJob::validate() { // First check that the block Id for the file has been set. 
- if (tapeFile.blockId == - std::numeric_limits<decltype(tapeFile.blockId)>::max()) + if (tapeFile.blockId == std::numeric_limits<decltype(tapeFile.blockId)>::max()) throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set"); // Also check the checksum has been set if (archiveFile.checksumBlob.empty() || tapeFile.checksumBlob.empty()) @@ -91,27 +90,27 @@ void cta::ArchiveJob::validate(){ //------------------------------------------------------------------------------ std::string cta::ArchiveJob::exceptionThrowingReportURL() { switch (m_dbJob->reportType) { - case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport: - return m_dbJob->archiveReportURL; - case SchedulerDatabase::ArchiveJob::ReportType::FailureReport: { - if (m_dbJob->latestError.empty()) { - throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): empty failure reason."); + case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport: + return m_dbJob->archiveReportURL; + case SchedulerDatabase::ArchiveJob::ReportType::FailureReport: { + if (m_dbJob->latestError.empty()) { + throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): empty failure reason."); + } + std::string base64ErrorReport; + // Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret. + const bool noNewLineInBase64Output = false; + CryptoPP::StringSource ss1(m_dbJob->latestError, true, + new CryptoPP::Base64Encoder( + new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output)); + return m_dbJob->errorReportURL + base64ErrorReport; } - std::string base64ErrorReport; - // Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret. 
- const bool noNewLineInBase64Output = false; - CryptoPP::StringSource ss1(m_dbJob->latestError, true, - new CryptoPP::Base64Encoder( - new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output)); - return m_dbJob->errorReportURL + base64ErrorReport; - } - case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired: - throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status NoReportRequired does not require reporting."); - case SchedulerDatabase::ArchiveJob::ReportType::Report: - throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status Report does not require reporting."); + case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired: + throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status NoReportRequired does not require reporting."); + case SchedulerDatabase::ArchiveJob::ReportType::Report: + throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status Report does not require reporting."); } throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): invalid report type reportType=" + - std::to_string(static_cast<uint8_t>(m_dbJob->reportType))); + std::to_string(static_cast<uint8_t>(m_dbJob->reportType))); } //------------------------------------------------------------------------------ @@ -120,9 +119,9 @@ std::string cta::ArchiveJob::exceptionThrowingReportURL() { std::string cta::ArchiveJob::reportURL() noexcept { try { return exceptionThrowingReportURL(); - } catch(exception::Exception &ex) { + } catch (exception::Exception& ex) { return ex.what(); - } catch(...) { + } catch (...) 
{ return "In ArchiveJob::reportURL(): unknown exception"; } } @@ -132,16 +131,15 @@ std::string cta::ArchiveJob::reportURL() noexcept { //------------------------------------------------------------------------------ std::string cta::ArchiveJob::reportType() { switch (m_dbJob->reportType) { - case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport: - return "CompletionReport"; - case SchedulerDatabase::ArchiveJob::ReportType::FailureReport: - return "FailureReport"; - case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired: - return "NoReportRequired"; - case SchedulerDatabase::ArchiveJob::ReportType::Report: - return "Report"; - default: - { + case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport: + return "CompletionReport"; + case SchedulerDatabase::ArchiveJob::ReportType::FailureReport: + return "FailureReport"; + case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired: + return "NoReportRequired"; + case SchedulerDatabase::ArchiveJob::ReportType::Report: + return "Report"; + default: { throw exception::Exception("In ArchiveJob::reportType(): job status does not require reporting."); } } @@ -160,7 +158,7 @@ void cta::ArchiveJob::reportFailed(const std::string& failureReason, log::LogCon //------------------------------------------------------------------------------ // ArchiveJob::transferFailed //------------------------------------------------------------------------------ -void cta::ArchiveJob::transferFailed(const std::string &failureReason, log::LogContext & lc) { +void cta::ArchiveJob::transferFailed(const std::string& failureReason, log::LogContext& lc) { // This is fully delegated to the DB, which will handle the queueing for next steps, if any. 
m_dbJob->failTransfer(failureReason, lc); } diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index ce84522753c57d7f9e5288c60fd35b31e4803cc5..4a323f779f4707ec39d79c1ba35f443e9f8927b8 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -25,20 +25,20 @@ //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue): m_catalogue(catalogue), m_sessionRunning(false){ +cta::ArchiveMount::ArchiveMount(catalogue::Catalogue& catalogue) : m_catalogue(catalogue), m_sessionRunning(false) { } //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue, - std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount): m_catalogue(catalogue), - m_sessionRunning(false) { +cta::ArchiveMount::ArchiveMount(catalogue::Catalogue& catalogue, + std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount) : m_catalogue(catalogue), + m_sessionRunning(false) { m_dbMount.reset( - dynamic_cast<SchedulerDatabase::ArchiveMount*>(dbMount.release())); - if(!m_dbMount.get()) { + dynamic_cast<SchedulerDatabase::ArchiveMount *>(dbMount.release())); + if (!m_dbMount) { throw WrongMountType(std::string(__FUNCTION__) + - ": could not cast mount to SchedulerDatabase::ArchiveMount"); + ": could not cast mount to SchedulerDatabase::ArchiveMount"); } } @@ -75,29 +75,28 @@ std::string cta::ArchiveMount::getPoolName() const { // getVo //------------------------------------------------------------------------------ std::string cta::ArchiveMount::getVo() const { - return m_dbMount->mountInfo.vo; + return m_dbMount->mountInfo.vo; } //------------------------------------------------------------------------------ // 
getMediaType //------------------------------------------------------------------------------ -std::string cta::ArchiveMount::getMediaType() const{ - return m_dbMount->mountInfo.mediaType; +std::string cta::ArchiveMount::getMediaType() const { + return m_dbMount->mountInfo.mediaType; } //------------------------------------------------------------------------------ // getVendor //------------------------------------------------------------------------------ -std::string cta::ArchiveMount::getVendor() const{ - return m_dbMount->mountInfo.vendor; +std::string cta::ArchiveMount::getVendor() const { + return m_dbMount->mountInfo.vendor; } //------------------------------------------------------------------------------ // getCapacityInBytes //------------------------------------------------------------------------------ -uint64_t cta::ArchiveMount::getCapacityInBytes() const -{ - return m_dbMount->mountInfo.capacityInBytes; +uint64_t cta::ArchiveMount::getCapacityInBytes() const { + return m_dbMount->mountInfo.capacityInBytes; } //------------------------------------------------------------------------------ @@ -110,7 +109,7 @@ uint32_t cta::ArchiveMount::getNbFiles() const { //------------------------------------------------------------------------------ // createDiskReporter //------------------------------------------------------------------------------ -cta::disk::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) { +cta::disk::DiskReporter *cta::ArchiveMount::createDiskReporter(std::string& URL) { return m_reporterFactory.createDiskReporter(URL); } @@ -119,7 +118,7 @@ cta::disk::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) //------------------------------------------------------------------------------ std::string cta::ArchiveMount::getMountTransactionId() const { std::stringstream id; - if (!m_dbMount.get()) + if (!m_dbMount) throw exception::Exception("In cta::ArchiveMount::getMountTransactionId(): got NULL dbMount"); 
id << m_dbMount->mountInfo.mountId; return id.str(); @@ -128,7 +127,7 @@ std::string cta::ArchiveMount::getMountTransactionId() const { //------------------------------------------------------------------------------ // updateCatalogueWithTapeFilesWritten //------------------------------------------------------------------------------ -void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeItemWrittenPointer> &tapeFilesWritten) { +void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeItemWrittenPointer>& tapeFilesWritten) { m_catalogue.filesWrittenToTape(tapeFilesWritten); } @@ -136,18 +135,18 @@ void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta:: // getNextJobBatch //------------------------------------------------------------------------------ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested, - uint64_t bytesRequested, log::LogContext& logContext) { + uint64_t bytesRequested, log::LogContext& logContext) { // Check we are still running the session if (!m_sessionRunning) throw SessionNotRunning("In ArchiveMount::getNextJobBatch(): trying to get job from complete/not started session"); // try and get a new job from the DB side std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested, - bytesRequested, logContext)); + bytesRequested, logContext)); std::list<std::unique_ptr<ArchiveJob>> ret; // We prepare the response - for (auto & sdaj: dbJobBatch) { + for (auto& sdaj: dbJobBatch) { ret.emplace_back(new ArchiveJob(this, m_catalogue, - sdaj->archiveFile, sdaj->srcURL, sdaj->tapeFile)); + sdaj->archiveFile, sdaj->srcURL, sdaj->tapeFile)); ret.back()->m_dbJob.reset(sdaj.release()); } return ret; @@ -156,13 +155,16 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch( 
//------------------------------------------------------------------------------ // reportJobsBatchWritten //------------------------------------------------------------------------------ -void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs, - std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& failedToReportArchiveJobs,cta::log::LogContext& logContext) { +void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, + std::queue<cta::catalogue::TapeItemWritten>& skippedFiles, + std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& failedToReportArchiveJobs, + cta::log::LogContext& logContext) { std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> validatedSuccessfulDBArchiveJobs; std::unique_ptr<cta::ArchiveJob> job; std::string failedValidationJobReportURL; + bool catalogue_updated = false; try{ uint64_t files=0; uint64_t bytes=0; @@ -173,16 +175,16 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct // Get the next job to report and make sure we will not attempt to process it twice. 
job = std::move(successfulArchiveJobs.front()); successfulArchiveJobs.pop(); - if (!job.get()) continue; + if (!job) continue; cta::log::ScopedParamContainer params(logContext); - params.add("tapeVid",job->tapeFile.vid) - .add("mountType",cta::common::dataStructures::toString(job->m_mount->getMountType())) - .add("fileId",job->archiveFile.archiveFileID) + params.add("tapeVid", job->tapeFile.vid) + .add("mountType", cta::common::dataStructures::toString(job->m_mount->getMountType())) + .add("fileId", job->archiveFile.archiveFileID) .add("type", "ReportSuccessful"); - logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job succesful."); + logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job successful."); try { tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release()); - } catch (const cta::exception::Exception &ex){ + } catch (const cta::exception::Exception& ex) { //We put the not validated job into this list in order to insert the job //into the failedToReportArchiveJobs list in the exception catching block failedValidationJobReportURL = job->reportURL(); @@ -190,7 +192,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct throw ex; } files++; - bytes+=job->archiveFile.fileSize; + bytes += job->archiveFile.fileSize; validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); job.reset(); } @@ -204,12 +206,13 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct // Now get the db mount to mark the jobs as successful. // Extract the db jobs from the scheduler jobs. 
- for (auto &schJob: validatedSuccessfulArchiveJobs) { + for (auto& schJob: validatedSuccessfulArchiveJobs) { validatedSuccessfulDBArchiveJobs.emplace_back(std::move(schJob->m_dbJob)); } validatedSuccessfulArchiveJobs.clear(); updateCatalogueWithTapeFilesWritten(tapeItemsWritten); + catalogue_updated = true; catalogueTime=t.secs(utils::Timer::resetCounter); { cta::log::ScopedParamContainer params(logContext); @@ -220,69 +223,79 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct logContext.log(cta::log::INFO, "Catalog updated for batch of jobs"); } - // We can now pass thevalidatedSuccessfulArchiveJobs list for the dbMount to process. We are done at that point. + // We can now pass the validatedSuccessfulArchiveJobs list for the dbMount to process. We are done at that point. // Reporting to client will be queued if needed and done in another process. m_dbMount->setJobBatchTransferred(validatedSuccessfulDBArchiveJobs, logContext); - schedulerDbTime=t.secs(utils::Timer::resetCounter); + schedulerDbTime = t.secs(utils::Timer::resetCounter); cta::log::ScopedParamContainer params(logContext); params.add("files", files) .add("bytes", bytes) .add("catalogueTime", catalogueTime) .add("schedulerDbTime", schedulerDbTime) - .add("totalTime", catalogueTime + schedulerDbTime + clientReportingTime); + .add("totalTime", catalogueTime + schedulerDbTime + clientReportingTime); logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchWritten(): recorded a batch of archive jobs in metadata."); } catch (const cta::exception::NoSuchObject& ex){ cta::log::ScopedParamContainer params(logContext); params.add("exceptionMessageValue", ex.getMessageValue()); - if (job.get()) { + if (job) { params.add("fileId", job->archiveFile.archiveFileID) .add("diskInstance", job->archiveFile.diskInstance) .add("diskFileId", job->archiveFile.diskFileId) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) .add("reportURL", failedValidationJobReportURL); } - const 
std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore."; + const std::string msg_error = "In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore."; logContext.log(cta::log::WARNING, msg_error); - } catch(const cta::exception::Exception& e){ + } catch (const cta::exception::Exception& e) { cta::log::ScopedParamContainer params(logContext); params.add("exceptionMessageValue", e.getMessageValue()); - if (job.get()) { + if (job) { params.add("fileId", job->archiveFile.archiveFileID) .add("diskInstance", job->archiveFile.diskInstance) .add("diskFileId", job->archiveFile.diskFileId) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) .add("reportURL", failedValidationJobReportURL); } - const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; + const std::string msg_error = "In ArchiveMount::reportJobsBatchWritten(): got an exception"; logContext.log(cta::log::ERR, msg_error); //If validatedSuccessfulArchiveJobs has still jobs in it, it means that //the validation job->validateAndGetTapeFileWritten() failed for one job and //threw an exception. We will then have to fail all the others. 
- for(auto &ctaJob: validatedSuccessfulArchiveJobs){ - if(ctaJob.get()) + for (auto& ctaJob: validatedSuccessfulArchiveJobs) { + if (ctaJob) validatedSuccessfulDBArchiveJobs.emplace_back(std::move(ctaJob->m_dbJob)); } - for(auto &aj: validatedSuccessfulDBArchiveJobs){ - if(aj.get()) + for (auto& aj: validatedSuccessfulDBArchiveJobs) { + if (aj) failedToReportArchiveJobs.push(std::move(aj)); } - throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); + if (catalogue_updated) { + throw cta::ArchiveMount::FailedReportMoveToQueue(msg_error); + } + else { + throw cta::ArchiveMount::FailedReportCatalogueUpdate(msg_error); + } } catch(const std::exception& e){ cta::log::ScopedParamContainer params(logContext); params.add("exceptionWhat", e.what()); - if (job.get()) { + if (job) { params.add("fileId", job->archiveFile.archiveFileID) .add("diskInstance", job->archiveFile.diskInstance) .add("diskFileId", job->archiveFile.diskFileId) .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); } - const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception"; + const std::string msg_error = "In ArchiveMount::reportJobsBatchWritten(): got a standard exception"; logContext.log(cta::log::ERR, msg_error); - for(auto &aj: validatedSuccessfulDBArchiveJobs){ - if(aj.get()) + for (auto& aj: validatedSuccessfulDBArchiveJobs) { + if (aj) failedToReportArchiveJobs.push(std::move(aj)); } - throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); + if (catalogue_updated) { + throw cta::ArchiveMount::FailedReportMoveToQueue(msg_error); + } + else { + throw cta::ArchiveMount::FailedReportCatalogueUpdate(msg_error); + } } } @@ -292,7 +305,7 @@ void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<ct //------------------------------------------------------------------------------ void cta::ArchiveMount::complete() { // Just set the session as complete in the DB.
- m_dbMount->complete(time(NULL)); + m_dbMount->complete(time(nullptr)); // and record we are done with the mount m_sessionRunning = false; } @@ -308,20 +321,19 @@ void cta::ArchiveMount::abort(const std::string& reason) { //------------------------------------------------------------------------------ // destructor //------------------------------------------------------------------------------ -cta::ArchiveMount::~ArchiveMount() throw() { -} +cta::ArchiveMount::~ArchiveMount() noexcept = default; //------------------------------------------------------------------------------ // setDriveStatus() //------------------------------------------------------------------------------ -void cta::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string> & reason) { - m_dbMount->setDriveStatus(status, time(NULL), reason); +void cta::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string>& reason) { + m_dbMount->setDriveStatus(status, time(nullptr), reason); } //------------------------------------------------------------------------------ // setTapeSessionStats() //------------------------------------------------------------------------------ -void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) { +void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats& stats) { m_dbMount->setTapeSessionStats(stats); } @@ -336,11 +348,11 @@ void cta::ArchiveMount::setTapeMounted(cta::log::LogContext& logContext) const { auto catalogueTime = t.secs(cta::utils::Timer::resetCounter); spc.add("catalogueTime", catalogueTime); logContext.log(log::INFO, "In ArchiveMount::setTapeMounted(): success."); - } catch (cta::exception::Exception &ex) { + } catch (cta::exception::Exception& ex) { auto catalogueTimeFailed = t.secs(cta::utils::Timer::resetCounter); spc.add("catalogueTime", 
catalogueTimeFailed); logContext.log(cta::log::WARNING, - "Failed to update catalogue for the tape mounted for archive."); + "Failed to update catalogue for the tape mounted for archive."); } } diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index e8390df1184100e18a6c77d1c5a8a6d5368fd614..7bcc021030ad8096422a81e02a2cb148e58e29f2 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -57,7 +57,8 @@ namespace cta { CTA_GENERATE_EXCEPTION_CLASS(WrongMountType); CTA_GENERATE_EXCEPTION_CLASS(NotImplemented); - CTA_GENERATE_EXCEPTION_CLASS(FailedMigrationRecallResult); + CTA_GENERATE_EXCEPTION_CLASS(FailedReportCatalogueUpdate); + CTA_GENERATE_EXCEPTION_CLASS(FailedReportMoveToQueue); /** * Returns The type of this tape mount. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 59ec999cf0762c62efe731f9b525c4d0ba01f500..00f3d6726bb36be48856bff4dad8dcaee08f6d68 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -3795,13 +3795,22 @@ void OStoreDB::ArchiveMount::setJobBatchTransferred(std::list<std::unique_ptr<ct // We let the exceptions through as failing to report is fatal. 
auto jobsBatchItor = jobsBatch.begin(); while(jobsBatchItor != jobsBatch.end()){ + { + log::ScopedParamContainer params (lc); + params.add("tapeVid", (*jobsBatchItor)->tapeFile.vid) + .add("fileId", (*jobsBatchItor)->archiveFile.archiveFileID) + .add("requestObject", castFromSchedDBJob(jobsBatchItor->get())->m_archiveRequest.getAddressIfSet()); + lc.log(log::INFO, "In OStoreDB::ArchiveMount::setJobBatchTransferred(): received a job to be reported."); + } try { castFromSchedDBJob(jobsBatchItor->get())->asyncSucceedTransfer(); jobsBatchItor++; } catch (cta::exception::NoSuchObject &ex) { - jobsBatch.erase(jobsBatchItor++); log::ScopedParamContainer params(lc); - params.add("fileId", (*jobsBatchItor)->archiveFile.archiveFileID) + params.add("tapeVid", (*jobsBatchItor)->tapeFile.vid) + .add("fileId", (*jobsBatchItor)->archiveFile.archiveFileID) + .add("requestObject", castFromSchedDBJob(jobsBatchItor->get())->m_archiveRequest.getAddressIfSet()) .add("exceptionMessage", ex.getMessageValue()); lc.log(log::WARNING, "In OStoreDB::RetrieveMount::setJobBatchTransferred(): async succeed transfer failed, job does not exist in the objectstore."); + jobsBatch.erase(jobsBatchItor++); diff --git a/scheduler/testingMocks/MockArchiveMount.hpp b/scheduler/testingMocks/MockArchiveMount.hpp index 57500d210a7bd0c4daf4985584598dd2128c211d..609c19f868668f21f4a7854ad9962d24c276da79 100644 --- a/scheduler/testingMocks/MockArchiveMount.hpp +++ b/scheduler/testingMocks/MockArchiveMount.hpp @@ -45,6 +45,7 @@ namespace cta { void reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> >& successfulArchiveJobs, std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& failedToReportArchiveJobs, cta::log::LogContext& logContext) override { + bool catalogue_updated = false; try { std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten; std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; @@ -65,6 +66,7 @@
tapeItemsWritten.emplace(tiwup.release()); } m_catalogue.filesWrittenToTape(tapeItemsWritten); + catalogue_updated = true; for (auto &job: validatedSuccessfulArchiveJobs) { MockArchiveJob * maj = dynamic_cast<MockArchiveJob *>(job.get()); if (!maj) throw cta::exception::Exception("Wrong job type."); @@ -77,7 +79,12 @@ namespace cta { params.add("exceptionMessageValue", e.getMessageValue()); const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception"; logContext.log(cta::log::ERR, msg_error); - throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error); + if (catalogue_updated) { + throw cta::ArchiveMount::FailedReportMoveToQueue(msg_error); + } + else { + throw cta::ArchiveMount::FailedReportCatalogueUpdate(msg_error); + } } } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index f0cc19d62c40dbdb99da5b80ab32704c089205f1..d82d7a9c0ee7a908834462a2eb8a9bbf301f29b1 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -38,21 +38,23 @@ namespace daemon { //Constructor //------------------------------------------------------------------------------ MigrationReportPacker::MigrationReportPacker(cta::ArchiveMount *archiveMount, - cta::log::LogContext & lc): -ReportPackerInterface<detail::Migration>(lc), -m_workerThread(*this),m_errorHappened(false),m_continue(true), m_archiveMount(archiveMount) { + cta::log::LogContext& lc) : + ReportPackerInterface<detail::Migration>(lc), + m_workerThread(*this), m_errorHappened(false), m_continue(true), m_archiveMount(archiveMount) { } + //------------------------------------------------------------------------------ //Destructor //------------------------------------------------------------------------------ -MigrationReportPacker::~MigrationReportPacker(){ 
+MigrationReportPacker::~MigrationReportPacker() { cta::threading::MutexLocker ml(m_producterProtection); } + //------------------------------------------------------------------------------ //reportCompletedJob //------------------------------------------------------------------------------ void MigrationReportPacker::reportCompletedJob( - std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext & lc) { + std::unique_ptr<cta::ArchiveJob> successfulArchiveJob, cta::log::LogContext& lc) { std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulArchiveJob))); cta::log::ScopedParamContainer params(lc); params.add("type", "ReportSuccessful"); @@ -60,13 +62,14 @@ void MigrationReportPacker::reportCompletedJob( cta::threading::MutexLocker ml(m_producterProtection); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportSkippedJob //------------------------------------------------------------------------------ void MigrationReportPacker::reportSkippedJob(std::unique_ptr<cta::ArchiveJob> skippedArchiveJob, const std::string& failure, - cta::log::LogContext& lc) { + cta::log::LogContext& lc) { std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() + - " " + failure; + " " + failure; std::unique_ptr<Report> rep(new ReportSkipped(std::move(skippedArchiveJob), failureLog)); cta::log::ScopedParamContainer params(lc); params.add("type", "ReporSkipped"); @@ -74,13 +77,14 @@ void MigrationReportPacker::reportSkippedJob(std::unique_ptr<cta::ArchiveJob> sk cta::threading::MutexLocker ml(m_producterProtection); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportFailedJob //------------------------------------------------------------------------------ void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> failedArchiveJob, - const cta::exception::Exception 
&ex, cta::log::LogContext & lc){ + const cta::exception::Exception& ex, cta::log::LogContext& lc) { std::string failureLog = cta::utils::getCurrentLocalTime() + " " + cta::utils::getShortHostname() + - " " + ex.getMessageValue(); + " " + ex.getMessageValue(); std::unique_ptr<Report> rep(new ReportError(std::move(failedArchiveJob), failureLog)); cta::log::ScopedParamContainer params(lc); params.add("type", "ReportError"); @@ -88,10 +92,11 @@ void MigrationReportPacker::reportFailedJob(std::unique_ptr<cta::ArchiveJob> fai cta::threading::MutexLocker ml(m_producterProtection); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportFlush //------------------------------------------------------------------------------ -void MigrationReportPacker::reportFlush(drive::compressionStats compressStats, cta::log::LogContext & lc){ +void MigrationReportPacker::reportFlush(drive::compressionStats compressStats, cta::log::LogContext& lc) { cta::log::ScopedParamContainer params(lc); params.add("type", "ReportFlush"); lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportFlush(), pushing a report."); @@ -99,10 +104,11 @@ void MigrationReportPacker::reportFlush(drive::compressionStats compressStats, c std::unique_ptr<Report> rep(new ReportFlush(compressStats)); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportTapeFull //------------------------------------------------------------------------------ -void MigrationReportPacker::reportTapeFull(cta::log::LogContext & lc){ +void MigrationReportPacker::reportTapeFull(cta::log::LogContext& lc) { cta::log::ScopedParamContainer params(lc); params.add("type", "ReportTapeFull"); lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportTapeFull(), pushing a report."); @@ -110,6 +116,7 @@ void MigrationReportPacker::reportTapeFull(cta::log::LogContext & lc){ std::unique_ptr<Report> rep(new 
ReportTapeFull()); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportEndOfSession //------------------------------------------------------------------------------ @@ -121,6 +128,7 @@ void MigrationReportPacker::reportEndOfSession(cta::log::LogContext & lc) { std::unique_ptr<Report> rep(new ReportEndofSession()); m_fifo.push(rep.release()); } + //------------------------------------------------------------------------------ //reportEndOfSessionWithErrors //------------------------------------------------------------------------------ @@ -129,14 +137,14 @@ void MigrationReportPacker::reportEndOfSessionWithErrors(std::string msg,int err params.add("type", "ReportEndofSessionWithErrors"); lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportEndOfSessionWithErrors(), pushing a report."); cta::threading::MutexLocker ml(m_producterProtection); - std::unique_ptr<Report> rep(new ReportEndofSessionWithErrors(msg,errorCode)); + std::unique_ptr<Report> rep(new ReportEndofSessionWithErrors(msg, errorCode)); m_fifo.push(rep.release()); } //------------------------------------------------------------------------------ //reportTestGoingToEnd //------------------------------------------------------------------------------ -void MigrationReportPacker::reportTestGoingToEnd(cta::log::LogContext & lc){ +void MigrationReportPacker::reportTestGoingToEnd(cta::log::LogContext& lc) { cta::log::ScopedParamContainer params(lc); params.add("type", "ReportTestGoingToEnd"); lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportTestGoingToEnd(), pushing a report."); @@ -152,36 +160,37 @@ void MigrationReportPacker::synchronousReportEndWithErrors(const std::string msg cta::log::ScopedParamContainer params(lc); params.add("type", "ReportEndofSessionWithErrors"); lc.log(cta::log::DEBUG, "In MigrationReportPacker::synchronousReportEndWithErrors(), reporting asynchronously session complete."); - m_continue=false; + 
m_continue = false; m_archiveMount->complete(); - if(m_errorHappened) { + if (m_errorHappened) { cta::log::ScopedParamContainer sp(lc); sp.add("errorMessage", msg) .add("errorCode", errorCode); - lc.log(cta::log::INFO,"Reported end of session with error to client after sending file errors"); - } else{ - const std::string& msg ="Reported end of session with error to client"; + lc.log(cta::log::INFO, "Reported end of session with error to client after sending file errors"); + } + else { + const std::string& msg = "Reported end of session with error to client"; // As a measure of safety we censor any session error which is not ENOSPC into // Meaningless 666 (used to be SEINTERNAL in CASTOR). ENOSPC is the only one interpreted by the tape gateway. if (ENOSPC != errorCode) { errorCode = 666; } - lc.log(cta::log::INFO,msg); + lc.log(cta::log::INFO, msg); } - if(m_watchdog) { + if (m_watchdog) { m_watchdog->addParameter(cta::log::Param("status", - ENOSPC == errorCode?"success":"failure")); + ENOSPC == errorCode ? "success" : "failure")); // We have a race condition here between the processing of this message by // the initial process and the printing of the end-of-session log, triggered // by the end our process. To delay the latter, we sleep half a second here. 
- usleep(500*1000); + usleep(500 * 1000); } } //------------------------------------------------------------------------------ //ReportSuccessful::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker){ +void MigrationReportPacker::ReportSuccessful::execute(MigrationReportPacker& reportPacker) { reportPacker.m_successfulArchiveJobs.push(std::move(m_successfulArchiveJob)); } @@ -193,11 +202,11 @@ void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& report { cta::log::ScopedParamContainer params(reportPacker.m_lc); params.add("failureLog", m_failureLog) - .add("fileSize",m_skippedArchiveJob->archiveFile.fileSize) + .add("fileSize", m_skippedArchiveJob->archiveFile.fileSize) .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID); m_skippedArchiveJob->archiveFile.checksumBlob.addFirstChecksumToLog(params); - reportPacker.m_lc.log(cta::log::DEBUG,"In MigrationReportPacker::ReportSkipped::execute(): skipping archive job after exception."); + reportPacker.m_lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportSkipped::execute(): skipping archive job after exception."); } try { m_skippedArchiveJob->transferFailed(m_failureLog, reportPacker.m_lc); @@ -205,16 +214,18 @@ void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& report cta::log::ScopedParamContainer params(reportPacker.m_lc); params.add("ExceptionMSG", ex.getMessageValue()) .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID); - reportPacker.m_lc.log(cta::log::WARNING,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed(), job does not exist in the objectstore."); - } catch (cta::exception::Exception & ex) { + reportPacker.m_lc.log(cta::log::WARNING, + "In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed(), job does not exist in the objectstore."); + } catch 
(cta::exception::Exception& ex) { cta::log::ScopedParamContainer params(reportPacker.m_lc); params.add("ExceptionMSG", ex.getMessageValue()) .add("fileId", m_skippedArchiveJob->archiveFile.archiveFileID); - reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed() threw an exception."); + reportPacker.m_lc.log(cta::log::ERR, + "In MigrationReportPacker::ReportSkipped::execute(): call to m_failedArchiveJob->failed() threw an exception."); reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace()); } reportPacker.m_skippedFiles.push(cta::catalogue::TapeItemWritten()); - auto & tapeItem = reportPacker.m_skippedFiles.back(); + auto& tapeItem = reportPacker.m_skippedFiles.back(); tapeItem.fSeq = m_skippedArchiveJob->tapeFile.fSeq; tapeItem.tapeDrive = reportPacker.m_archiveMount->getDrive(); tapeItem.vid = m_skippedArchiveJob->tapeFile.vid; @@ -223,95 +234,136 @@ void MigrationReportPacker::ReportSkipped::execute(MigrationReportPacker& report //------------------------------------------------------------------------------ //reportDriveStatus //------------------------------------------------------------------------------ -void MigrationReportPacker::reportDriveStatus(cta::common::dataStructures::DriveStatus status,const cta::optional<std::string> & reason, cta::log::LogContext & lc) { +void MigrationReportPacker::reportDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string>& reason, + cta::log::LogContext& lc) { cta::log::ScopedParamContainer params(lc); params.add("type", "ReportDriveStatus") .add("Status", cta::common::dataStructures::toString(status)); lc.log(cta::log::DEBUG, "In MigrationReportPacker::reportDriveStatus(), pushing a report."); cta::threading::MutexLocker ml(m_producterProtection); - m_fifo.push(new ReportDriveStatus(status,reason)); + m_fifo.push(new ReportDriveStatus(status, reason)); } 
//------------------------------------------------------------------------------ //ReportDriveStatus::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent){ +void MigrationReportPacker::ReportDriveStatus::execute(MigrationReportPacker& parent) { cta::log::ScopedParamContainer params(parent.m_lc); params.add("status", cta::common::dataStructures::toString(m_status)); parent.m_lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportDriveStatus::execute(): reporting drive status."); - parent.m_archiveMount->setDriveStatus(m_status,m_reason); + parent.m_archiveMount->setDriveStatus(m_status, m_reason); } //------------------------------------------------------------------------------ //ReportFlush::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker){ - if(!reportPacker.m_errorHappened){ +void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPacker) { + if (!reportPacker.m_errorHappened) { // We can receive double flushes when the periodic flush happens // right before the end of session (which triggers also a flush) // We refrain from sending an empty report to the client in this case. if (reportPacker.m_successfulArchiveJobs.empty() && reportPacker.m_skippedFiles.empty()) { - reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing."); + reportPacker.m_lc.log(cta::log::INFO, "Received a flush report from tape, but had no file to report to client. 
Doing nothing."); return; } std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> failedToReportArchiveJobs; try{ reportPacker.m_archiveMount->reportJobsBatchTransferred(reportPacker.m_successfulArchiveJobs, reportPacker.m_skippedFiles, failedToReportArchiveJobs, reportPacker.m_lc); - } catch(const cta::ArchiveMount::FailedMigrationRecallResult &ex){ + } catch(const cta::ArchiveMount::FailedReportCatalogueUpdate &ex){ while(!failedToReportArchiveJobs.empty()){ auto archiveJob = std::move(failedToReportArchiveJobs.front()); - try{ - archiveJob->failTransfer(ex.getMessageValue(),reportPacker.m_lc); - } catch(const cta::exception::Exception &ex2) { + try { + archiveJob->failTransfer(ex.getMessageValue(), reportPacker.m_lc); + } + catch (const cta::exception::NoSuchObject& nso_ex) { + cta::log::ScopedParamContainer params(reportPacker.m_lc); + params.add("fileId", archiveJob->archiveFile.archiveFileID) + .add("latestError", archiveJob->latestError) + .add("exceptionMSG", ex.getMessageValue()); + reportPacker.m_lc.log(cta::log::WARNING, + "In MigrationReportPacker::ReportFlush::execute(): failed to failTransfer for the archive job because it does not exist in the objectstore."); + } + catch (const cta::exception::Exception& cta_ex) { //If the failTransfer method fails, we can't do anything about it + cta::log::ScopedParamContainer params(reportPacker.m_lc); + params.add("fileId", archiveJob->archiveFile.archiveFileID) + .add("latestError", archiveJob->latestError) + .add("exceptionMSG", ex.getMessageValue()); + reportPacker.m_lc.log(cta::log::WARNING, "In MigrationReportPacker::ReportFlush::execute(): failed to failTransfer for the archive job because of CTA exception."); + } + failedToReportArchiveJobs.pop(); + } + throw ex; + } catch(const cta::ArchiveMount::FailedReportMoveToQueue &ex){ + while(!failedToReportArchiveJobs.empty()){ + auto archiveJob = std::move(failedToReportArchiveJobs.front()); + try{ + 
archiveJob->failReport(ex.getMessageValue(),reportPacker.m_lc); + } + catch (const cta::exception::NoSuchObject& nso_ex) { + cta::log::ScopedParamContainer params(reportPacker.m_lc); + params.add("fileId", archiveJob->archiveFile.archiveFileID) + .add("latestError", archiveJob->latestError) + .add("exceptionMSG", ex.getMessageValue()); + reportPacker.m_lc.log(cta::log::WARNING, + "In MigrationReportPacker::ReportFlush::execute(): failed to failReport for the archive job because it does not exist in the objectstore."); + } + catch(const cta::exception::Exception& cta_ex) { + //If the failReport method fails, we can't do anything about it + cta::log::ScopedParamContainer params(reportPacker.m_lc); + params.add("fileId", archiveJob->archiveFile.archiveFileID) + .add("latestError", archiveJob->latestError) + .add("exceptionMSG", ex.getMessageValue()); + reportPacker.m_lc.log(cta::log::WARNING, "In MigrationReportPacker::ReportFlush::execute(): failed to failReport for the archive job because of CTA exception."); } failedToReportArchiveJobs.pop(); } throw ex; } - } else { + } + else { // This is an abnormal situation: we should never flush after an error! 
- reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client"); + reportPacker.m_lc.log(cta::log::ALERT, "Received a flush after an error: sending file errors to client"); } } //------------------------------------------------------------------------------ //reportTapeFull()::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportTapeFull::execute(MigrationReportPacker& reportPacker){ +void MigrationReportPacker::ReportTapeFull::execute(MigrationReportPacker& reportPacker) { reportPacker.m_archiveMount->setTapeFull(); } //------------------------------------------------------------------------------ //ReportEndofSession::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& reportPacker){ - reportPacker.m_continue=false; +void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& reportPacker) { + reportPacker.m_continue = false; reportPacker.m_lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportEndofSession::execute(): reporting session complete."); reportPacker.m_archiveMount->complete(); - if(!reportPacker.m_errorHappened){ + if (!reportPacker.m_errorHappened) { cta::log::ScopedParamContainer sp(reportPacker.m_lc); - reportPacker.m_lc.log(cta::log::INFO,"Reported end of session to client"); - if(reportPacker.m_watchdog) { - reportPacker.m_watchdog->addParameter(cta::log::Param("status","success")); + reportPacker.m_lc.log(cta::log::INFO, "Reported end of session to client"); + if (reportPacker.m_watchdog) { + reportPacker.m_watchdog->addParameter(cta::log::Param("status", "success")); // We have a race condition here between the processing of this message by // the initial process and the printing of the end-of-session log, triggered // by the end our process. To delay the latter, we sleep half a second here. 
- usleep(500*1000); + usleep(500 * 1000); } } else { // We have some errors cta::log::ScopedParamContainer sp(reportPacker.m_lc); sp.add("errorMessage", "Previous file errors"); - reportPacker.m_lc.log(cta::log::ERR,"Reported end of session with error to client due to previous file errors"); - if(reportPacker.m_watchdog) { - reportPacker.m_watchdog->addParameter(cta::log::Param("status","failure")); + reportPacker.m_lc.log(cta::log::ERR, "Reported end of session with error to client due to previous file errors"); + if (reportPacker.m_watchdog) { + reportPacker.m_watchdog->addParameter(cta::log::Param("status", "failure")); // We have a race condition here between the processing of this message by // the initial process and the printing of the end-of-session log, triggered // by the end our process. To delay the latter, we sleep half a second here. - usleep(500*1000); + usleep(500 * 1000); } } } @@ -319,43 +371,45 @@ void MigrationReportPacker::ReportEndofSession::execute(MigrationReportPacker& r //------------------------------------------------------------------------------ //ReportEndofSessionWithErrors::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker){ - reportPacker.m_continue=false; +void MigrationReportPacker::ReportEndofSessionWithErrors::execute(MigrationReportPacker& reportPacker) { + reportPacker.m_continue = false; reportPacker.m_lc.log(cta::log::DEBUG, "In MigrationReportPacker::ReportEndofSessionWithErrors::execute(): reporting session complete."); reportPacker.m_archiveMount->complete(); - if(reportPacker.m_errorHappened) { + if (reportPacker.m_errorHappened) { cta::log::ScopedParamContainer sp(reportPacker.m_lc); sp.add("errorMessage", m_message) .add("errorCode", m_errorCode); - reportPacker.m_lc.log(cta::log::INFO,"Reported end of session with error to client after sending file errors"); - } else{ - const 
std::string& msg ="Reported end of session with error to client"; + reportPacker.m_lc.log(cta::log::INFO, "Reported end of session with error to client after sending file errors"); + } + else { + const std::string& msg = "Reported end of session with error to client"; // As a measure of safety we censor any session error which is not ENOSPC into // SEINTERNAL. ENOSPC is the only one interpreted by the tape gateway. if (ENOSPC != m_errorCode) { m_errorCode = 666; } - reportPacker.m_lc.log(cta::log::INFO,msg); + reportPacker.m_lc.log(cta::log::INFO, msg); } - if(reportPacker.m_watchdog) { + if (reportPacker.m_watchdog) { reportPacker.m_watchdog->addParameter(cta::log::Param("status", - ENOSPC == m_errorCode?"success":"failure")); + ENOSPC == m_errorCode ? "success" : "failure")); // We have a race condition here between the processing of this message by // the initial process and the printing of the end-of-session log, triggered // by the end our process. To delay the latter, we sleep half a second here. 
- usleep(500*1000); + usleep(500 * 1000); } } + //------------------------------------------------------------------------------ //ReportError::execute //------------------------------------------------------------------------------ -void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker){ - reportPacker.m_errorHappened=true; +void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPacker) { + reportPacker.m_errorHappened = true; { cta::log::ScopedParamContainer params(reportPacker.m_lc); params.add("failureLog", m_failureLog) .add("fileId", m_failedArchiveJob->archiveFile.archiveFileID); - reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportError::execute(): failing archive job after exception."); + reportPacker.m_lc.log(cta::log::ERR, "In MigrationReportPacker::ReportError::execute(): failing archive job after exception."); } try { m_failedArchiveJob->transferFailed(m_failureLog, reportPacker.m_lc); @@ -369,7 +423,8 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPa cta::log::ScopedParamContainer params(reportPacker.m_lc); params.add("ExceptionMSG", ex.getMessageValue()) .add("fileId", m_failedArchiveJob->archiveFile.archiveFileID); - reportPacker.m_lc.log(cta::log::ERR,"In MigrationReportPacker::ReportError::execute(): call to m_failedArchiveJob->failed() threw an exception."); + reportPacker.m_lc.log(cta::log::ERR, + "In MigrationReportPacker::ReportError::execute(): call to m_failedArchiveJob->failed() threw an exception."); reportPacker.m_lc.logBacktrace(cta::log::ERR, ex.backtrace()); } } @@ -377,34 +432,36 @@ void MigrationReportPacker::ReportError::execute(MigrationReportPacker& reportPa //------------------------------------------------------------------------------ //WorkerThread::WorkerThread //------------------------------------------------------------------------------ -MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent): 
-m_parent(parent) { +MigrationReportPacker::WorkerThread::WorkerThread(MigrationReportPacker& parent) : + m_parent(parent) { } + //------------------------------------------------------------------------------ //WorkerThread::run //------------------------------------------------------------------------------ -void MigrationReportPacker::WorkerThread::run(){ +void MigrationReportPacker::WorkerThread::run() { // Create our own log context for the new thread. cta::log::LogContext lc = m_parent.m_lc; lc.pushOrReplace(cta::log::Param("thread", "ReportPacker")); - try{ - while(m_parent.m_continue) { - std::unique_ptr<Report> rep (m_parent.m_fifo.pop()); + try { + while (m_parent.m_continue) { + std::unique_ptr<Report> rep(m_parent.m_fifo.pop()); { cta::log::ScopedParamContainer params(lc); int demangleStatus; - char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus); + char *demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus); if (!demangleStatus) { params.add("typeId", demangledReportType); - } else { + } + else { params.add("typeId", typeid(*rep.get()).name()); } free(demangledReportType); - lc.log(cta::log::DEBUG,"In MigrationReportPacker::WorkerThread::run(): Got a new report."); + lc.log(cta::log::DEBUG, "In MigrationReportPacker::WorkerThread::run(): Got a new report."); } rep->execute(m_parent); } - } catch(const cta::exception::Exception& e){ + } catch (const cta::exception::Exception& e) { //we get there because to tried to close the connection and it failed //either from the catch a few lines above or directly from rep->execute cta::log::ScopedParamContainer params(lc); @@ -412,50 +469,55 @@ void MigrationReportPacker::WorkerThread::run(){ lc.log(cta::log::ERR, "In MigrationPacker::run(): Received a CTA exception while reporting archive mount results."); if (m_parent.m_watchdog) { m_parent.m_watchdog->addToErrorCount("Error_reporting"); - 
m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + m_parent.m_watchdog->addParameter(cta::log::Param("status", "failure")); } - } catch(const std::exception& e){ + } catch (const std::exception& e) { //we get there because to tried to close the connection and it failed //either from the catch a few lines above or directly from rep->execute cta::log::ScopedParamContainer params(lc); params.add("exceptionMSG", e.what()); int demangleStatus; - char * demangleExceptionType = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus); + char *demangleExceptionType = abi::__cxa_demangle(typeid(e).name(), nullptr, nullptr, &demangleStatus); if (!demangleStatus) { params.add("exceptionType", demangleExceptionType); - } else { + } + else { params.add("exceptionType", typeid(e).name()); } lc.log(cta::log::ERR, "In MigrationPacker::run(): Received a standard exception while reporting archive mount results."); if (m_parent.m_watchdog) { m_parent.m_watchdog->addToErrorCount("Error_reporting"); - m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + m_parent.m_watchdog->addParameter(cta::log::Param("status", "failure")); } - } catch(...){ + } catch (...) { //we get there because to tried to close the connection and it failed //either from the catch a few lines above or directly from rep->execute lc.log(cta::log::ERR, "In MigrationPacker::run(): Received an unknown exception while reporting archive mount results."); if (m_parent.m_watchdog) { m_parent.m_watchdog->addToErrorCount("Error_reporting"); - m_parent.m_watchdog->addParameter(cta::log::Param("status","failure")); + m_parent.m_watchdog->addParameter(cta::log::Param("status", "failure")); } } // Drain the FIFO if necessary. 
We know that m_continue will be // set by ReportEndofSessionWithErrors or ReportEndofSession // TODO devise a more generic mechanism - while(m_parent.m_fifo.size()) { - std::unique_ptr<Report> rep (m_parent.m_fifo.pop()); + while (m_parent.m_fifo.size()) { + std::unique_ptr<Report> rep(m_parent.m_fifo.pop()); cta::log::ScopedParamContainer params(lc); int demangleStatus; - char * demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus); + char *demangledReportType = abi::__cxa_demangle(typeid(*rep.get()).name(), nullptr, nullptr, &demangleStatus); if (!demangleStatus) { params.add("typeId", demangledReportType); - } else { + } + else { params.add("typeId", typeid(*rep.get()).name()); } free(demangledReportType); - lc.log(cta::log::DEBUG,"In MigrationReportPacker::WorkerThread::run(): Draining leftover."); + lc.log(cta::log::DEBUG, "In MigrationReportPacker::WorkerThread::run(): Draining leftover."); } } -}}}} +} +} +} +}