diff --git a/objectstore/ArchiveQueueToReportToRepackForFailureAlgorithms.cpp b/objectstore/ArchiveQueueToReportToRepackForFailureAlgorithms.cpp index 5f2c3c4ee658f464298f13475630176d7e827f0e..0ba55b576bf9b452994d7eed91f9ab68a20d6dcd 100644 --- a/objectstore/ArchiveQueueToReportToRepackForFailureAlgorithms.cpp +++ b/objectstore/ArchiveQueueToReportToRepackForFailureAlgorithms.cpp @@ -31,6 +31,30 @@ namespace cta { namespace objectstore { ContainerSummary ret; ret.JobsSummary::operator=(cont.getJobsSummary()); return ret; -} + } + + template<> + auto ContainerTraits<ArchiveQueue,ArchiveQueueToReportToRepackForFailure>:: + getPoppingElementsCandidates(Container& cont, PopCriteria& unfulfilledCriteria, ElementsToSkipSet& elemtsToSkip, + log::LogContext& lc) -> PoppedElementsBatch + { + PoppedElementsBatch ret; + auto candidateJobsFromQueue=cont.getCandidateList(std::numeric_limits<uint64_t>::max(), unfulfilledCriteria.files, elemtsToSkip); + for (auto &cjfq: candidateJobsFromQueue.candidates) { + ret.elements.emplace_back(PoppedElement()); + PoppedElement & elem = ret.elements.back(); + elem.archiveRequest = cta::make_unique<ArchiveRequest>(cjfq.address, cont.m_objectStore); + elem.copyNb = cjfq.copyNb; + elem.bytes = cjfq.size; + elem.archiveFile = common::dataStructures::ArchiveFile(); + elem.srcURL = ""; + elem.archiveReportURL = ""; + elem.errorReportURL = ""; + elem.latestError = ""; + elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report; + ret.summary.files++; + } + return ret; + } }} \ No newline at end of file diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index e2c88afc36d86a417de35ebd99965496bc2322ec..779246b3906eb9c7bf844b7b1f2312808d58618b 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -145,13 +145,14 @@ auto ArchiveRequest::addTransferFailure(uint32_t copyNumber, return determineNextStep(copyNumber, JobEvent::TransferFailed, lc); } else { EnqueueingNextStep ret; - ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToTransferForUser; + bool isRepack = m_payload.isrepack(); + ret.nextStatus = isRepack ? serializers::ArchiveJobStatus::AJS_ToTransferForRepack : serializers::ArchiveJobStatus::AJS_ToTransferForUser; // Decide if we want the job to have a chance to come back to this mount (requeue) or not. In the latter // case, the job will remain owned by this session and get garbage collected. if (j.retrieswithinmount() >= j.maxretrieswithinmount()) ret.nextStep = EnqueueingNextStep::NextStep::Nothing; else - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForTransfer; + ret.nextStep = isRepack ? EnqueueingNextStep::NextStep::EnqueueForTransferForRepack : EnqueueingNextStep::NextStep::EnqueueForTransferForUser; return ret; } } @@ -179,7 +180,7 @@ auto ArchiveRequest::addReportFailure(uint32_t copyNumber, uint64_t sessionId, c } else { // Status is unchanged ret.nextStatus = j.status(); - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport; + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForUser; } return ret; } @@ -851,8 +852,13 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE { if (!m_payload.reportdecided()) { m_payload.set_reportdecided(true); - ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReport; - ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure; + if(!m_payload.isrepack()){ + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForUser; + ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToUserForFailure; + } else { + ret.nextStep = EnqueueingNextStep::NextStep::EnqueueForReportForRepack; + ret.nextStatus = serializers::ArchiveJobStatus::AJS_ToReportToRepackForFailure; + } } else { ret.nextStep = EnqueueingNextStep::NextStep::StoreInFailedJobsContainer; ret.nextStatus = serializers::ArchiveJobStatus::AJS_Failed; @@ -868,6 +874,28 @@ auto ArchiveRequest::determineNextStep(uint32_t copyNumberUpdated, JobEvent jobE return ret; } +//------------------------------------------------------------------------------ +// ArchiveRequest::getRepackInfo() +//------------------------------------------------------------------------------ +ArchiveRequest::RepackInfo ArchiveRequest::getRepackInfo(){ + checkPayloadReadable(); + cta::objectstore::serializers::ArchiveRequestRepackInfo repackInfo = m_payload.repack_info(); + ArchiveRequest::RepackInfo ret; + ret.fSeq = repackInfo.fseq(); + ret.fileBufferURL = repackInfo.file_buffer_url(); + ret.isRepack = true; + ret.repackRequestAddress = repackInfo.repack_request_address(); + return ret; +} + +void ArchiveRequest::setRepackInfo(const RepackInfo& repackInfo){ + checkPayloadWritable(); + auto repackInfoToWrite = m_payload.mutable_repack_info(); + repackInfoToWrite->set_repack_request_address(repackInfo.repackRequestAddress); + repackInfoToWrite->set_file_buffer_url(repackInfo.fileBufferURL); + repackInfoToWrite->set_fseq(repackInfo.fSeq); +} + //------------------------------------------------------------------------------ // ArchiveRequest::dump() //------------------------------------------------------------------------------ diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index eb6d503b1bfeb2c1ed8cb969e40951e0e9de42e3..386f73b75e953ae43efa0ce2ea61431ebb72a7ae 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -72,8 +72,10 @@ public: struct EnqueueingNextStep { enum class NextStep { Nothing, - EnqueueForTransfer, - EnqueueForReport, + EnqueueForTransferForUser, + EnqueueForTransferForRepack, + EnqueueForReportForUser, + EnqueueForReportForRepack, StoreInFailedJobsContainer, Delete } nextStep = NextStep::Nothing; diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index c5c6e9f940e5fa77810da3ead7974e5dc5edf430..e7b4de80ae464a67998c96d7d1468a8d7e6cbc8c 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -299,6 +299,7 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu p.archiveCopyNbsAccounted.insert(as.copyNb); m_payload.set_archivedbytes(m_payload.archivedbytes() + as.bytes); m_payload.set_archivedfiles(m_payload.archivedfiles() + as.files); + p.subrequestDeleted = as.subrequestDeleted; didUpdate = true; } } catch (std::out_of_range &) { @@ -333,6 +334,7 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai auto & p = pointerMap.at(af.fSeq); if (!p.archiveCopyNbsAccounted.count(af.copyNb)) { p.archiveCopyNbsAccounted.insert(af.copyNb); + p.subrequestDeleted = true; m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + af.bytes); m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + af.files); didUpdate = true; diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index c0d28598d1f3aae11f2502c801f6e7299d4cd245..639bfbb51c0f0d4fe6c13c097a64c8e51588207a 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -84,6 +84,7 @@ public: uint64_t bytes; /// CopyNb is needed to record archive jobs statistics (we can have several archive jobs for the same fSeq) uint32_t copyNb = 0; + bool subrequestDeleted = false; typedef std::list<SubrequestStatistics> List; bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; } }; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 066688abde221675fb15ccd6dbbb0843fa4973f8..da4c9117a74d66982cc7f8ce592978db93c0f945 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1580,6 +1580,9 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextRepackRep try { return getNextSuccessfulArchiveRepackReportBatch(lc); } catch (NoRepackReportBatchFound &) {} + try{ + return getNextFailedArchiveRepackReportBatch(lc); + } catch (NoRepackReportBatchFound &) {} return nullptr; } @@ -1717,6 +1720,48 @@ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfu throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no report found."); } +std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextFailedArchiveRepackReportBatch(log::LogContext& lc){ + typedef objectstore::ContainerAlgorithms<ArchiveQueue, ArchiveQueueToReportToRepackForFailure> Caaqtrtrff; + Caaqtrtrff algo(this->m_objectStore, *m_agentReference); + // Decide from which queue we are going to pop. + RootEntry re(m_objectStore); + re.fetchNoLock(); + while(true) { + auto queueList = re.dumpArchiveQueues(JobQueueType::JobsToReportToRepackForFailure); + if (queueList.empty()) throw NoRepackReportBatchFound("In OStoreDB::getNextFailedArchiveRepackReportBatch(): no queue found."); + // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round. + Caaqtrtrff::PopCriteria criteria; + criteria.files = c_repackReportBatchSize; + auto jobs = algo.popNextBatch(queueList.front().tapePool, criteria, lc); + if(jobs.elements.empty()) continue; + std::unique_ptr<RepackArchiveFailureReportBatch> privateRet; + privateRet.reset(new RepackArchiveFailureReportBatch(m_objectStore, *this)); + std::set<std::string> repackRequestAddresses; + for(auto &j : jobs.elements) + { + privateRet->m_subrequestList.emplace_back(RepackArchiveFailureReportBatch::SubrequestInfo()); + auto & sr = privateRet->m_subrequestList.back(); + sr.repackInfo = j.repackInfo; + sr.archivedCopyNb = j.copyNb; + sr.archiveJobsStatusMap = j.archiveJobsStatusMap; + sr.archiveFile = j.archiveFile; + sr.subrequest.reset(j.archiveRequest.release()); + repackRequestAddresses.insert(j.repackInfo.repackRequestAddress); + } + // As we are popping from a single report queue, all requests should concern only one repack request. + if (repackRequestAddresses.size() != 1) { + std::stringstream err; + err << "In OStoreDB::getNextFailedArchiveRepackReportBatch(): reports for several repack requests in the same queue. "; + for (auto & rr: repackRequestAddresses) { err << rr << " "; } + throw exception::Exception(err.str()); + } + privateRet->m_repackRequest.setAddress(*repackRequestAddresses.begin()); + + return std::unique_ptr<SchedulerDatabase::RepackReportBatch>(privateRet.release()); + } + throw NoRepackReportBatchFound("In OStoreDB::getNextFailedArchiveRepackReportBatch(): no report found."); +} + //------------------------------------------------------------------------------ // OStoreDB::getNextSuccessfulRetrieveRepackReportBatch() //------------------------------------------------------------------------------ @@ -3637,7 +3682,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L // Now apply the decision. // TODO: this will probably need to be factored out. switch (enQueueingNextStep.nextStep) { - case NextStep::Nothing: { + case NextStep::Nothing: { m_archiveRequest.commit(); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); log::ScopedParamContainer params(lc); @@ -3653,7 +3698,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L "In ArchiveJob::failTransfer(): left the request owned, to be garbage collected for retry at the end of the mount."); return; } - case NextStep::Delete: { + case NextStep::Delete: { auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); m_archiveRequest.remove(); log::ScopedParamContainer params(lc); @@ -3668,7 +3713,7 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L lc.log(log::INFO, "In ArchiveJob::failTransfer(): removed request"); return; } - case NextStep::EnqueueForReport: { + case NextStep::EnqueueForReportForUser: { m_archiveRequest.commit(); auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); @@ -3691,7 +3736,31 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L lc.log(log::INFO, "In ArchiveJob::failTransfer(): enqueued job for reporting"); return; } - case NextStep::EnqueueForTransfer: { + case NextStep::EnqueueForReportForRepack:{ + m_archiveRequest.commit(); + auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); + std::string repackRequestAddress = m_archiveRequest.getRepackInfo().repackRequestAddress; + // Algorithms suppose the objects are not locked. + arl.release(); + typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToReportToRepackForFailure> CaAqtrtrff; + CaAqtrtrff caAqtrtrff(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); + CaAqtrtrff::InsertedElement::list insertedElements; + insertedElements.push_back(CaAqtrtrff::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, cta::nullopt, cta::nullopt }); + caAqtrtrff.referenceAndSwitchOwnership(repackRequestAddress, insertedElements, lc); + log::ScopedParamContainer params(lc); + params.add("fileId", archiveFile.archiveFileID) + .add("copyNb", tapeFile.copyNb) + .add("failureReason", failureReason) + .add("requestObject", m_archiveRequest.getAddressIfSet()) + .add("retriesWithinMount", retryStatus.retriesWithinMount) + .add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount) + .add("totalRetries", retryStatus.totalRetries) + .add("maxTotalRetries", retryStatus.maxTotalRetries); + lc.log(log::INFO, + "In ArchiveJob::failTransfer(): enqueued job for reporting for Repack"); + return; + } + case NextStep::EnqueueForTransferForUser: { m_archiveRequest.commit(); auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); @@ -3715,6 +3784,31 @@ void OStoreDB::ArchiveJob::failTransfer(const std::string& failureReason, log::L "In ArchiveJob::failTransfer(): requeued job for (potentially in-mount) retry."); return; } + case NextStep::EnqueueForTransferForRepack:{ + m_archiveRequest.commit(); + auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb); + auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); + auto mountPolicy = m_archiveRequest.getMountPolicy(); + // Algorithms suppose the objects are not locked. + arl.release(); + typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForRepack> CaAqtr; + CaAqtr caAqtr(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); + CaAqtr::InsertedElement::list insertedElements; + insertedElements.push_back(CaAqtr::InsertedElement{&m_archiveRequest, tapeFile.copyNb, archiveFile, mountPolicy, cta::nullopt }); + caAqtr.referenceAndSwitchOwnership(tapepool, insertedElements, lc); + log::ScopedParamContainer params(lc); + params.add("fileId", archiveFile.archiveFileID) + .add("copyNb", tapeFile.copyNb) + .add("failureReason", failureReason) + .add("requestObject", m_archiveRequest.getAddressIfSet()) + .add("retriesWithinMount", retryStatus.retriesWithinMount) + .add("maxRetriesWithinMount", retryStatus.maxRetriesWithinMount) + .add("totalRetries", retryStatus.totalRetries) + .add("maxTotalRetries", retryStatus.maxTotalRetries); + lc.log(log::INFO, + "In ArchiveJob::failTransfer(): requeued job for (potentially in-mount) retry."); + return; + } case NextStep::StoreInFailedJobsContainer: { m_archiveRequest.commit(); auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb); @@ -3762,7 +3856,7 @@ void OStoreDB::ArchiveJob::failReport(const std::string& failureReason, log::Log // TODO: this will probably need to be factored out. switch (enQueueingNextStep.nextStep) { // We have a reduced set of supported next steps as some are not compatible with this event (see default). - case NextStep::EnqueueForReport: { + case NextStep::EnqueueForReportForUser: { m_archiveRequest.commit(); auto tapepool = m_archiveRequest.getTapePoolForJob(tapeFile.copyNb); auto retryStatus = m_archiveRequest.getRetryStatus(tapeFile.copyNb); @@ -3882,6 +3976,15 @@ void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) { ssl.back().files = 1; ssl.back().fSeq = sri.repackInfo.fSeq; ssl.back().copyNb = sri.archivedCopyNb; + for(auto &j: sri.archiveJobsStatusMap){ + if(j.first != sri.archivedCopyNb && + (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) && + (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){ + break; + } else { + ssl.back().subrequestDeleted = true; + } + } } // Record it. timingList.insertAndReset("successStatsPrepareTime", t); @@ -3990,6 +4093,137 @@ void OStoreDB::RepackArchiveSuccessesReportBatch::report(log::LogContext& lc) { lc.log(log::INFO, "In OStoreDB::RepackArchiveSuccessesReportBatch::report(): reported a batch of jobs."); } +void OStoreDB::RepackArchiveFailureReportBatch::report(log::LogContext& lc){ + utils::Timer t; + log::TimingList timingList; + + // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock + // before the next (deletions). + { + // Prepare the report + objectstore::RepackRequest::SubrequestStatistics::List ssl; + for (auto &sri: m_subrequestList) { + ssl.push_back(objectstore::RepackRequest::SubrequestStatistics()); + ssl.back().bytes = sri.archiveFile.fileSize; + ssl.back().files = 1; + ssl.back().fSeq = sri.repackInfo.fSeq; + ssl.back().copyNb = sri.archivedCopyNb; + for(auto &j: sri.archiveJobsStatusMap){ + if(j.first != sri.archivedCopyNb && + (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Complete) && + (j.second != objectstore::serializers::ArchiveJobStatus::AJS_Failed)){ + break; + } else { + ssl.back().subrequestDeleted = true; + } + } + } + // Record it. + timingList.insertAndReset("failureStatsPrepareTime", t); + objectstore::ScopedExclusiveLock rrl(m_repackRequest); + timingList.insertAndReset("failureStatsLockTime", t); + m_repackRequest.fetch(); + timingList.insertAndReset("failureStatsFetchTime", t); + m_repackRequest.reportArchiveFailures(ssl); + timingList.insertAndReset("failureStatsUpdateTime", t); + m_repackRequest.commit(); + timingList.insertAndReset("failureStatsCommitTime", t); + } + // 2) For each job, determine if sibling jobs are complete or not. If so, delete, else just update status and set empty owner. + struct Deleters { + std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter> deleter; + RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo; + typedef std::list<Deleters> List; + }; + struct JobOwnerUpdaters { + std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> jobOwnerUpdater; + RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> & subrequestInfo; + typedef std::list<JobOwnerUpdaters> List; + }; + Deleters::List deletersList; + JobOwnerUpdaters::List jobOwnerUpdatersList; + for (auto &sri: m_subrequestList) { + bool moreJobsToDo = false; + for (auto &j: sri.archiveJobsStatusMap) { + if ((j.first != sri.archivedCopyNb) && + (j.second != serializers::ArchiveJobStatus::AJS_Complete) && + (j.second != serializers::ArchiveJobStatus::AJS_Failed)) { + moreJobsToDo = true; + break; + } + } + objectstore::ArchiveRequest & ar = *sri.subrequest; + if (moreJobsToDo) { + try { + jobOwnerUpdatersList.push_back(JobOwnerUpdaters{std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater> ( + ar.asyncUpdateJobOwner(sri.archivedCopyNb, "", m_oStoreDb.m_agentReference->getAgentAddress(), + serializers::ArchiveJobStatus::AJS_Failed)), + sri}); + } catch (cta::exception::Exception & ex) { + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", sri.archiveFile.archiveFileID) + .add("subrequestAddress", sri.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to asyncUpdateJobOwner()"); + } + } else { + try { + deletersList.push_back({std::unique_ptr<objectstore::ArchiveRequest::AsyncRequestDeleter>(ar.asyncDeleteRequest()), sri}); + } catch (cta::exception::Exception & ex) { + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", sri.archiveFile.archiveFileID) + .add("subrequestAddress", sri.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): failed to asyncDelete()"); + } + } + } + timingList.insertAndReset("asyncUpdateOrDeleteLaunchTime", t); + for (auto & d: deletersList) { + try { + d.deleter->wait(); + log::ScopedParamContainer params(lc); + params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet()); + lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): deleted request."); + } catch (cta::exception::Exception & ex) { + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", d.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", d.subrequestInfo.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async deletion failed."); + } + } + for (auto & jou: jobOwnerUpdatersList) { + try { + jou.jobOwnerUpdater->wait(); + log::ScopedParamContainer params(lc); + params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet()); + lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async updated job."); + } catch (cta::exception::Exception & ex) { + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", jou.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", jou.subrequestInfo.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackArchiveFailureReportBatch::report(): async job update."); + } + } + timingList.insertAndReset("asyncUpdateOrDeleteCompletionTime", t); + // 3) Just remove all jobs from ownership + std::list<std::string> jobsToUnown; + for (auto sri: m_subrequestList) jobsToUnown.push_back(sri.subrequest->getAddressIfSet()); + m_oStoreDb.m_agentReference->removeBatchFromOwnership(jobsToUnown, m_oStoreDb.m_objectStore); + timingList.insertAndReset("ownershipRemoval", t); + log::ScopedParamContainer params(lc); + timingList.addToLog(params); + lc.log(log::INFO, "In OStoreDB::RepackArchiveFailureReportBatch::report(): reported a batch of jobs."); +} + //------------------------------------------------------------------------------ // OStoreDB::ArchiveJob::asyncDeleteRequest() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 518e0fde1167b139187f1fc82ddf76f9bbaba876..2a40ccb030a7295749863194703689c61342a49b 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -463,6 +463,18 @@ public: SubrequestInfo::List m_subrequestList; }; + class RepackArchiveFailureReportBatch: public RepackReportBatch { + friend class OStoreDB; + + RepackArchiveFailureReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb): + RepackReportBatch(backend,oStoreDb) {} + public: + void report(log::LogContext& lc) override; + private: + typedef RepackReportBatch::SubrequestInfo<objectstore::ArchiveRequest> SubrequestInfo; + SubrequestInfo::List m_subrequestList; + }; + std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override; private: CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound); @@ -470,6 +482,7 @@ private: std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc); std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextFailedRetrieveRepackReportBatch(log::LogContext& lc); std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulArchiveRepackReportBatch(log::LogContext& lc); + std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextFailedArchiveRepackReportBatch(log::LogContext &lc); public: /* === Drive state handling ============================================== */ diff --git a/scheduler/RetrieveJob.cpp b/scheduler/RetrieveJob.cpp index ccf0d7e4f51fb0e0b3ba2f4ca7a0807eb6f3acd2..21900bc27437670280eec013512ac9126b84a7e0 100644 --- a/scheduler/RetrieveJob.cpp +++ b/scheduler/RetrieveJob.cpp @@ -103,4 +103,3 @@ const cta::common::dataStructures::TapeFile& cta::RetrieveJob::selectedTapeFile( throw std::runtime_error(std::string("cta::RetrieveJob::selectedTapeFile(): ") + ex.what()); } } - diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 5c4c66642f39a8f7c16e4974e04e9c4b5d2715ed..4e1383b6fc286f1ffe730c24bf1ef123d1e99699 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -1659,7 +1659,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { } } -TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) { +TEST_P(SchedulerTest, expandRepackRequestRetrieveFailed) { using namespace cta; using namespace cta::objectstore; @@ -1831,7 +1831,6 @@ TEST_P(SchedulerTest, expandRepackRequestFailedRetrieve) { ASSERT_NE(nullptr, retrieveMount.get()); std::unique_ptr<cta::RetrieveJob> retrieveJob; - std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; //For each tape we will see if the retrieve jobs are not null auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); retrieveJob.reset(jobBatch.front().release()); @@ -2147,6 +2146,304 @@ TEST_P(SchedulerTest, expandRepackRequestArchiveSuccess) { } } +TEST_P(SchedulerTest, expandRepackRequestArchiveFailed) { + using namespace cta; + using namespace cta::objectstore; + + auto &catalogue = getCatalogue(); + auto &scheduler = getScheduler(); + auto &schedulerDB = getSchedulerDB(); + cta::objectstore::Backend& backend = schedulerDB.getBackend(); + setupDefaultCatalogue(); + +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + //Create an agent to represent this test process + cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl); + cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend); + agent.initialize(); + agent.setTimeout_us(0); + agent.insertAndRegisterSelf(lc); + + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = false; + const bool fullValue = false; + const std::string comment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin; + admin.username = "admin_user_name"; + admin.host = "admin_host"; + const std::string diskFileUser = "public_disk_user"; + const std::string diskFileGroup = "public_disk_group"; + + //Create a logical library in the catalogue + catalogue.createLogicalLibrary(admin, s_libraryName, "Create logical library"); + + std::ostringstream ossVid; + ossVid << s_vid << "_" << 1; + std::string vid = ossVid.str(); + catalogue.createTape(s_adminOnAdminHost,vid, s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + disabledValue, fullValue, comment); + + //Create a storage class in the catalogue + common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = s_diskInstance; + storageClass.name = s_storageClassName; + storageClass.nbCopies = 2; + storageClass.comment = "Create storage class"; + + const std::string checksumType = "checksum_type"; + const std::string checksumValue = "checksum_value"; + const std::string tapeDrive = "tape_drive"; + const uint64_t nbArchiveFilesPerTape = 10; + const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000; + const uint64_t compressedFileSize = archiveFileSize; + + //Simulate the writing of 10 files per tape in the catalogue + std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1; + { + uint64_t archiveFileId = 1; + std::string currentVid = vid; + for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) { + std::ostringstream diskFileId; + diskFileId << (12345677 + archiveFileId); + std::ostringstream diskFilePath; + diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j; + auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>(); + auto & fileWritten = *fileWrittenUP; + fileWritten.archiveFileId = archiveFileId++; + fileWritten.diskInstance = storageClass.diskInstance; + fileWritten.diskFileId = diskFileId.str(); + fileWritten.diskFilePath = diskFilePath.str(); + fileWritten.diskFileUser = diskFileUser; + fileWritten.diskFileGroup = diskFileGroup; + fileWritten.size = archiveFileSize; + fileWritten.checksumType = checksumType; + fileWritten.checksumValue = checksumValue; + fileWritten.storageClassName = s_storageClassName; + fileWritten.vid = currentVid; + fileWritten.fSeq = j; + fileWritten.blockId = j * 100; + fileWritten.compressedSize = compressedFileSize; + fileWritten.copyNb = 1; + fileWritten.tapeDrive = tapeDrive; + tapeFilesWrittenCopy1.emplace(fileWrittenUP.release()); + } + //update the DB tape + catalogue.filesWrittenToTape(tapeFilesWrittenCopy1); + tapeFilesWrittenCopy1.clear(); + } + //Test the expandRepackRequest method + scheduler.waitSchedulerDbSubthreadsComplete(); + + { + scheduler.queueRepack(admin,vid,"root://repackData/buffer",common::dataStructures::RepackInfo::Type::MoveOnly,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + log::TimingList tl; + utils::Timer t; + + scheduler.promoteRepackRequestsToToExpand(lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + + auto repackRequestToExpand = scheduler.getNextRepackRequestToExpand(); + + scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc); + scheduler.waitSchedulerDbSubthreadsComplete(); + } + { + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + + std::list<std::unique_ptr<cta::RetrieveJob>> executedJobs; + //For each tape we will see if the retrieve jobs are not null + for(uint64_t j = 1; j<=nbArchiveFilesPerTape; ++j) + { + auto jobBatch = retrieveMount->getNextJobBatch(1,archiveFileSize,lc); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + executedJobs.push_back(std::move(retrieveJob)); + } + //Now, report the retrieve jobs to be completed + castor::tape::tapeserver::daemon::RecallReportPacker rrp(retrieveMount.get(),lc); + + rrp.startThreads(); + + //Report all jobs as succeeded + for(auto it = executedJobs.begin(); it != executedJobs.end(); ++it) + { + rrp.reportCompletedJob(std::move(*it)); + } + + rrp.setDiskDone(); + rrp.setTapeDone(); + + rrp.reportDriveStatus(cta::common::dataStructures::DriveStatus::Unmounting); + + rrp.reportEndOfSession(); + rrp.waitThread(); + + ASSERT_TRUE(rrp.allThreadsDone()); + } + { + //Do the reporting of RetrieveJobs, will transform the Retrieve request in Archive requests + while (true) { + auto rep = schedulerDB.getNextRepackReportBatch(lc); + if (nullptr == rep) break; + rep->report(lc); + } + } + //All retrieve have been successfully executed, let's get all the ArchiveJobs generated from the succeeded RetrieveJobs of Repack + { + scheduler.waitSchedulerDbSubthreadsComplete(); + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); + + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + std::unique_ptr<cta::ArchiveJob> archiveJob; + + //Get all Archive jobs + std::list<std::unique_ptr<cta::ArchiveJob>> executedJobs; + for(uint64_t j = 1;j<=nbArchiveFilesPerTape;++j){ + auto jobBatch = archiveMount->getNextJobBatch(1,archiveFileSize,lc); + archiveJob.reset(jobBatch.front().release()); + archiveJob->tapeFile.blockId = j * 101; + archiveJob->tapeFile.checksumType = checksumType; + archiveJob->tapeFile.checksumValue = checksumValue; + archiveJob->tapeFile.compressedSize = compressedFileSize; + ASSERT_NE(nullptr,archiveJob.get()); + executedJobs.push_back(std::move(archiveJob)); + } + + { + castor::tape::tapeserver::daemon::MigrationReportPacker mrp(archiveMount.get(),lc); + mrp.startThreads(); + + //Report all archive jobs as succeeded except the first one + auto it = executedJobs.begin(); + mrp.reportSkippedJob(std::move(*it),"expandRepackRequestFailedArchive",lc); + it++; + while(it != executedJobs.end()){ + mrp.reportCompletedJob(std::move(*it),lc); + it++; + } + + castor::tape::tapeserver::drive::compressionStats compressStats; + mrp.reportFlush(compressStats,lc); + mrp.reportEndOfSession(lc); + mrp.reportTestGoingToEnd(lc); + mrp.waitThread(); + + { + //Test only 9 jobs are in the ArchiveQueueToReportToRepackForSuccess + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend()); + ri.fetchNoLock(); + + std::string archiveQueueToReportToRepackForSuccessAddress = re.getArchiveQueueAddress(ri.getRepackRequestAddress(vid),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); + cta::objectstore::ArchiveQueue aq(archiveQueueToReportToRepackForSuccessAddress,backend); + + aq.fetchNoLock(); + ASSERT_EQ(9,aq.dumpJobs().size()); + } + } + + for(int i = 0; i < 5; ++i){ + { + //The failed job should be queued into the ArchiveQueueToTransferForRepack + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + + std::string archiveQueueToTransferForRepackAddress = re.getArchiveQueueAddress(s_tapePoolName,cta::objectstore::JobQueueType::JobsToTransferForRepack); + cta::objectstore::ArchiveQueue aq(archiveQueueToTransferForRepackAddress,backend); + + aq.fetchNoLock(); + + for(auto &job: aq.dumpJobs()){ + ASSERT_EQ(1,job.copyNb); + ASSERT_EQ(archiveFileSize,job.size); + } + } + std::unique_ptr<cta::TapeMount> mount; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + std::unique_ptr<cta::ArchiveJob> archiveJob; + + auto jobBatch = archiveMount->getNextJobBatch(1,archiveFileSize,lc); + archiveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, archiveJob.get()); + + castor::tape::tapeserver::daemon::MigrationReportPacker mrp(archiveMount.get(),lc); + mrp.startThreads(); + + mrp.reportFailedJob(std::move(archiveJob),cta::exception::Exception("FailedJob expandRepackRequestFailedArchive"),lc); + + castor::tape::tapeserver::drive::compressionStats compressStats; + mrp.reportFlush(compressStats,lc); + mrp.reportEndOfSession(lc); + mrp.reportTestGoingToEnd(lc); + mrp.waitThread(); + } + + //Test that the failed job is queued in the ArchiveQueueToReportToRepackForFailure + { + cta::objectstore::RootEntry re(backend); + re.fetchNoLock(); + objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend()); + ri.fetchNoLock(); + + std::string archiveQueueToReportToRepackForFailureAddress = re.getArchiveQueueAddress(ri.getRepackRequestAddress(vid),cta::objectstore::JobQueueType::JobsToReportToRepackForFailure); + cta::objectstore::ArchiveQueue aq(archiveQueueToReportToRepackForFailureAddress,backend); + + aq.fetchNoLock(); + + for(auto &job: aq.dumpJobs()){ + ASSERT_EQ(1,job.copyNb); + ASSERT_EQ(archiveFileSize,job.size); + } + } + { + //Do the reporting of the ArchiveJobs + while (true) { + auto rep = schedulerDB.getNextRepackReportBatch(lc); + if (nullptr == rep) break; + rep->report(lc); + } + } + { + scheduler.waitSchedulerDbSubthreadsComplete(); + //Test that the repackRequestStatus is set as Failed. + cta::objectstore::RootEntry re(backend); + cta::objectstore::ScopedExclusiveLock sel(re); + re.fetch(); + objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend()); + ri.fetchNoLock(); + cta::objectstore::RepackRequest rr(ri.getRepackRequestAddress(vid),backend); + rr.fetchNoLock(); + ASSERT_EQ(common::dataStructures::RepackInfo::Status::Failed,rr.getInfo().status); + } + } +} + #undef TEST_MOCK_DB #ifdef TEST_MOCK_DB static cta::MockSchedulerDatabaseFactory mockDbFactory;