diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index f35f687df9e2db84a4ad66215c4f416b5f92aa08..449e7798fcb499f1f29f4311da9398b092db58a1 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -29,6 +29,13 @@ namespace cta { namespace objectstore { RepackRequest::RepackRequest(const std::string& address, Backend& os): ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os, address) { } +//------------------------------------------------------------------------------ +// Constructor +//------------------------------------------------------------------------------ +RepackRequest::RepackRequest(Backend& os): + ObjectOps<serializers::RepackRequest, serializers::RepackRequest_t> (os) { } + + //------------------------------------------------------------------------------ // RepackRequest::RepackRequest() //------------------------------------------------------------------------------ @@ -146,10 +153,10 @@ void RepackRequest::setBufferURL(const std::string& bufferURL) { void RepackRequest::RepackSubRequestPointer::serialize(serializers::RepackSubRequestPointer& rsrp) { rsrp.set_address(address); rsrp.set_fseq(fSeq); - rsrp.set_retrieveaccounted(retrieveAccounted); - rsrp.set_archiveaccounted(archiveAccounted); - rsrp.set_failureaccounted(failureAccounted); - rsrp.set_subrequestdeleted(subrequestDeleted); + rsrp.set_retrieve_accounted(retrieveAccounted); + rsrp.mutable_archive_copynb_accounted()->Clear(); + for (auto cna: archiveCopyNbsAccounted) { rsrp.mutable_archive_copynb_accounted()->Add(cna); } + rsrp.set_subrequest_deleted(subrequestDeleted); } //------------------------------------------------------------------------------ @@ -158,10 +165,10 @@ void RepackRequest::RepackSubRequestPointer::serialize(serializers::RepackSubReq void RepackRequest::RepackSubRequestPointer::deserialize(const serializers::RepackSubRequestPointer& rsrp) { address = rsrp.address(); fSeq = rsrp.fseq(); - retrieveAccounted = rsrp.retrieveaccounted(); - archiveAccounted = rsrp.archiveaccounted(); - failureAccounted = rsrp.failureaccounted(); - subrequestDeleted = rsrp.subrequestdeleted(); + retrieveAccounted = rsrp.retrieve_accounted(); + archiveCopyNbsAccounted.clear(); + for (auto acna: rsrp.archive_copynb_accounted()) { archiveCopyNbsAccounted.insert(acna); } + subrequestDeleted = rsrp.subrequest_deleted(); } //------------------------------------------------------------------------------ @@ -190,7 +197,8 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint64_t> fSeqs, AgentRe auto & p = pointerMap[fs]; p.address = retInfo.address; p.fSeq = fs; - p.archiveAccounted = p.retrieveAccounted = p.failureAccounted = p.subrequestDeleted = false; + p.retrieveAccounted = p.subrequestDeleted = false; + p.archiveCopyNbsAccounted.clear(); newElementCreated = true; } ret.emplace(retInfo); @@ -256,13 +264,13 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa // Read the map for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp); bool didUpdate = false; - for (auto & rs: retrieveFailures) { + for (auto & rf: retrieveFailures) { try { - auto & p = pointerMap.at(rs.fSeq); - if (!p.failureAccounted) { - p.failureAccounted = true; - m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rs.bytes); - m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rs.files); + auto & p = pointerMap.at(rf.fSeq); + if (!p.retrieveAccounted) { + p.retrieveAccounted = true; + m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rf.bytes); + m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rf.files); didUpdate = true; } } catch (std::out_of_range &) { @@ -287,8 +295,8 @@ void RepackRequest::reportArchiveSuccesses(SubrequestStatistics::List& archiveSu for (auto & as: archiveSuccesses) { try { auto & p = pointerMap.at(as.fSeq); - if (!p.archiveAccounted) { - p.archiveAccounted = true; + if (!p.archiveCopyNbsAccounted.count(as.copyNb)) { + p.archiveCopyNbsAccounted.insert(as.copyNb); m_payload.set_archivedbytes(m_payload.archivedbytes() + as.bytes); m_payload.set_archivedfiles(m_payload.archivedfiles() + as.files); didUpdate = true; @@ -312,13 +320,13 @@ void RepackRequest::reportArchiveFailures(SubrequestStatistics::List& archiveFai // Read the map for (auto &rsrp: m_payload.subrequests()) pointerMap[rsrp.fseq()].deserialize(rsrp); bool didUpdate = false; - for (auto & rs: archiveFailures) { + for (auto & af: archiveFailures) { try { - auto & p = pointerMap.at(rs.fSeq); - if (!p.failureAccounted) { - p.failureAccounted = true; - m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + rs.bytes); - m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + rs.files); + auto & p = pointerMap.at(af.fSeq); + if (!p.archiveCopyNbsAccounted.count(af.copyNb)) { + p.archiveCopyNbsAccounted.insert(af.copyNb); + m_payload.set_failedtoarchivebytes(m_payload.failedtoarchivebytes() + af.bytes); + m_payload.set_failedtoarchivefiles(m_payload.failedtoarchivefiles() + af.files); didUpdate = true; } } catch (std::out_of_range &) { diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index 8513caa2af5bdd6473a1f8ebfd3a3b037f7fa458..c0d28598d1f3aae11f2502c801f6e7299d4cd245 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -66,8 +66,7 @@ private: std::string address; uint64_t fSeq; bool retrieveAccounted; - bool archiveAccounted; - bool failureAccounted; + std::set<uint32_t> archiveCopyNbsAccounted; bool subrequestDeleted; typedef std::map<uint64_t, RepackSubRequestPointer> Map; void serialize (serializers::RepackSubRequestPointer & rsrp); @@ -83,6 +82,8 @@ public: uint64_t fSeq; uint64_t files = 1; uint64_t bytes; + /// CopyNb is needed to record archive jobs statistics (we can have several archive jobs for the same fSeq) + uint32_t copyNb = 0; typedef std::list<SubrequestStatistics> List; bool operator< (const SubrequestStatistics & o) const { return fSeq < o.fSeq; } }; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index a46aa7358df853790872cbb5de37b5ca727b91ee..7b288bc5e1065f839a7bc01cc9b80b73fbbc0fe6 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -529,6 +529,7 @@ void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) { } m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL); m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress); + m_payload.mutable_repack_info()->set_fseq(repackInfo.fSeq); } } @@ -789,6 +790,7 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string ri.fileBufferURL = payload.repack_info().file_buffer_url(); ri.isRepack = true; ri.repackRequestAddress = payload.repack_info().repack_request_address(); + ri.fSeq = payload.repack_info().fseq(); } // TODO serialization of payload maybe not necessary oh.set_payload(payload.SerializePartialAsString()); diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 330642d08010c04c06399fc954b5d708d6d96689..003af3fe3fca815d0e55504b24169870a29f4c7f 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -56,7 +56,7 @@ public: uint32_t copyNb; serializers::RetrieveJobStatus status; }; - // An asynchronous job ownership updating class. + // An asynchronous request deleting class. class AsyncJobDeleter { friend class RetrieveRequest; public: @@ -148,6 +148,7 @@ public: std::map<uint32_t, std::string> archiveRouteMap; std::set<uint32_t> copyNbsToRearchive; std::string repackRequestAddress; + uint64_t fSeq; std::string fileBufferURL; }; void setRepackInfo(const RepackInfo & repackInfo); @@ -165,6 +166,7 @@ public: for (auto cntr: copyNbsToRearchive) rrri.mutable_copy_nbs_to_rearchive()->Add(cntr); rrri.set_file_buffer_url(fileBufferURL); rrri.set_repack_request_address(repackRequestAddress); + rrri.set_fseq(fSeq); } void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) { @@ -173,6 +175,7 @@ public: for(auto &cntr: rrri.copy_nbs_to_rearchive()) { copyNbsToRearchive.insert(cntr); } fileBufferURL = rrri.file_buffer_url(); repackRequestAddress = rrri.repack_request_address(); + fSeq = rrri.fseq(); } }; private: diff --git a/objectstore/cta.proto b/objectstore/cta.proto index ed5b3414ea9ae3bd1c844d01c3234a4935f8c952..08c040577cda9f58320332bfb22c06365cac9b78 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -377,6 +377,7 @@ message RetrieveRequestRepackInfo { repeated uint32 copy_nbs_to_rearchive = 9510; required string repack_request_address = 9520; required string file_buffer_url = 9530; + required uint64 fseq = 9540; } message RetrieveRequest { @@ -491,15 +492,15 @@ enum RepackRequestStatus { // sub request should be interpreted as an unfulfilled creation intent (deleted=false) and create the // missing sub request or the completion of the request (which can happen anytime after sub request // creation). -// Likewise, the "accounted" booleans will prevent double counting in case a report (for success or failure) -// need to be retried after a process failure. +// Likewise, the "accounted" booleans or copyNbs will prevent double counting in case a report (for success or failure) +// need to be retried after a process failure. The same flag is used for both success and failure. Archive requires a +// set of copyNbs as a single repack can lead to several archives (in case we create new copies). message RepackSubRequestPointer { required string address = 10500; required uint64 fseq = 10510; - required bool retrieveaccounted = 10530; - required bool archiveaccounted = 10534; - required bool failureaccounted = 10537; - required bool subrequestdeleted = 10540; + required bool retrieve_accounted = 10530; + repeated uint32 archive_copynb_accounted = 10534; + required bool subrequest_deleted = 10540; } message RepackRequest { diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index e400149d6ec69e29e1863f364f2706188dcbfa91..ccd0907a45511ef6ac2af531955c57e8345a80dd 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1523,9 +1523,227 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRepackJobToEx // OStoreDB::getNextRepackJobToExpand() //------------------------------------------------------------------------------ std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextRepackReportBatch(log::LogContext& lc) { + try { + return getNextSuccessfulRetrieveRepackReportBatch(lc); + } catch (NoRepackReportBatchFound &) {} return nullptr; } +//------------------------------------------------------------------------------ +// OStoreDB::getNextSuccessfulRetrieveRepackReportBatch() +//------------------------------------------------------------------------------ +std::unique_ptr<SchedulerDatabase::RepackReportBatch> OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc) { + typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> Carqtrtrfs; + Carqtrtrfs algo(this->m_objectStore, *m_agentReference); + // Decide from which queue we are going to pop. + RootEntry re(m_objectStore); + re.fetchNoLock(); + while(true) { + auto queueList = re.dumpRetrieveQueues(JobQueueType::JobsToReportToRepackForSuccess); + if (queueList.empty()) throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no queue found."); + + // Try to get jobs from the first queue. If it is empty, it will be trimmed, so we can go for another round. + Carqtrtrfs::PopCriteria criteria; + criteria.files = c_repackReportBatchSize; + auto jobs = algo.popNextBatch(queueList.front().vid, criteria, lc); + if(jobs.elements.empty()) continue; + std::unique_ptr<RepackRetrieveSuccessesReportBatch> privateRet; + privateRet.reset(new RepackRetrieveSuccessesReportBatch(m_objectStore, *this)); + std::set<std::string> repackRequestAddresses; + for(auto &j : jobs.elements) + { + privateRet->m_subrequestList.emplace_back(RepackRetrieveSuccessesReportBatch::SubrequestInfo()); + auto & sr = privateRet->m_subrequestList.back(); + sr.repackInfo = j.repackInfo; + sr.archiveFile = j.archiveFile; + sr.subrequest.reset(j.retrieveRequest.release()); + repackRequestAddresses.insert(j.repackInfo.repackRequestAddress); + } + // As we are popping from a single report queue, all requests should concern only one repack request. + if (repackRequestAddresses.size() != 1) { + std::stringstream err; + err << "In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): reports for several repack requests in the same queue. "; + for (auto & rr: repackRequestAddresses) { err << rr << " "; } + throw exception::Exception(err.str()); + } + privateRet->m_repackRequest.setAddress(*repackRequestAddresses.begin()); + + return std::unique_ptr<SchedulerDatabase::RepackReportBatch>(privateRet.release()); + } + throw NoRepackReportBatchFound("In OStoreDB::getNextSuccessfulRetrieveRepackReportBatch(): no report found."); +} + +//------------------------------------------------------------------------------ +// OStoreDB::getNextSuccessfulRetrieveRepackReportBatch() +//------------------------------------------------------------------------------ +void OStoreDB::RepackRetrieveSuccessesReportBatch::report(log::LogContext& lc) { + // We have a batch of popped requests to report. We will first record them in the repack requests (update statistics), + // then transform the requests (retrieve to archives) and finally queue the archive jobs in the right queues. + // As usual there are many opportunities for failure. + utils::Timer t; + log::TimingList timingList; + + // 1) Update statistics. As the repack request is protected against double reporting, we can release its lock + // before the next step. + { + // Prepare the report + objectstore::RepackRequest::SubrequestStatistics::List ssl; + for (auto &rr: m_subrequestList) { + ssl.push_back(objectstore::RepackRequest::SubrequestStatistics()); + ssl.back().bytes = rr.archiveFile.fileSize; + ssl.back().files = 1; + ssl.back().fSeq = rr.repackInfo.fSeq; + } + // Record it. + timingList.insertAndReset("successStatsPrepareTime", t); + objectstore::ScopedExclusiveLock rrl(m_repackRequest); + timingList.insertAndReset("successStatsLockTime", t); + m_repackRequest.fetch(); + timingList.insertAndReset("successStatsFetchTime", t); + m_repackRequest.reportRetriveSuccesses(ssl); + timingList.insertAndReset("successStatsUpdateTime", t); + m_repackRequest.commit(); + timingList.insertAndReset("successStatsCommitTime", t); + } + + // 2) We should async transform the retrieve requests into archive requests. + // From this point on, failing to transform is counted as a failure to archive. + { + objectstore::RepackRequest::SubrequestStatistics::List failedArchiveSSL; + std::list<SubrequestInfo *> failedSubrequests; + struct AsyncTransformerAndReq { + SubrequestInfo & subrequestInfo; + std::unique_ptr<objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer> transformer; + }; + std::list<AsyncTransformerAndReq> asyncTransformsAndReqs; + for (auto &rr: m_subrequestList) { + try { + asyncTransformsAndReqs.push_back({ + rr, + std::unique_ptr<objectstore::RetrieveRequest::AsyncRetrieveToArchiveTransformer>( + rr.subrequest->asyncTransformToArchiveRequest(*m_oStoreDb.m_agentReference) + ) + }); + } catch (exception::Exception & ex) { + // We failed to archive the file (to create the request, in fact). So all the copyNbs + // can be counted as failed. + for (auto cnbtr: rr.repackInfo.copyNbsToRearchive) { + failedArchiveSSL.push_back(objectstore::RepackRequest::SubrequestStatistics()); + auto & fassl = failedArchiveSSL.back(); + fassl.bytes = rr.archiveFile.fileSize; + fassl.files = 1; + fassl.fSeq = rr.repackInfo.fSeq; + fassl.copyNb = cnbtr; + } + // We will need to delete the request too. + failedSubrequests.push_back(&rr); + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", rr.archiveFile.archiveFileID) + .add("subrequestAddress", rr.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): failed to asyncTransformToArchiveRequest()."); + } + } + timingList.insertAndReset("asyncTransformLaunchTime", t); + + // 2. b. Deal with transformation results (and log the transformation... + for (auto &atar: asyncTransformsAndReqs) { + try { + atar.transformer->wait(); + // Log the transformation + log::ScopedParamContainer params(lc); + params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet()); + lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(), turned successful retrieve request in archive request."); + } catch (exception::Exception & ex) { + // We failed to archive the file (to create the request, in fact). So all the copyNbs + // can be counted as failed. + for (auto cnbtr: atar.subrequestInfo.repackInfo.copyNbsToRearchive) { + failedArchiveSSL.push_back(objectstore::RepackRequest::SubrequestStatistics()); + auto & fassl = failedArchiveSSL.back(); + fassl.bytes = atar.subrequestInfo.archiveFile.fileSize; + fassl.files = 1; + fassl.fSeq = atar.subrequestInfo.repackInfo.fSeq; + fassl.copyNb = cnbtr; + } + // We will need to delete the request too. + failedSubrequests.push_back(&atar.subrequestInfo); + // Log the error + log::ScopedParamContainer params(lc); + params.add("fileId", atar.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", atar.subrequestInfo.subrequest->getAddressIfSet()) + .add("exceptionMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): async transformation failed on wait()."); + } + } + timingList.insertAndReset("asyncTransformCompletionTime", t); + + // 3) Deal with transformation failures (and delete the requests) : + // - record the deletion intent in + status in repack request results + // - async delete + // - wait deletions + if (failedSubrequests.size()) { + // Record the stats (before deleting the requests, otherwise we could leak some counting + // in case of failure). + objectstore::ScopedExclusiveLock rrl(m_repackRequest); + timingList.insertAndReset("failureStatsLockTime", t); + m_repackRequest.fetch(); + timingList.insertAndReset("failureStatsFetchTime", t); + m_repackRequest.reportArchiveFailures(failedArchiveSSL); + timingList.insertAndReset("failureStatsUpdateTime", t); + m_repackRequest.commit(); + timingList.insertAndReset("failureStatsCommitTime", t); + // And now delete the requests + struct AsyncDeleteAndReq { + SubrequestInfo & subrequestInfo; + std::unique_ptr<RetrieveRequest::AsyncJobDeleter> deleter; + }; + std::list<std::string> retrieveRequestsToUnown; + std::list<AsyncDeleteAndReq> asyncDeleterAndReqs; + for (auto &fs: failedSubrequests) { + // This is the end of error handling. If we fail to delete a request, so be it. + retrieveRequestsToUnown.push_back(fs->subrequest->getAddressIfSet()); + try { + asyncDeleterAndReqs.push_back({*fs, + std::unique_ptr<RetrieveRequest::AsyncJobDeleter>(fs->subrequest->asyncDeleteJob())}); + } catch (cta::exception::Exception &ex) { + // Log the failure to delete. + log::ScopedParamContainer params(lc); + params.add("fileId", fs->archiveFile.archiveFileID) + .add("subrequestAddress", fs->subrequest->getAddressIfSet()) + .add("excepitonMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): failed to asyncDelete() retrieve request."); + } + } + timingList.insertAndReset("asyncDeleteRetrieveLaunchTime", t); + for (auto &adar: asyncDeleterAndReqs) { + try { + adar.deleter->wait(); + // Log the deletion + log::ScopedParamContainer params(lc); + params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet()); + lc.log(log::INFO, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): deleted retrieve request after failure to transform in archive request."); + } catch (cta::exception::Exception & ex) { + // Log the failure to delete. + log::ScopedParamContainer params(lc); + params.add("fileId", adar.subrequestInfo.archiveFile.archiveFileID) + .add("subrequestAddress", adar.subrequestInfo.subrequest->getAddressIfSet()) + .add("excepitonMsg", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RepackRetrieveSuccessesReportBatch::report(): async deletion of retrieve request failed on wait()."); + } + } + timingList.insertAndReset("asyncDeleteRetrieveWaitTime", t); + m_oStoreDb.m_agentReference->removeBatchFromOwnership(retrieveRequestsToUnown, m_oStoreDb.m_objectStore); + timingList.insertAndReset("removeDeletedRetrieveFromOwnershipTime", t); + } + } + + // 3. We now just need to queue the freshly created archive jobs into their respective queues + // XXX: TODO +} + //------------------------------------------------------------------------------ // OStoreDB::RepackRequest::getLastExpandedFSeq() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index 5638b95b33c5adcfee246c952caa4739503dec67..7dff16c9e8a26cbca6baa0e573048f2bde98705e 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -396,8 +396,56 @@ public: * @return a unique_ptr holding the RepackRequest */ std::unique_ptr<SchedulerDatabase::RepackRequest> getNextRepackJobToExpand() override; + + class RepackRetrieveSuccessesReportBatch; + class RepackRetrieveFailureReportBatch; + class RepackArchiveSuccessesReportBatch; + class RepackArchiveFailureReportBatch; + friend class RepackRetrieveSuccessesReportBatch; + friend class RepackRetrieveFailureReportBatch; + friend class RepackArchiveSuccessesReportBatch; + friend class RepackArchiveFailureReportBatch; + /** + * Base class handling the commonalities + */ + class RepackReportBatch: public SchedulerDatabase::RepackReportBatch { + friend class OStoreDB; + friend class RepackRetrieveSuccessesReportBatch; + friend class RepackRetrieveFailureReportBatch; + friend class RepackArchiveSuccessesReportBatch; + friend class RepackArchiveFailureReportBatch; + RepackReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb): m_repackRequest(backend), m_oStoreDb(oStoreDb) {} + protected: + objectstore::RepackRequest m_repackRequest; + OStoreDB & m_oStoreDb; + template <class SR> + struct SubrequestInfo { + /// CopyNb is only useful for archive requests where we want to distinguish several jobs. + uint32_t archivedCopyNb = 0; + std::shared_ptr<SR> subrequest; + common::dataStructures::ArchiveFile archiveFile; + objectstore::RetrieveRequest::RepackInfo repackInfo; + typedef std::list<SubrequestInfo> List; + }; + }; - std::unique_ptr<RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override; + class RepackRetrieveSuccessesReportBatch: public RepackReportBatch { + friend class OStoreDB; + RepackRetrieveSuccessesReportBatch(objectstore::Backend & backend, OStoreDB & oStoreDb): + RepackReportBatch(backend,oStoreDb) {} + public: + void report(log::LogContext& lc) override; + private: + typedef RepackReportBatch::SubrequestInfo<objectstore::RetrieveRequest> SubrequestInfo; + SubrequestInfo::List m_subrequestList; + }; + + std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextRepackReportBatch(log::LogContext& lc) override; +private: + CTA_GENERATE_EXCEPTION_CLASS(NoRepackReportBatchFound); + const size_t c_repackReportBatchSize = 500; + std::unique_ptr<SchedulerDatabase::RepackReportBatch> getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc); +public: /* === Drive state handling ============================================== */ /**