diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index da4c9117a74d66982cc7f8ce592978db93c0f945..c9aa596fa865287ec10cebb902a6ec9c83c1c8d2 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -3958,177 +3958,100 @@ objectstore::ArchiveRequest::RepackInfo OStoreDB::ArchiveJob::getRepackInfoAfter
 // OStoreDB::RepackArchiveSuccessesReportBatch::report()
 //------------------------------------------------------------------------------
 void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) {
-  // We have a batch of popped jobs to report. We will first record them in the repack requests (update statistics),
-  // and then either mark them as complete (if any sibling jobs will still require processing) or
-  // simply remove the request.
-  // Repack request will be filpped from running to successsful (or failed) if we process the last job.
-  utils::Timer t;
-  log::TimingList timingList;
-
-  // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
-  // before the next (deletions).
-  {
-    // Prepare the report
-    objectstore::RepackRequest::SubrequestStatistics::List ssl;
-    for (auto &sri: m_subrequestList) {
-      ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
-      ssl.back().bytes = sri.archiveFile.fileSize;
-      ssl.back().files = 1;
-      ssl.back().fSeq = sri.repackInfo.fSeq;
-      ssl.back().copyNb = sri.archivedCopyNb;
-      for(auto &j: sri.archiveJobsStatusMap){
-        if(j.first != sri.archivedCopyNb &&
-            (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) &&
-            (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){
-          break;
-        } else {
-          ssl.back().subrequestDeleted = true;
-        }
-      }
-    }
-    // Record it.
-    timingList.insertAndReset("successStatsPrepareTime", t);
-    objectstore::ScopedExclusiveLock rrl(m_repackRequest);
-    timingList.insertAndReset("successStatsLockTime", t);
-    m_repackRequest.fetch();
-    timingList.insertAndReset("successStatsFetchTime", t);
-    m_repackRequest.reportArchiveSuccesses(ssl);
-    timingList.insertAndReset("successStatsUpdateTime", t);
-    m_repackRequest.commit();
-    timingList.insertAndReset("successStatsCommitTime", t);
-  }
-
-  // 2) For each job, determine if sibling jobs are complete or not. If so, delete, else just update status and set empty owner.
-  struct Deleters {
-    std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> deleter;
-    RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo;
-    typedef std::list<Deleters> List;
-  };
-  struct JobOwnerUpdaters {
-    std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> jobOwnerUpdater;
-    RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo;
-    typedef std::list<JobOwnerUpdaters> List;
-  };
-  Deleters::List deletersList;
-  JobOwnerUpdaters::List jobOwnerUpdatersList;
+  OStoreDB::RepackArchiveReportBatch::report(lc);
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveSuccessesReportBatch::recordReport()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackArchiveSuccessesReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
+  timingList.insertAndReset("successStatsPrepareTime", t);
+  objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+  timingList.insertAndReset("successStatsLockTime", t);
+  m_repackRequest.fetch();
+  timingList.insertAndReset("successStatsFetchTime", t);
+  m_repackRequest.reportArchiveSuccesses(ssl);
+  timingList.insertAndReset("successStatsUpdateTime", t);
+  m_repackRequest.commit();
+  timingList.insertAndReset("successStatsCommitTime", t);
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveSuccessesReportBatch::getNewStatus()
+//------------------------------------------------------------------------------
+serializers::ArchiveJobStatus OStoreDB::RepackArchiveSuccessesReportBatch::getNewStatus(){
+  return serializers::ArchiveJobStatus::AJS_Complete;
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveFailureReportBatch::report()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
+  OStoreDB::RepackArchiveReportBatch::report(lc);
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveFailureReportBatch::recordReport()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackArchiveFailureReportBatch::recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t){
+  timingList.insertAndReset("failureStatsPrepareTime", t);
+  objectstore::ScopedExclusiveLock rrl(m_repackRequest);
+  timingList.insertAndReset("failureStatsLockTime", t);
+  m_repackRequest.fetch();
+  timingList.insertAndReset("failureStatsFetchTime", t);
+  m_repackRequest.reportArchiveFailures(ssl);
+  timingList.insertAndReset("failureStatsUpdateTime", t);
+  m_repackRequest.commit();
+  timingList.insertAndReset("failureStatsCommitTime", t);
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveFailureReportBatch::getNewStatus()
+//------------------------------------------------------------------------------
+serializers::ArchiveJobStatus OStoreDB::RepackArchiveFailureReportBatch::getNewStatus(){
+  return serializers::ArchiveJobStatus::AJS_Failed;
+}
+
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveReportBatch::prepareReport()
+//------------------------------------------------------------------------------
+objectstore::RepackRequest::SubrequestStatistics::List OStoreDB::RepackArchiveReportBatch::prepareReport() {
+  objectstore::RepackRequest::SubrequestStatistics::List ssl;
   for (auto &sri: m_subrequestList) {
-    bool moreJobsToDo = false;
-    for (auto &j: sri.archiveJobsStatusMap) {
-      if ((j.first != sri.archivedCopyNb) &&
-          (j.second != serializers::ArchiveJobStatus::AJS_Complete) &&
-          (j.second != serializers::ArchiveJobStatus::AJS_Failed)) {
-        moreJobsToDo = true;
+    ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
+    ssl.back().bytes = sri.archiveFile.fileSize;
+    ssl.back().files = 1;
+    ssl.back().fSeq = sri.repackInfo.fSeq;
+    ssl.back().copyNb = sri.archivedCopyNb;
+    for(auto &j: sri.archiveJobsStatusMap){
+      if(j.first != sri.archivedCopyNb &&
+          (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) &&
+          (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){
        break;
+      } else {
+        ssl.back().subrequestDeleted = true;
      }
    }
-    objectstore::ArchiveRequest & ar = *sri.subrequest;
-    if (moreJobsToDo) {
-      try {
-        jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> (
-          ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(),
-            serializers::ArchiveJobStatus::AJS_Complete)),
-          sri});
-      } catch (cta::exception::Exception & ex) {
-        // Log the error
-        log::ScopedParamContainer params(lc);
-        params.add("fileId", sri.archiveFile.archiveFileID)
-          .add("subrequestAddress", sri.subrequest->getAddressIfSet())
-          .add("exceptionMsg", ex.getMessageValue());
-        lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): failed to asyncUpdateJobOwner()");
-      }
-    } else {
-      try {
-        deletersList.push_back({std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter>(ar.asyncDeleteRequest()), sri});
-      } catch (cta::exception::Exception & ex) {
-        // Log the error
-        log::ScopedParamContainer params(lc);
-        params.add("fileId", sri.archiveFile.archiveFileID)
-          .add("subrequestAddress", sri.subrequest->getAddressIfSet())
-          .add("exceptionMsg", ex.getMessageValue());
-        lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): failed to asyncDelete()");
-      }
-    }
-  }
-  timingList.insertAndReset("asyncUpdateOrDeleteLaunchTime", t);
-  for (auto & d: deletersList) {
-    try {
-      d.deleter->wait();
-      log::ScopedParamContainer params(lc);
-      params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
-        .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet());
-      lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): deleted request.");
-    } catch (cta::exception::Exception & ex) {
-      // Log the error
-      log::ScopedParamContainer params(lc);
-      params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
-        .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet())
-        .add("exceptionMsg", ex.getMessageValue());
-      lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async deletion failed.");
-    }
  }
-  for (auto & jou: jobOwnerUpdatersList) {
-    try {
-      jou.jobOwnerUpdater->wait();
-      log::ScopedParamContainer params(lc);
-      params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID)
-        .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet());
-      lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async updated job.");
-    } catch (cta::exception::Exception & ex) {
-      // Log the error
-      log::ScopedParamContainer params(lc);
-      params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID)
-        .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet())
-        .add("exceptionMsg", ex.getMessageValue());
-      lc.log(log::ERR, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): async job update.");
-    }
-  }
-  timingList.insertAndReset("asyncUpdateOrDeleteCompletionTime", t);
-  // 3) Just remove all jobs from ownership
-  std::list<std::string> jobsToUnown;
-  for (auto sri: m_subrequestList) jobsToUnown.push_back(sri.subrequest->getAddressIfSet());
-  m_oStoreDb.m_agentReference->removeBatchFromOwnership(jobsToUnown, m_oStoreDb.m_objectStore);
-  timingList.insertAndReset("ownershipRemoval", t);
-  log::ScopedParamContainer params(lc);
-  timingList.addToLog(params);
-  lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): reported a batch of jobs.");
+  return ssl;
 }
-void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
+//------------------------------------------------------------------------------
+// OStoreDB::RepackArchiveReportBatch::report()
+//------------------------------------------------------------------------------
+void OStoreDB::RepackArchiveReportBatch::report(log::LogContext& lc){
+  // We have a batch of popped jobs to report. We will first record them in the repack requests (update statistics),
+  // and then either mark them as complete (if any sibling jobs will still require processing) or
+  // simply remove the request.
+  // Repack request will be flipped from running to successful (or failed) if we process the last job.
   utils::Timer t;
   log::TimingList timingList;
   // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock
   // before the next (deletions).
-  {
-    // Prepare the report
-    objectstore::RepackRequest::SubrequestStatistics::List ssl;
-    for (auto &sri: m_subrequestList) {
-      ssl.push_back(objectstore::RepackRequest::SubrequestStatistics());
-      ssl.back().bytes = sri.archiveFile.fileSize;
-      ssl.back().files = 1;
-      ssl.back().fSeq = sri.repackInfo.fSeq;
-      ssl.back().copyNb = sri.archivedCopyNb;
-      for(auto &j: sri.archiveJobsStatusMap){
-        if(j.first != sri.archivedCopyNb &&
-            (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) &&
-            (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){
-          break;
-        } else {
-          ssl.back().subrequestDeleted = true;
-        }
-      }
-    }
-    // Record it.
-    timingList.insertAndReset("failureStatsPrepareTime", t);
-    objectstore::ScopedExclusiveLock rrl(m_repackRequest);
-    timingList.insertAndReset("failureStatsLockTime", t);
-    m_repackRequest.fetch();
-    timingList.insertAndReset("failureStatsFetchTime", t);
-    m_repackRequest.reportArchiveFailures(ssl);
-    timingList.insertAndReset("failureStatsUpdateTime", t);
-    m_repackRequest.commit();
-    timingList.insertAndReset("failureStatsCommitTime", t);
-  }
+  objectstore::RepackRequest::SubrequestStatistics::List statistics = prepareReport();
+  recordReport(statistics,timingList,t);
+  // 2) For each job, determine if sibling jobs are complete or not. If so, delete, else just update status and set empty owner.
   struct Deleters {
     std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> deleter;
@@ -4157,7 +4080,7 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
       try {
         jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> (
           ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(),
-            serializers::ArchiveJobStatus::AJS_Failed)),
+            getNewStatus())),
           sri});
       } catch (cta::exception::Exception & ex) {
         // Log the error
@@ -4165,7 +4088,7 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
         params.add("fileId", sri.archiveFile.archiveFileID)
           .add("subrequestAddress", sri.subrequest->getAddressIfSet())
          .add("exceptionMsg", ex.getMessageValue());
-        lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to asyncUpdateJobOwner()");
+        lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): failed to asyncUpdateJobOwner()");
      }
    } else {
      try {
@@ -4176,7 +4099,7 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
         params.add("fileId", sri.archiveFile.archiveFileID)
          .add("subrequestAddress", sri.subrequest->getAddressIfSet())
          .add("exceptionMsg", ex.getMessageValue());
-        lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to asyncDelete()");
+        lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): failed to asyncDelete()");
      }
    }
  }
@@ -4187,14 +4110,14 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
       log::ScopedParamContainer params(lc);
       params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
        .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet());
-      lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): deleted request.");
+      lc.log(log::INFO, "In OStoreDB::RepackArchiveReportBatch::report(): deleted request.");
    } catch (cta::exception::Exception & ex) {
       // Log the error
       log::ScopedParamContainer params(lc);
       params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID)
        .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet())
        .add("exceptionMsg", ex.getMessageValue());
-      lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async deletion failed.");
+      lc.log(log::ERR, "In OStoreDB::RepackArchiveReportBatch::report(): async deletion failed.");
    }
  }
   for (auto & jou: jobOwnerUpdatersList) {
@@ -4221,7 +4144,7 @@ void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){
   timingList.insertAndReset("ownershipRemoval", t);
   log::ScopedParamContainer params(lc);
   timingList.addToLog(params);
-  lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): reported a batch of jobs.");
+  lc.log(log::INFO, "In OStoreDB::RepackArchiveReportBatch::report(): reported a batch of jobs.");
 }
 //------------------------------------------------------------------------------
diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp
index 2a40ccb030a7295749863194703689c61342a49b..35ce5351f706ebdf9f7e969abbc7ec49892115dc 100644
--- a/scheduler/OStoreDB/OStoreDB.hpp
+++ b/scheduler/OStoreDB/OStoreDB.hpp
@@ -396,6 +396,7 @@ public:
   class RepackRetrieveSuccessesReportBatch;
   class RepackRetrieveFailureReportBatch;
+  class RepackArchiveReportBatch;
   class RepackArchiveSuccessesReportBatch;
   class RepackArchiveFailureReportBatch;
   friend class RepackRetrieveSuccessesReportBatch;
@@ -452,27 +453,45 @@ public:
     SubrequestInfo::List m_subrequestList;
   };
-  class RepackArchiveSuccessesReportBatch: public RepackReportBatch {
+  /**
+   * Superclass that holds the common code for the reporting
+   * of archive successes and failures
+   */
+  class RepackArchiveReportBatch: public RepackReportBatch{
+    friend class OStoreDB;
+  protected:
+    typedef RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> SubrequestInfo;
+    SubrequestInfo::List m_subrequestList;
+  public:
+    RepackArchiveReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):RepackReportBatch(backend,oStoreDb){}
+    void report(log::LogContext &lc);
+  private:
+    objectstore::RepackRequest::SubrequestStatistics::List prepareReport();
+    virtual void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) = 0;
+    virtual cta::objectstore::serializers::ArchiveJobStatus getNewStatus() = 0;
+  };
+
+  class RepackArchiveSuccessesReportBatch: public RepackArchiveReportBatch {
     friend class OStoreDB;
     RepackArchiveSuccessesReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):
-      RepackReportBatch(backend,oStoreDb) {}
+      RepackArchiveReportBatch(backend,oStoreDb) {}
   public:
     void report(log::LogContext& lc) override;
   private:
-    typedef RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> SubrequestInfo;
-    SubrequestInfo::List m_subrequestList;
+    void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
+    cta::objectstore::serializers::ArchiveJobStatus getNewStatus() override;
   };
-  class RepackArchiveFailureReportBatch: public RepackReportBatch {
+  class RepackArchiveFailureReportBatch: public RepackArchiveReportBatch {
     friend class OStoreDB;
     RepackArchiveFailureReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb):
-      RepackReportBatch(backend,oStoreDb) {}
-  public:
-    void report(log::LogContext& lc) override;
-  private:
-    typedef RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> SubrequestInfo;
-    SubrequestInfo::List m_subrequestList;
+      RepackArchiveReportBatch(backend,oStoreDb) {}
+  public:
+    void report(log::LogContext& lc) override;
+  private:
+    void recordReport(objectstore::RepackRequest::SubrequestStatistics::List& ssl, log::TimingList& timingList, utils::Timer& t) override;
+    cta::objectstore::serializers::ArchiveJobStatus getNewStatus() override;
   };
   std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override;
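
Note on the refactoring above: the two nearly identical report() implementations are folded into a single RepackArchiveReportBatch::report() that delegates the success/failure-specific steps to the virtual recordReport() and getNewStatus() hooks, i.e. a template-method arrangement. The following is a minimal, self-contained sketch of that shape for readers unfamiliar with the pattern; the class and member names are illustrative stand-ins, not the actual OStoreDB/objectstore API, and the bodies are reduced to console output.

// Standalone sketch (not CTA code) of the template-method shape used above:
// the base class owns the shared report() sequence, subclasses only supply
// the parts that differ.
#include <iostream>
#include <list>
#include <string>

enum class JobStatus { Complete, Failed };

struct SubrequestStats { std::string file; };

class ArchiveReportBatch {
public:
  virtual ~ArchiveReportBatch() = default;

  // Shared reporting sequence: prepare statistics once, then defer the
  // success/failure-specific steps to the virtual hooks.
  void report() {
    std::list<SubrequestStats> stats = prepareReport();
    recordReport(stats);                      // success vs. failure bookkeeping
    for (const auto& s : stats)
      std::cout << s.file << " -> " << statusName(getNewStatus()) << "\n";
  }

protected:
  std::list<SubrequestStats> prepareReport() {
    return {{"file1"}, {"file2"}};            // stand-in for per-subrequest stats
  }

private:
  virtual void recordReport(std::list<SubrequestStats>& stats) = 0;
  virtual JobStatus getNewStatus() = 0;

  static const char* statusName(JobStatus s) {
    return s == JobStatus::Complete ? "AJS_Complete" : "AJS_Failed";
  }
};

class SuccessesReportBatch : public ArchiveReportBatch {
  void recordReport(std::list<SubrequestStats>& stats) override {
    std::cout << "recording " << stats.size() << " successes\n";
  }
  JobStatus getNewStatus() override { return JobStatus::Complete; }
};

class FailuresReportBatch : public ArchiveReportBatch {
  void recordReport(std::list<SubrequestStats>& stats) override {
    std::cout << "recording " << stats.size() << " failures\n";
  }
  JobStatus getNewStatus() override { return JobStatus::Failed; }
};

int main() {
  SuccessesReportBatch ok;
  FailuresReportBatch ko;
  ok.report();   // shared sequence, ends in the Complete status
  ko.report();   // same sequence, ends in the Failed status
}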