diff --git a/common/dataStructures/ArchiveFile.cpp b/common/dataStructures/ArchiveFile.cpp index b302c57b729ad8250c839b29270ecd264f7b871f..c8ee856db28d35af4805bcfe72b2c004bfff4fdb 100644 --- a/common/dataStructures/ArchiveFile.cpp +++ b/common/dataStructures/ArchiveFile.cpp @@ -59,7 +59,7 @@ bool ArchiveFile::operator!=(const ArchiveFile &rhs) const { //------------------------------------------------------------------------------ // operator<< //------------------------------------------------------------------------------ -std::ostream &operator<<(std::ostream &os, ArchiveFile &obj) { +std::ostream &operator<<(std::ostream &os, const ArchiveFile &obj) { os << "{" "archiveFileID=" << obj.archiveFileID << "," @@ -69,8 +69,8 @@ std::ostream &operator<<(std::ostream &os, ArchiveFile &obj) { "checksumType=" << obj.checksumType << "," "checksumValue=" << obj.checksumValue << "," "storageClass=" << obj.storageClass << "," - "diskFileInfo=" << obj.diskFileInfo << "," - "tapeFiles=" << obj.tapeFiles << "," + "diskFileInfo=" << obj.diskFileInfo << "," + "tapeFiles=" << obj.tapeFiles << "," "creationTime=" << obj.creationTime << "," "reconciliationTime=" << obj.reconciliationTime << "}"; diff --git a/common/dataStructures/ArchiveFile.hpp b/common/dataStructures/ArchiveFile.hpp index 725b83aa7c7aaa9a566cecdbcc02951242399187..f2875077e5eb02a5ee0e6d3ac82eaf026af19ca5 100644 --- a/common/dataStructures/ArchiveFile.hpp +++ b/common/dataStructures/ArchiveFile.hpp @@ -68,8 +68,8 @@ struct ArchiveFile { * to be listed by the operator. For example, if the listing requested is * for a single tape, the map will contain only one element. 
*/ - std::map<uint64_t,TapeFile> tapeFiles; - time_t creationTime; + std::map<uint32_t,TapeFile> tapeFiles; + time_t creationTime; time_t reconciliationTime; }; // struct ArchiveFile diff --git a/common/dataStructures/ArchiveRoute.hpp b/common/dataStructures/ArchiveRoute.hpp index 6f668cede55dee8eeedc81c57c5be95743ee6be8..454af64f406b8a962537e1f46596de3a05c10052 100644 --- a/common/dataStructures/ArchiveRoute.hpp +++ b/common/dataStructures/ArchiveRoute.hpp @@ -61,6 +61,9 @@ struct ArchiveRoute { EntryLog creationLog; EntryLog lastModificationLog; std::string comment; + + typedef std::map<uint32_t, ArchiveRoute> StorageClassMap; + typedef std::map<std::tuple<std::string/*disk instance*/, std::string /*storage class*/>, StorageClassMap> FullMap; }; // struct ArchiveRoute diff --git a/common/dataStructures/RepackInfo.hpp b/common/dataStructures/RepackInfo.hpp index 4d2f47c55b09655db3e20a25310b4df696f7ec45..aeb5ea92fda96a979812fca93b7a404cc7875670 100644 --- a/common/dataStructures/RepackInfo.hpp +++ b/common/dataStructures/RepackInfo.hpp @@ -30,6 +30,7 @@ namespace dataStructures { struct RepackInfo { std::string vid; + std::string repackBufferBaseURL; enum class Type { ExpandAndRepack, ExpandOnly, diff --git a/common/dataStructures/RetrieveJob.hpp b/common/dataStructures/RetrieveJob.hpp index 188ddda69da6182c99e596fc219ca8e94fdb3266..c1958bc656ce7d5f71b4e6a108ceefb3e93b9371 100644 --- a/common/dataStructures/RetrieveJob.hpp +++ b/common/dataStructures/RetrieveJob.hpp @@ -43,7 +43,7 @@ struct RetrieveJob { RetrieveRequest request; uint64_t fileSize; - std::map<std::string,std::pair<uint64_t,TapeFile>> tapeCopies; + std::map<std::string,std::pair<uint32_t,TapeFile>> tapeCopies; std::list<std::string> failurelogs; }; // struct RetrieveJob diff --git a/common/dataStructures/RetrieveRequest.cpp b/common/dataStructures/RetrieveRequest.cpp index 35154d638f6db0dda4b8a70efdce0aa5967abca6..43d170d5de4d015bd71c2501caa6afbf28df14a7 100644 --- 
a/common/dataStructures/RetrieveRequest.cpp +++ b/common/dataStructures/RetrieveRequest.cpp @@ -27,7 +27,7 @@ namespace dataStructures { //------------------------------------------------------------------------------ // constructor //------------------------------------------------------------------------------ -RetrieveRequest::RetrieveRequest(): archiveFileID(0),isRepack(false) {} +RetrieveRequest::RetrieveRequest(): archiveFileID(0) {} //------------------------------------------------------------------------------ // operator== @@ -37,8 +37,7 @@ bool RetrieveRequest::operator==(const RetrieveRequest &rhs) const { && archiveFileID==rhs.archiveFileID && dstURL==rhs.dstURL && diskFileInfo==rhs.diskFileInfo - && creationLog==rhs.creationLog - && isRepack == rhs.isRepack; + && creationLog==rhs.creationLog; } //------------------------------------------------------------------------------ @@ -56,8 +55,7 @@ std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj) { << " archiveFileID=" << obj.archiveFileID << " dstURL=" << obj.dstURL << " diskFileInfo=" << obj.diskFileInfo - << " creationLog=" << obj.creationLog - << " isRepack=" << obj.isRepack<<")"; + << " creationLog=" << obj.creationLog <<")"; return os; } diff --git a/common/dataStructures/RetrieveRequest.hpp b/common/dataStructures/RetrieveRequest.hpp index a4525452865899d736ed85af55bef9049fb7542c..f319ec08a0675b43b6dc4fc57713df41c5746127 100644 --- a/common/dataStructures/RetrieveRequest.hpp +++ b/common/dataStructures/RetrieveRequest.hpp @@ -26,6 +26,7 @@ #include "common/dataStructures/DiskFileInfo.hpp" #include "common/dataStructures/EntryLog.hpp" #include "common/dataStructures/UserIdentity.hpp" +#include "common/dataStructures/ArchiveRoute.hpp" namespace cta { namespace common { @@ -48,8 +49,6 @@ struct RetrieveRequest { std::string errorReportURL; DiskFileInfo diskFileInfo; EntryLog creationLog; - bool isRepack; - std::string tapePool; }; // struct RetrieveRequest std::ostream 
&operator<<(std::ostream &os, const RetrieveRequest &obj); diff --git a/common/dataStructures/utils.cpp b/common/dataStructures/utils.cpp index 181052e7516292b76e1c2339e639f4ac355ecae9..689026cd7ca999cfe201dd942b2086c9486618b2 100644 --- a/common/dataStructures/utils.cpp +++ b/common/dataStructures/utils.cpp @@ -22,7 +22,7 @@ namespace cta { namespace common { namespace dataStructures { -std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,TapeFile> &map) { +std::ostream &operator<<(std::ostream &os, const std::map<uint32_t,TapeFile> &map) { os << "("; for(auto it = map.begin(); it != map.end(); it++) { os << " key=" << it->first << " value=" << it->second << " "; @@ -54,7 +54,7 @@ std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std return os; } -std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint64_t,TapeFile>> &map) { +std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint32_t,TapeFile>> &map) { os << "("; for(auto it = map.begin(); it != map.end(); it++) { os << " key=" << it->first << " value.first=" << it->second.first << " value.second=" << it->second.second; diff --git a/common/dataStructures/utils.hpp b/common/dataStructures/utils.hpp index b7fb7ea76e334129637c06007e192678b74102f4..c3e989a42d4c198ae7ecd8595823986fc42a08e9 100644 --- a/common/dataStructures/utils.hpp +++ b/common/dataStructures/utils.hpp @@ -27,11 +27,11 @@ namespace cta { namespace common { namespace dataStructures { -std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,TapeFile> &map); +std::ostream &operator<<(std::ostream &os, const std::map<uint32_t,TapeFile> &map); std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::string> &map); std::ostream &operator<<(std::ostream &os, const std::pair<std::string,std::string> &pair); std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std::string,std::string>> &map); -std::ostream 
&operator<<(std::ostream &os, const std::map<std::string,std::pair<uint64_t,TapeFile>> &map); +std::ostream &operator<<(std::ostream &os, const std::map<std::string,std::pair<uint32_t,TapeFile>> &map); std::ostream &operator<<(std::ostream &os, const std::map<uint64_t,std::pair<std::string,std::string>> &map); } // namespace dataStructures diff --git a/objectstore/Helpers.cpp b/objectstore/Helpers.cpp index 4e3328961fd1bac094372e10a4fe50c850941d42..cef85a9fb649b077840ea9949080dee964c50c4f 100644 --- a/objectstore/Helpers.cpp +++ b/objectstore/Helpers.cpp @@ -197,6 +197,8 @@ void Helpers::getLockedAndFetchedJobQueue<RetrieveQueue>(RetrieveQueue& retrieve queueFetchTime = t.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); params.add("attemptNb", i+1) + .add("queueName", vid.value()) + .add("queueType", toString(queueType)) .add("queueObject", retrieveQueue.getAddressIfSet()) .add("rootFetchNoLockTime", rootFetchNoLockTime) .add("rootRelockExclusiveTime", rootRelockExclusiveTime) diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 5fe5868fdeef8eb23cdfdb3fef4a9c6d68970518..aceb9d623037c760c513ebb87833858bf8301ec3 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -496,6 +496,7 @@ public: AsyncInserter * asyncInsert() { std::unique_ptr<AsyncInserter> ret; + ret.reset(new AsyncInserter(*this)); // Current simplification: the parsing of the header/payload is synchronous. // This could be delegated to the backend. 
// Check that we are not dealing with an existing object diff --git a/objectstore/RepackRequest.cpp b/objectstore/RepackRequest.cpp index 979891c5f0f81ec4b39d936895dead24fc4dac2d..bf6e210984e40efa8ee8b46caf6a577f07ed3542 100644 --- a/objectstore/RepackRequest.cpp +++ b/objectstore/RepackRequest.cpp @@ -59,7 +59,7 @@ void RepackRequest::initialize() { m_payload.set_retrievedbytes(0); m_payload.set_archivedfiles(0); m_payload.set_archivedbytes(0); - m_payload.set_failedtoretievefiles(0); + m_payload.set_failedtoretrievefiles(0); m_payload.set_failedtoretrievebytes(0); m_payload.set_failedtoarchivefiles(0); m_payload.set_failedtoarchivebytes(0); @@ -98,6 +98,16 @@ void RepackRequest::setType(common::dataStructures::RepackInfo::Type repackType) } } +//------------------------------------------------------------------------------ +// RepackRequest::setStatus() +//------------------------------------------------------------------------------ +void RepackRequest::setStatus(common::dataStructures::RepackInfo::Status repackStatus) { + checkPayloadWritable(); + // common::dataStructures::RepackInfo::Status and serializers::RepackRequestStatus are defined using the same values, + // hence the cast. 
+ m_payload.set_status((serializers::RepackRequestStatus)repackStatus); +} + //------------------------------------------------------------------------------ // RepackRequest::getInfo() //------------------------------------------------------------------------------ @@ -107,6 +117,7 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { RepackInfo ret; ret.vid = m_payload.vid(); ret.status = (RepackInfo::Status) m_payload.status(); + ret.repackBufferBaseURL = m_payload.buffer_url(); if (m_payload.repackmode()) { if (m_payload.expandmode()) { ret.type = RepackInfo::Type::ExpandAndRepack; @@ -121,6 +132,14 @@ common::dataStructures::RepackInfo RepackRequest::getInfo() { return ret; } +//------------------------------------------------------------------------------ +// RepackRequest::setBufferURL() +//------------------------------------------------------------------------------ +void RepackRequest::setBufferURL(const std::string& bufferURL) { + checkPayloadWritable(); + m_payload.set_buffer_url(bufferURL); +} + //------------------------------------------------------------------------------ // RepackRequest::RepackSubRequestPointer::serialize() //------------------------------------------------------------------------------ @@ -148,7 +167,7 @@ void RepackRequest::RepackSubRequestPointer::deserialize(const serializers::Repa //------------------------------------------------------------------------------ // RepackRequest::getOrPrepareSubrequestInfo() //------------------------------------------------------------------------------ -auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentReference& agentRef) +auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint64_t> fSeqs, AgentReference& agentRef) -> SubrequestInfo::set { checkPayloadWritable(); RepackSubRequestPointer::Map pointerMap; @@ -165,7 +184,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe retInfo.fSeq = srp.fSeq; 
retInfo.subrequestDeleted = srp.subrequestDeleted; } catch (std::out_of_range &) { - retInfo.address = agentRef.nextId("repackSubRequest"); + retInfo.address = agentRef.nextId("RepackSubRequest"); retInfo.fSeq = fs; retInfo.subrequestDeleted = false; auto & p = pointerMap[fs]; @@ -179,7 +198,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe // Record changes, if any. if (newElementCreated) { m_payload.mutable_subrequests()->Clear(); - for (auto & p: pointerMap) p.second.deserialize(*m_payload.mutable_subrequests()->Add()); + for (auto & p: pointerMap) p.second.serialize(*m_payload.mutable_subrequests()->Add()); } return ret; } @@ -188,7 +207,7 @@ auto RepackRequest::getOrPrepareSubrequestInfo(std::set<uint32_t> fSeqs, AgentRe // RepackRequest::setLastExpandedFSeq() //------------------------------------------------------------------------------ void RepackRequest::setLastExpandedFSeq(uint64_t lastExpandedFSeq) { - checkWritable(); + checkPayloadWritable(); m_payload.set_lastexpandedfseq(lastExpandedFSeq); } @@ -243,7 +262,7 @@ void RepackRequest::reportRetriveFailures(SubrequestStatistics::List& retrieveFa if (!p.failureAccounted) { p.failureAccounted = true; m_payload.set_failedtoretrievebytes(m_payload.failedtoretrievebytes() + rs.bytes); - m_payload.set_failedtoretievefiles(m_payload.failedtoretievefiles() + rs.files); + m_payload.set_failedtoretrievefiles(m_payload.failedtoretrievefiles() + rs.files); didUpdate = true; } } catch (std::out_of_range &) { @@ -338,6 +357,30 @@ void RepackRequest::reportSubRequestsForDeletion(std::list<uint64_t>& fSeqs) { } } +//------------------------------------------------------------------------------ +// RepackRequest::getStats() +//------------------------------------------------------------------------------ +auto RepackRequest::getStats() -> std::map<StatsType, StatsValues> { + checkPayloadReadable(); + std::map<StatsType, StatsValues> ret; + 
ret[StatsType::ArchiveTotal].files = m_payload.totalfilestoarchive(); + ret[StatsType::ArchiveTotal].bytes = m_payload.totalbytestoarchive(); + ret[StatsType::RetrieveTotal].files = m_payload.totalfilestoretrieve(); + ret[StatsType::RetrieveTotal].bytes = m_payload.totalbytestoretrieve(); + ret[StatsType::UserProvided].files = m_payload.userprovidedfiles(); + ret[StatsType::UserProvided].bytes = m_payload.userprovidedbytes(); + ret[StatsType::RetrieveFailure].files = m_payload.failedtoretrievefiles(); + ret[StatsType::RetrieveFailure].bytes = m_payload.failedtoretrievebytes(); + ret[StatsType::RetrieveSuccess].files = m_payload.retrievedfiles(); + ret[StatsType::RetrieveSuccess].bytes = m_payload.retrievedbytes(); + ret[StatsType::ArchiveFailure].files = m_payload.failedtoarchivefiles(); + ret[StatsType::ArchiveFailure].bytes = m_payload.failedtoarchivebytes(); + ret[StatsType::ArchiveSuccess].files = m_payload.archivedfiles(); + ret[StatsType::ArchiveSuccess].bytes = m_payload.archivedbytes(); + return ret; +} + + //------------------------------------------------------------------------------ // RepackRequest::garbageCollect() //------------------------------------------------------------------------------ @@ -387,6 +430,7 @@ RepackRequest::AsyncOwnerAndStatusUpdater* RepackRequest::asyncUpdateOwnerAndSta typedef common::dataStructures::RepackInfo RepackInfo; retRef.m_repackInfo.status = (RepackInfo::Status) payload.status(); retRef.m_repackInfo.vid = payload.vid(); + retRef.m_repackInfo.repackBufferBaseURL = payload.buffer_url(); if (payload.repackmode()) { if (payload.expandmode()) { retRef.m_repackInfo.type = RepackInfo::Type::ExpandAndRepack; diff --git a/objectstore/RepackRequest.hpp b/objectstore/RepackRequest.hpp index 1cf2a9a3b00448e14734d9553647fdd1264d6e64..8513caa2af5bdd6473a1f8ebfd3a3b037f7fa458 100644 --- a/objectstore/RepackRequest.hpp +++ b/objectstore/RepackRequest.hpp @@ -41,6 +41,7 @@ public: void setType(common::dataStructures::RepackInfo::Type 
repackType); void setStatus(common::dataStructures::RepackInfo::Status repackStatus); common::dataStructures::RepackInfo getInfo(); + void setBufferURL(const std::string & bufferURL); // Sub request management struct SubrequestInfo { @@ -59,7 +60,7 @@ public: * yet not update the object to reflect the last fSeq created. * This function implicitly records the information it generates (commit up t the caller); */ - SubrequestInfo::set getOrPrepareSubrequestInfo (std::set<uint32_t> fSeqs, AgentReference & agentRef); + SubrequestInfo::set getOrPrepareSubrequestInfo (std::set<uint64_t> fSeqs, AgentReference & agentRef); private: struct RepackSubRequestPointer { std::string address; @@ -90,6 +91,20 @@ public: void reportArchiveSuccesses (SubrequestStatistics::List & archiveSuccesses); void reportArchiveFailures (SubrequestStatistics::List & archiveFailures); void reportSubRequestsForDeletion (std::list<uint64_t>& fSeqs); + enum class StatsType: uint8_t { + UserProvided, + RetrieveSuccess, + RetrieveFailure, + RetrieveTotal, + ArchiveSuccess, + ArchiveFailure, + ArchiveTotal, + }; + struct StatsValues { + uint64_t files = 0; + uint64_t bytes = 0; + }; + std::map<StatsType, StatsValues> getStats(); void garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc, cta::catalogue::Catalogue & catalogue) override; diff --git a/objectstore/RetrieveQueueAlgorithms.hpp b/objectstore/RetrieveQueueAlgorithms.hpp index 02532eb0a295749f65925c9855f7a80c359ea4ad..1cb0627f45442cd1a7a1887668d95a170d900ecf 100644 --- a/objectstore/RetrieveQueueAlgorithms.hpp +++ b/objectstore/RetrieveQueueAlgorithms.hpp @@ -55,6 +55,7 @@ struct ContainerTraits<RetrieveQueue,C> common::dataStructures::RetrieveRequest rr; std::string errorReportURL; SchedulerDatabase::RetrieveJob::ReportType reportType; + RetrieveRequest::RepackInfo repackInfo; }; struct PoppedElementsSummary; struct PopCriteria { @@ -382,6 +383,7 @@ switchElementsOwnership(PoppedElementsBatch 
&poppedElementBatch, const Container u.get()->wait(); e.archiveFile = u.get()->getArchiveFile(); e.rr = u.get()->getRetrieveRequest(); + e.repackInfo = u.get()->getRepackInfo(); switch(u.get()->getJobStatus()) { case serializers::RetrieveJobStatus::RJS_ToReportToUserForFailure: e.reportType = SchedulerDatabase::RetrieveJob::ReportType::FailureReport; @@ -486,7 +488,8 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, cjfq.size, common::dataStructures::ArchiveFile(), common::dataStructures::RetrieveRequest(), - "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired + "", SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, + RetrieveRequest::RepackInfo() }); ret.summary.files++; } diff --git a/objectstore/RetrieveQueueToTransferAlgorithms.cpp b/objectstore/RetrieveQueueToTransferAlgorithms.cpp index 2367b0920e4628a616174dad305bbc504a1e2359..2d80b46b33cf75d87886f1b2572b3f7739547096 100644 --- a/objectstore/RetrieveQueueToTransferAlgorithms.cpp +++ b/objectstore/RetrieveQueueToTransferAlgorithms.cpp @@ -55,7 +55,8 @@ getPoppingElementsCandidates(Container &cont, PopCriteria &unfulfilledCriteria, common::dataStructures::ArchiveFile(), common::dataStructures::RetrieveRequest(), "", - SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired + SchedulerDatabase::RetrieveJob::ReportType::NoReportRequired, + RetrieveRequest::RepackInfo() }); ret.summary.bytes += cjfq.size; ret.summary.files++; diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 5bedfaa4735814019f1a0ffd77ee009fd9c942eb..a46aa7358df853790872cbb5de37b5ca727b91ee 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -58,7 +58,6 @@ void RetrieveRequest::initialize() { m_payload.set_failurereportlog(""); m_payload.set_failurereporturl(""); m_payload.set_isrepack(false); - m_payload.set_tapepool(""); // This object is good to go (to storage) m_payloadInterpreted = true; } @@ -372,7 +371,6 @@ 
std::string RetrieveRequest::getLastActiveVid() { return m_payload.archivefile().tapefiles(0).vid(); } - //------------------------------------------------------------------------------ // RetrieveRequest::setSchedulerRequest() //------------------------------------------------------------------------------ @@ -402,8 +400,6 @@ cta::common::dataStructures::RetrieveRequest RetrieveRequest::getSchedulerReques objectstore::EntryLogSerDeser el(ret.creationLog); el.deserialize(m_payload.schedulerrequest().entrylog()); ret.dstURL = m_payload.schedulerrequest().dsturl(); - ret.isRepack = m_payload.isrepack(); - ret.tapePool = m_payload.tapepool(); ret.errorReportURL = m_payload.schedulerrequest().retrieveerrorreporturl(); objectstore::DiskFileInfoSerDeser dfisd; dfisd.deserialize(m_payload.schedulerrequest().diskfileinfo()); @@ -516,6 +512,37 @@ bool RetrieveRequest::addJobFailure(uint32_t copyNumber, uint64_t mountId, throw NoSuchJob ("In RetrieveRequest::addJobFailure(): could not find job"); } +//------------------------------------------------------------------------------ +// RetrieveRequest::setRepackInfo() +//------------------------------------------------------------------------------ +void RetrieveRequest::setRepackInfo(const RepackInfo& repackInfo) { + checkPayloadWritable(); + m_payload.set_isrepack(repackInfo.isRepack); + if (repackInfo.isRepack) { + for (auto & ar: repackInfo.archiveRouteMap) { + auto * plar=m_payload.mutable_repack_info()->mutable_archive_routes()->Add(); + plar->set_copynb(ar.first); + plar->set_tapepool(ar.second); + } + for (auto cntr: repackInfo.copyNbsToRearchive) { + m_payload.mutable_repack_info()->mutable_copy_nbs_to_rearchive()->Add(cntr); + } + m_payload.mutable_repack_info()->set_file_buffer_url(repackInfo.fileBufferURL); + m_payload.mutable_repack_info()->set_repack_request_address(repackInfo.repackRequestAddress); + } +} + +//------------------------------------------------------------------------------ +// 
RetrieveRequest::getRepackInfo() +//------------------------------------------------------------------------------ +RetrieveRequest::RepackInfo RetrieveRequest::getRepackInfo() { + checkPayloadReadable(); + RepackInfoSerDeser ret; + if (m_payload.isrepack()) + ret.deserialize(m_payload.repack_info()); + return ret; +} + //------------------------------------------------------------------------------ // RetrieveRequest::getRetryStatus() //------------------------------------------------------------------------------ @@ -744,8 +771,6 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string dfi.deserialize(payload.schedulerrequest().diskfileinfo()); retRef.m_retrieveRequest.diskFileInfo = dfi; retRef.m_retrieveRequest.dstURL = payload.schedulerrequest().dsturl(); - retRef.m_retrieveRequest.isRepack = payload.isrepack(); - retRef.m_retrieveRequest.tapePool = payload.tapepool(); retRef.m_retrieveRequest.errorReportURL = payload.schedulerrequest().retrieveerrorreporturl(); retRef.m_retrieveRequest.requester.name = payload.schedulerrequest().requester().name(); retRef.m_retrieveRequest.requester.group = payload.schedulerrequest().requester().group(); @@ -753,6 +778,18 @@ auto RetrieveRequest::asyncUpdateJobOwner(uint32_t copyNumber, const std::string af.deserialize(payload.archivefile()); retRef.m_archiveFile = af; retRef.m_jobStatus = j.status(); + if (payload.isrepack()) { + RetrieveRequest::RepackInfo & ri = retRef.m_repackInfo; + for (auto &ar: payload.repack_info().archive_routes()) { + ri.archiveRouteMap[ar.copynb()] = ar.tapepool(); + } + for (auto cntr: payload.repack_info().copy_nbs_to_rearchive()) { + ri.copyNbsToRearchive.insert(cntr); + } + ri.fileBufferURL = payload.repack_info().file_buffer_url(); + ri.isRepack = true; + ri.repackRequestAddress = payload.repack_info().repack_request_address(); + } // TODO serialization of payload maybe not necessary oh.set_payload(payload.SerializePartialAsString()); return oh.SerializeAsString(); 
@@ -779,6 +816,13 @@ const common::dataStructures::ArchiveFile& RetrieveRequest::AsyncJobOwnerUpdater return m_archiveFile; } +//------------------------------------------------------------------------------ +// RetrieveRequest::AsyncJobOwnerUpdater::getRepackInfo() +//------------------------------------------------------------------------------ +const RetrieveRequest::RepackInfo& RetrieveRequest::AsyncJobOwnerUpdater::getRepackInfo() { + return m_repackInfo; +} + //------------------------------------------------------------------------------ // RetrieveRequest::AsyncJobOwnerUpdater::getRetrieveRequest() //------------------------------------------------------------------------------ @@ -970,24 +1014,29 @@ RetrieveRequest::AsyncRetrieveToArchiveTransformer * RetrieveRequest::asyncTrans //TODO : Should creation log just be initialized or should it be copied from the retrieveRequest ? cta::objectstore::serializers::EntryLog *archiveRequestCL = archiveRequestPayload.mutable_creationlog(); archiveRequestCL->CopyFrom(retrieveRequestMP.creationlog()); - //Add the jobs of the old RetrieveRequest to the new ArchiveRequest - for(auto retrieveJob: retrieveRequestPayload.jobs()){ + //Create archive jobs for each copyNb to rearchive + RetrieveRequest::RepackInfoSerDeser repackInfoSerDeser; + repackInfoSerDeser.deserialize(retrieveRequestPayload.repack_info()); + // TODO: for the moment we just clone the retrieve request's policy. 
+ auto maxRetriesWithinMount = retrieveRequestPayload.jobs(0).maxretrieswithinmount(); + auto maxTotalRetries = retrieveRequestPayload.jobs(0).maxtotalretries(); + auto maxReportRetries = retrieveRequestPayload.jobs(0).maxreportretries(); + for(auto cntr: repackInfoSerDeser.copyNbsToRearchive) { auto *archiveJob = archiveRequestPayload.add_jobs(); archiveJob->set_status(cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser); - archiveJob->set_copynb(retrieveJob.copynb()); + archiveJob->set_copynb(cntr); archiveJob->set_archivequeueaddress(""); archiveJob->set_totalreportretries(0); archiveJob->set_lastmountwithfailure(0); archiveJob->set_totalretries(0); archiveJob->set_retrieswithinmount(0); - archiveJob->set_maxretrieswithinmount(retrieveJob.maxretrieswithinmount()); //TODO : should we put the same value as the retrieveJob ? + archiveJob->set_maxretrieswithinmount(maxRetriesWithinMount); //TODO : should we put the same value as the retrieveJob ? archiveJob->set_totalreportretries(0); - archiveJob->set_maxtotalretries(retrieveJob.maxtotalretries()); //TODO : should we put the same value as the retrieveJob ? - archiveJob->set_maxreportretries(retrieveJob.maxreportretries()); //TODO : should we put the same value as the retrieveJob ? - archiveJob->set_tapepool(retrieveRequestPayload.tapepool()); + archiveJob->set_maxtotalretries(maxTotalRetries); //TODO : should we put the same value as the retrieveJob ? + archiveJob->set_maxreportretries(maxReportRetries); //TODO : should we put the same value as the retrieveJob ? 
+ archiveJob->set_tapepool(repackInfoSerDeser.archiveRouteMap[cntr]); archiveJob->set_owner(processAgentAddress); } - //Serialize the new ArchiveRequest so that it replaces the RetrieveRequest oh.set_payload(archiveRequestPayload.SerializeAsString()); //Change the type of the RetrieveRequest to ArchiveRequest @@ -1044,24 +1093,4 @@ void RetrieveRequest::setJobStatus(uint32_t copyNumber, const serializers::Retri throw exception::Exception("In RetrieveRequest::setJobStatus(): job not found."); } -bool RetrieveRequest::isRepack(){ - checkPayloadReadable(); - return m_payload.isrepack(); -} - -void RetrieveRequest::setIsRepack(bool isRepack){ - checkPayloadWritable(); - m_payload.set_isrepack(isRepack); -} - -std::string RetrieveRequest::getTapePool(){ - checkPayloadReadable(); - return m_payload.tapepool(); -} - -void RetrieveRequest::setTapePool(const std::string tapePool) -{ - checkPayloadWritable(); - m_payload.set_tapepool(tapePool); -} }} // namespace cta::objectstore diff --git a/objectstore/RetrieveRequest.hpp b/objectstore/RetrieveRequest.hpp index 3a55e8706e30c32f135255abf328e31ae622e153..330642d08010c04c06399fc954b5d708d6d96689 100644 --- a/objectstore/RetrieveRequest.hpp +++ b/objectstore/RetrieveRequest.hpp @@ -143,11 +143,38 @@ public: //! The copy number to enqueue. It could be different from the updated one in mixed success/failure scenario. 
serializers::RetrieveJobStatus nextStatus; }; - bool isRepack(); - void setIsRepack(bool isRepack); - std::string getTapePool(); - void setTapePool(const std::string tapePool); + struct RepackInfo { + bool isRepack = false; + std::map<uint32_t, std::string> archiveRouteMap; + std::set<uint32_t> copyNbsToRearchive; + std::string repackRequestAddress; + std::string fileBufferURL; + }; + void setRepackInfo(const RepackInfo & repackInfo); + RepackInfo getRepackInfo(); + struct RepackInfoSerDeser: public RepackInfo { + operator RepackInfo() { return RepackInfo(*this); } + void serialize(cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) { + if (!isRepack) throw exception::Exception("In RetrieveRequest::RepackInfoSerDeser::serialize(): isRepack is false."); + for (auto &route: archiveRouteMap) { + auto * ar = rrri.mutable_archive_routes()->Add(); + ar->set_copynb(route.first); + ar->set_tapepool(route.second); + } + for (auto cntr: copyNbsToRearchive) rrri.mutable_copy_nbs_to_rearchive()->Add(cntr); + rrri.set_file_buffer_url(fileBufferURL); + rrri.set_repack_request_address(repackRequestAddress); + } + + void deserialize(const cta::objectstore::serializers::RetrieveRequestRepackInfo & rrri) { + isRepack = true; + for(auto &route: rrri.archive_routes()) { archiveRouteMap[route.copynb()] = route.tapepool(); } + for(auto &cntr: rrri.copy_nbs_to_rearchive()) { copyNbsToRearchive.insert(cntr); } + fileBufferURL = rrri.file_buffer_url(); + repackRequestAddress = rrri.repack_request_address(); + } + }; private: /*! * Determine and set the new status of the job. 
@@ -185,11 +212,13 @@ public: serializers::RetrieveJobStatus getJobStatus() { return m_jobStatus; } const common::dataStructures::RetrieveRequest &getRetrieveRequest(); const common::dataStructures::ArchiveFile &getArchiveFile(); + const RepackInfo &getRepackInfo(); private: std::function<std::string(const std::string &)> m_updaterCallback; std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; common::dataStructures::RetrieveRequest m_retrieveRequest; common::dataStructures::ArchiveFile m_archiveFile; + RepackInfo m_repackInfo; serializers::RetrieveJobStatus m_jobStatus; }; // An owner updater factory. The owner MUST be previousOwner for the update to be executed. diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp index 43cce9b1259a99660755cc3c9c44a4568a63383f..7f8402094096e815f81f0e8178ca2dfb5f9a30cf 100644 --- a/objectstore/RootEntry.cpp +++ b/objectstore/RootEntry.cpp @@ -427,7 +427,8 @@ std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, JobQueueT auto & rqp = serializers::findElement(retrieveQueuePointers(queueType), vid); return rqp.address(); } catch (serializers::NotFound &) { - throw NoSuchRetrieveQueue("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated"); + throw NoSuchRetrieveQueue(std::string("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated ")+ + vid+"/"+toString(queueType)); } } diff --git a/objectstore/cta.proto b/objectstore/cta.proto index a3b889779517b2224a988b7b57189a22cc706cc7..a8e17d816dca0db4fe2e8d8d150988398f5fda3e 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -367,6 +367,18 @@ message RetrieveJob { repeated string reportfailurelogs = 9210; } +message RetrieveRequestArchiveRoute { + required uint32 copynb = 9400; + required string tapepool = 9410; +} + +message RetrieveRequestRepackInfo { + repeated RetrieveRequestArchiveRoute archive_routes = 9500; + repeated uint32 copy_nbs_to_rearchive = 9510; + required string repack_request_address = 9520; + 
required string file_buffer_url = 9530; +} + message RetrieveRequest { required SchedulerRetrieveRequest schedulerrequest = 9150; required MountPolicy mountpolicy = 9151; @@ -375,8 +387,8 @@ message RetrieveRequest { repeated RetrieveJob jobs = 9154; required string failurereporturl = 9155; required string failurereportlog = 9156; - required bool isrepack = 9157; //In protobuf, default values for bool is false - optional string tapepool = 9158; + required bool isrepack = 9157; + optional RetrieveRequestRepackInfo repack_info = 9158; } message ValueCountPair { @@ -492,6 +504,7 @@ message RepackSubRequestPointer { message RepackRequest { required string vid = 11000; + required string buffer_url = 11005; required RepackRequestStatus status = 11010; required bool expandmode = 11400; required bool repackmode = 11410; @@ -507,7 +520,7 @@ message RepackRequest { required uint64 retrievedbytes = 11490; required uint64 archivedfiles = 11500; required uint64 archivedbytes = 11510; - required uint64 failedtoretievefiles = 11520; + required uint64 failedtoretrievefiles = 11520; required uint64 failedtoretrievebytes = 11530; required uint64 failedtoarchivefiles = 11540; required uint64 failedtoarchivebytes = 11550; diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 33d893edef5076124df8e078adfc2745dde132dd..db6b9abebcd64344213e924c0faa7c966ec4b54a 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -28,6 +28,7 @@ #include "objectstore/RepackRequest.hpp" #include "objectstore/RepackIndex.hpp" #include "objectstore/RepackQueue.hpp" +#include "objectstore/Sorter.hpp" #include "objectstore/Helpers.hpp" #include "common/exception/Exception.hpp" #include "common/utils/utils.hpp" @@ -1062,7 +1063,6 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR rReq->initialize(); rReq->setSchedulerRequest(rqst); rReq->setRetrieveFileQueueCriteria(criteria); - rReq->setTapePool(rqst.tapePool); // 
Find the job corresponding to the vid (and check we indeed have one). auto jobs = rReq->getJobs(); objectstore::RetrieveRequest::JobDump job; @@ -1088,8 +1088,6 @@ std::string OStoreDB::queueRetrieve(const cta::common::dataStructures::RetrieveR rReq->setOwner(m_agentReference->getAgentAddress()); // "Select" an arbitrary copy number. This is needed to serialize the object. rReq->setActiveCopyNumber(criteria.archiveFile.tapeFiles.begin()->second.copyNb); - rReq->setIsRepack(rqst.isRepack); - rReq->setTapePool(rqst.tapePool); rReq->insert(); double insertionTime = timer.secs(cta::utils::Timer::reset_t::resetCounter); m_taskQueueSize++; @@ -1294,6 +1292,7 @@ void OStoreDB::queueRepack(const std::string& vid, const std::string& bufferURL, rr->setOwner(m_agentReference->getAgentAddress()); rr->setVid(vid); rr->setType(repackType); + rr->setBufferURL(bufferURL); // Try to reference the object in the index (will fail if there is already a request with this VID. try { Helpers::registerRepackRequestToIndex(vid, rr->getAddressIfSet(), *m_agentReference, m_objectStore, lc); @@ -1511,14 +1510,270 @@ std::unique_ptr<SchedulerDatabase::RepackRequest> OStoreDB::getNextRepackJobToEx auto repackInfo = jobs.elements.front().repackInfo; //build the repackRequest with the repack infos std::unique_ptr<OStoreDB::RepackRequest> ret; - ret.reset(new OStoreDB::RepackRequest(repackRequest->getAddressIfSet(),*this)); + ret.reset(new OStoreDB::RepackRequest(repackRequest->getAddressIfSet(), *this)); ret->repackInfo.vid = repackInfo.vid; ret->repackInfo.type = repackInfo.type; ret->repackInfo.status = repackInfo.status; + ret->repackInfo.repackBufferBaseURL = repackInfo.repackBufferBaseURL; return std::move(ret); } } +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::getLastExpandedFSeq() +//------------------------------------------------------------------------------ +uint64_t OStoreDB::RepackRequest::getLastExpandedFSeq() { + // We 
own the repack request, so we are only users of it. + m_repackRequest.fetchNoLock(); + return m_repackRequest.getLastExpandedFSeq(); +} + +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::addSubrequests() +//------------------------------------------------------------------------------ +void OStoreDB::RepackRequest::addSubrequests(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap, uint64_t maxFSeqLowBound, log::LogContext& lc) { + // We need to prepare retrieve requests names and reference them, create them, enqueue them. + objectstore::ScopedExclusiveLock rrl (m_repackRequest); + m_repackRequest.fetch(); + std::set<uint64_t> fSeqs; + for (auto rsr: repackSubrequests) fSeqs.insert(rsr.fSeq); + auto subrequestsNames = m_repackRequest.getOrPrepareSubrequestInfo(fSeqs, *m_oStoreDB.m_agentReference); + // We make sure the references to subrequests exist persistently before creating them. + m_repackRequest.commit(); + // We keep holding the repack request lock: we need to ensure de deleted boolean of each subrequest does + // not change while we attempt creating them (or we would face double creation). + // Sort the naming results in a fSeq->requestName map for easier access. + std::map<uint64_t, objectstore::RepackRequest::SubrequestInfo> subReqInfoMap; + for (auto &rn: subrequestsNames) { subReqInfoMap[rn.fSeq] = rn; } + // Try to create the retrieve subrequests (owned by this process, to be queued in a second step) + // subrequests can already fail at that point if we cannot find a copy on a valid tape. + std::list<uint64_t> failedFSeqs; + uint64_t failedFiles = 0; + uint64_t failedBytes = 0; + // First loop: we will issue the async insertions of the subrequests. 
+ struct AsyncInsertionInfo { + Subrequest & rsr; + std::shared_ptr<RetrieveRequest> request; + std::shared_ptr<RetrieveRequest::AsyncInserter> inserter; + std::string bestVid; + uint32_t activeCopyNb; + }; + std::list<AsyncInsertionInfo> asyncInsertionInfoList; + for (auto &rsr: repackSubrequests) { + // Requests marked as deleted are guaranteed to have already been created => we will not re-attempt. + if (!subReqInfoMap.at(rsr.fSeq).subrequestDeleted) { + // We need to try and create the subrequest. + // Create the sub request (it's a retrieve request now). + auto rr=std::make_shared<objectstore::RetrieveRequest>(subReqInfoMap.at(rsr.fSeq).address, m_oStoreDB.m_objectStore); + rr->initialize(); + // Set the file info + common::dataStructures::RetrieveRequest schedReq; + schedReq.archiveFileID = rsr.archiveFile.archiveFileID; + schedReq.dstURL = rsr.fileBufferURL; + schedReq.diskFileInfo = rsr.archiveFile.diskFileInfo; + // dsrr.errorReportURL: We leave this bank as the reporting will be done to the repack request, + // stored in the repack info. + rr->setSchedulerRequest(schedReq); + // Set the repack info. 
+ RetrieveRequest::RepackInfo rRRepackInfo; + try { + for (auto & ar: archiveRoutesMap.at(std::make_tuple(rsr.archiveFile.diskInstance, rsr.archiveFile.storageClass))) { + rRRepackInfo.archiveRouteMap[ar.second.copyNb] = ar.second.tapePoolName; + } + } catch (std::out_of_range &) { + failedFSeqs.emplace_back(rsr.fSeq); + failedFiles++; + failedBytes += rsr.archiveFile.fileSize; + log::ScopedParamContainer params(lc); + params.add("fileID", rsr.archiveFile.archiveFileID) + .add("diskInstance", rsr.archiveFile.diskInstance) + .add("storageClass", rsr.archiveFile.storageClass); + std::stringstream storageClassList; + bool first=true; + for (auto & sc: archiveRoutesMap) { + std::string diskInstance, storageClass; + std::tie(diskInstance, storageClass) = sc.first; + storageClassList << (first?"":" ") << "di=" << diskInstance << " sc=" << storageClass << " rc=" << sc.second.size(); + } + params.add("storageClassList", storageClassList.str()); + lc.log(log::ERR, "In OStoreDB::RepackRequest::addSubrequests(): not such archive route."); + continue; + } + rRRepackInfo.copyNbsToRearchive = rsr.copyNbsToRearchive; + rRRepackInfo.fileBufferURL = rsr.fileBufferURL; + rRRepackInfo.isRepack = true; + rRRepackInfo.repackRequestAddress = m_repackRequest.getAddressIfSet(); + rr->setRepackInfo(rRRepackInfo); + // Set the queueing parameters + common::dataStructures::RetrieveFileQueueCriteria fileQueueCriteria; + fileQueueCriteria.archiveFile = rsr.archiveFile; + fileQueueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; + rr->setRetrieveFileQueueCriteria(fileQueueCriteria); + // Decide of which vid we are going to retrieve from. Here, if we can retrieve from the repack VID, we + // will set the initial recall on it. Retries will we requeue to best VID as usual if needed. + std::string bestVid; + uint32_t activeCopyNumber = 0; + // Make sure we have a copy on the vid we intend to repack. 
+ for (auto & tc: rsr.archiveFile.tapeFiles) { + if (tc.second.vid == repackInfo.vid) { + try { + // Try to select the repack VID from a one-vid list. + Helpers::selectBestRetrieveQueue({repackInfo.vid}, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore); + bestVid = repackInfo.vid; + activeCopyNumber = tc.second.copyNb; + } catch (Helpers::NoTapeAvailableForRetrieve &) {} + break; + } + } + // The repack vid was not appropriate, let's try all candidates. + if (bestVid.empty()) { + std::set<std::string> candidateVids; + for (auto & tc: rsr.archiveFile.tapeFiles) candidateVids.insert(tc.second.vid); + try { + bestVid = Helpers::selectBestRetrieveQueue(candidateVids, m_oStoreDB.m_catalogue, m_oStoreDB.m_objectStore); + } catch (Helpers::NoTapeAvailableForRetrieve &) { + // Count the failure for this subrequest. + failedFSeqs.emplace_back(rsr.fSeq); + failedFiles++; + failedBytes += rsr.archiveFile.fileSize; + log::ScopedParamContainer params(lc); + params.add("fileId", rsr.archiveFile.archiveFileID) + .add("repackVid", repackInfo.vid); + lc.log(log::ERR, + "In OStoreDB::RepackRequest::addSubrequests(): could not queue a retrieve subrequest. Subrequest failed."); + continue; + } + } + for (auto &tc: rsr.archiveFile.tapeFiles) + if (tc.second.vid == bestVid) { + activeCopyNumber = tc.second.copyNb; + goto copyNbFound; + } + { + // Count the failure for this subrequest. + failedFSeqs.emplace_back(rsr.fSeq); + failedFiles++; + failedBytes += rsr.archiveFile.fileSize; + log::ScopedParamContainer params(lc); + params.add("fileId", rsr.archiveFile.archiveFileID) + .add("repackVid", repackInfo.vid) + .add("bestVid", bestVid); + lc.log(log::ERR, + "In OStoreDB::RepackRequest::addSubrequests(): could not find the copyNb for the chosen VID. Subrequest failed."); + continue; + } + copyNbFound:; + // We have the best VID. The request is ready to be created after comleting its information. 
+ rr->setOwner(m_oStoreDB.m_agentReference->getAgentAddress()); + rr->setActiveCopyNumber(activeCopyNumber); + // We can now try to insert the request. It could alredy have been created (in which case it must exist). + // We hold the lock to the repack request, no better not waste time, so we async create. + try { + std::shared_ptr<objectstore::RetrieveRequest::AsyncInserter> rrai(rr->asyncInsert()); + asyncInsertionInfoList.emplace_back(AsyncInsertionInfo{rsr, rr, rrai, bestVid, activeCopyNumber}); + } catch (exception::Exception & ex) { + // We can fail to serialize here... + // Count the failure for this subrequest. + failedFSeqs.emplace_back(rsr.fSeq); + failedFiles++; + failedBytes += rsr.archiveFile.fileSize; + failedFSeqs.emplace_back(rsr.fSeq); + log::ScopedParamContainer params(lc); + params.add("fileId", rsr.archiveFile) + .add("repackVid", repackInfo.vid) + .add("bestVid", bestVid) + .add("ExceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, + "In OStoreDB::RepackRequest::addSubrequests(): could not asyncInsert the subrequest."); + } + } + } + // We can now check the subrequests creations succeeded, and prepare their queueing. + struct AsyncInsertedSubrequestInfo { + Subrequest & rsr; + std::string bestVid; + uint32_t activeCopyNb; + std::shared_ptr<RetrieveRequest> request; + }; + std::list <AsyncInsertedSubrequestInfo> asyncInsertedSubrequestInfoList; + for (auto & aii: asyncInsertionInfoList) { + // Check the insertion succeeded. 
+ try { + aii.inserter->wait(); + log::ScopedParamContainer params(lc); + params.add("fileID", aii.rsr.archiveFile.archiveFileID); + std::stringstream copyNbList; + bool first = true; + for (auto cn: aii.rsr.copyNbsToRearchive) { copyNbList << (first?"":" ") << cn; first = true; } + params.add("copyNbsToRearchive", copyNbList.str()) + .add("subrequestObject", aii.request->getAddressIfSet()) + .add("fileBufferURL", aii.rsr.fileBufferURL); + lc.log(log::INFO, "In OStoreDB::RepackRequest::addSubrequests(): subrequest created."); + asyncInsertedSubrequestInfoList.emplace_back(AsyncInsertedSubrequestInfo{aii.rsr, aii.bestVid, aii.activeCopyNb, aii.request}); + } catch (exception::Exception & ex) { + // Count the failure for this subrequest. + failedFSeqs.emplace_back(aii.rsr.fSeq); + failedFiles++; + failedBytes += aii.rsr.archiveFile.fileSize; + log::ScopedParamContainer params(lc); + params.add("fileId", aii.rsr.archiveFile) + .add("repackVid", repackInfo.vid) + .add("bestVid", aii.bestVid) + .add("bestCopyNb", aii.activeCopyNb) + .add("ExceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, + "In OStoreDB::RepackRequest::addSubrequests(): could not asyncInsert the subrequest."); + } + } + // We now have created the subrequests. Time to enqueue. 
+ { + objectstore::Sorter sorter(*m_oStoreDB.m_agentReference, m_oStoreDB.m_objectStore, m_oStoreDB.m_catalogue); + std::list<std::unique_ptr<objectstore::ScopedExclusiveLock>> locks; + for (auto &is: asyncInsertedSubrequestInfoList) { + locks.push_back(cta::make_unique<objectstore::ScopedExclusiveLock>(*is.request)); + is.request->fetch(); + sorter.insertRetrieveRequest(is.request, *m_oStoreDB.m_agentReference, is.activeCopyNb, lc); + } + locks.clear(); + sorter.flushAll(lc); + } + +} + +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::expandDone() +//------------------------------------------------------------------------------ +void OStoreDB::RepackRequest::expandDone() { + // We are now done with the repack request. We can set its status. + ScopedSharedLock rrl(m_repackRequest); + m_repackRequest.fetch(); + // After expansion, 2 statuses are possible: starting (nothing reported as done) or running (anything reported as done). + // We can find that out from the statistics... + typedef objectstore::RepackRequest::StatsType StatsType; + bool running=false; + auto stats=m_repackRequest.getStats(); + for (auto t: {StatsType::ArchiveFailure, StatsType::ArchiveSuccess, StatsType::RetrieveSuccess, StatsType::RetrieveFailure}) { + if (stats.at(t).files) { + running=true; + break; + } + } + typedef common::dataStructures::RepackInfo::Status Status; + m_repackRequest.setStatus(running? 
Status::Running: Status::Starting); + m_repackRequest.commit(); +} + +//------------------------------------------------------------------------------ +// OStoreDB::RepackRequest::fail() +//------------------------------------------------------------------------------ +void OStoreDB::RepackRequest::fail() { + ScopedExclusiveLock rrl(m_repackRequest); + m_repackRequest.fetch(); + m_repackRequest.setStatus(common::dataStructures::RepackInfo::Status::Failed); + m_repackRequest.commit(); +} + + //------------------------------------------------------------------------------ // OStoreDB::cancelRepack() //------------------------------------------------------------------------------ @@ -1601,6 +1856,7 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo rj->selectedCopyNb = j.copyNb; rj->errorReportURL = j.errorReportURL; rj->reportType = j.reportType; + rj->m_repackInfo = j.repackInfo; rj->setJobOwned(); ret.emplace_back(std::move(rj)); } @@ -2490,6 +2746,8 @@ getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContex rj->archiveFile = j.archiveFile; rj->retrieveRequest = j.rr; rj->selectedCopyNb = j.copyNb; + rj->isRepack = j.repackInfo.isRepack; + rj->m_repackInfo = j.repackInfo; rj->m_jobOwned = true; rj->m_mountId = mountInfo.mountId; ret.emplace_back(std::move(rj)); @@ -2580,46 +2838,101 @@ OStoreDB::RetrieveJob * OStoreDB::castFromSchedDBJob(SchedulerDatabase::Retrieve } //------------------------------------------------------------------------------ -// OStoreDB::RetrieveMount::waitAndFinishSettingJobsBatchSuccessful() +// OStoreDB::RetrieveMount::flushAsyncSuccessReports() //------------------------------------------------------------------------------ -std::set<cta::SchedulerDatabase::RetrieveJob*> OStoreDB::RetrieveMount::finishSettingJobsBatchSuccessful( - std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) { - std::set<cta::SchedulerDatabase::RetrieveJob*> ret; +void 
OStoreDB::RetrieveMount::flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, + log::LogContext& lc) { std::list<std::string> rjToUnown; - // We will wait on the asynchronously started reports of jobs and remove them from - // ownership. + std::map<std::string, std::list<OStoreDB::RetrieveJob*>> jobsToRequeueForRepackMap; + // We will wait on the asynchronously started reports of jobs, queue the retrieve jobs + // for report and remove them from ownership. + // 1) Check the async update result. for (auto & sDBJob: jobsBatch) { auto osdbJob = castFromSchedDBJob(sDBJob); - rjToUnown.push_back(osdbJob->m_retrieveRequest.getAddressIfSet()); - ret.insert(sDBJob); + if (osdbJob->isRepack) { + try { + osdbJob->m_jobSucceedForRepackReporter->wait(); + jobsToRequeueForRepackMap[osdbJob->m_repackInfo.repackRequestAddress].emplace_back(osdbJob); + } catch (cta::exception::Exception & ex) { + log::ScopedParamContainer params(lc); + params.add("fileId", osdbJob->archiveFile.archiveFileID) + .add("requestObject", osdbJob->m_retrieveRequest.getAddressIfSet()) + .add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, + "In OStoreDB::RetrieveMount::flushAsyncSuccessReports(): async status update failed. " + "Will leave job to garbage collection."); + } + } else { + try { + osdbJob->m_jobDelete->wait(); + rjToUnown.push_back(osdbJob->m_retrieveRequest.getAddressIfSet()); + } catch (cta::exception::Exception & ex) { + log::ScopedParamContainer params(lc); + params.add("fileId", osdbJob->archiveFile.archiveFileID) + .add("requestObject", osdbJob->m_retrieveRequest.getAddressIfSet()) + .add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, + "In OStoreDB::RetrieveMount::flushAsyncSuccessReports(): async deletion failed. " + "Will leave job to garbage collection."); + } + } + } + // 2) Queue the repack requests for repack. 
+ for (auto & repackRequestQueue: jobsToRequeueForRepackMap) { + typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToReportToRepackForSuccess> RQTRTRFSAlgo; + RQTRTRFSAlgo::InsertedElement::list insertedRequests; + // Keep a map of objectstore request -> sDBJob to handle errors. + std::map<objectstore::RetrieveRequest *, OStoreDB::RetrieveJob *> requestToJobMap; + for (auto & req: repackRequestQueue.second) { + insertedRequests.push_back(RQTRTRFSAlgo::InsertedElement{&req->m_retrieveRequest, req->selectedCopyNb, + req->archiveFile.tapeFiles[req->selectedCopyNb].fSeq, req->archiveFile.fileSize, + cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack, + serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess}); + requestToJobMap[&req->m_retrieveRequest] = req; + } + RQTRTRFSAlgo rQTRTRFSAlgo(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); + try { + rQTRTRFSAlgo.referenceAndSwitchOwnership(repackRequestQueue.first, insertedRequests, lc); + // In case all goes well, we can remove ownership of all requests. + for (auto & req: repackRequestQueue.second) { rjToUnown.push_back(req->m_retrieveRequest.getAddressIfSet()); } + } catch (RQTRTRFSAlgo::OwnershipSwitchFailure & failure) { + // Some requests did not make it to the queue. Log and leave them for GC to sort out (leave them in ownership). + std::set<std::string> failedElements; + for (auto & fe: failure.failedElements) { + // Log error. 
+ log::ScopedParamContainer params(lc); + params.add("fileId", requestToJobMap.at(fe.element->retrieveRequest)->archiveFile.archiveFileID) + .add("copyNb", fe.element->copyNb) + .add("requestObject", fe.element->retrieveRequest->getAddressIfSet()); + try { + std::rethrow_exception(fe.failure); + } catch (cta::exception::Exception & ex) { + params.add("exeptionMessage", ex.getMessageValue()); + } catch (std::exception & ex) { + params.add("exceptionWhat", ex.what()) + .add("exceptionTypeName", typeid(ex).name()); + } + lc.log(log::ERR, "In OStoreDB::RetrieveMount::flushAsyncSuccessReports(): failed to queue request to report for repack." + "Leaving request to be garbage collected."); + // Add the failed request to the set. + failedElements.insert(fe.element->retrieveRequest->getAddressIfSet()); + } + for (auto & req: repackRequestQueue.second) { + if (!failedElements.count(req->m_retrieveRequest.getAddressIfSet())) { + rjToUnown.push_back(req->m_retrieveRequest.getAddressIfSet()); + } + } + } catch (exception::Exception & ex) { + // Something else happened. We just log the error and let the garbage collector go through all the requests. + log::ScopedParamContainer params(lc); + params.add("exceptionMessage", ex.getMessageValue()); + lc.log(log::ERR, "In OStoreDB::RetrieveMount::flushAsyncSuccessReports(): failed to queue a batch of requests."); + } } + // 3) Remove requests from ownership. 
m_oStoreDB.m_agentReference->removeBatchFromOwnership(rjToUnown, m_oStoreDB.m_objectStore); - return ret; } -//------------------------------------------------------------------------------ -// OStoreDB::RetrieveMount::batchSucceedRetrieveForRepack() -//------------------------------------------------------------------------------ -std::set<cta::SchedulerDatabase::RetrieveJob *> OStoreDB::RetrieveMount::batchSucceedRetrieveForRepack( - std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, cta::log::LogContext & lc) -{ - std::set<cta::SchedulerDatabase::RetrieveJob *> ret; - typedef objectstore::ContainerAlgorithms<objectstore::RetrieveQueue,objectstore::RetrieveQueueToReportToRepackForSuccess> AqtrtrfsCa; - AqtrtrfsCa aqtrtrfsCa(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference); - AqtrtrfsCa::InsertedElement::list insertedElementsLists; - std::string vid; - for(auto & retrieveJob : jobsBatch){ - auto osdbJob = castFromSchedDBJob(retrieveJob); - ret.insert(retrieveJob); - osdbJob->asyncReportSucceedForRepack(); - osdbJob->checkReportSucceedForRepack(); - auto & tapeFile = osdbJob->archiveFile.tapeFiles[osdbJob->selectedCopyNb]; - vid = osdbJob->m_retrieveMount->mountInfo.vid; - insertedElementsLists.push_back(AqtrtrfsCa::InsertedElement{&osdbJob->m_retrieveRequest, (uint16_t)osdbJob->selectedCopyNb, tapeFile.fSeq,osdbJob->archiveFile.fileSize,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack,serializers::RetrieveJobStatus::RJS_ToReportToRepackForSuccess}); - } - aqtrtrfsCa.referenceAndSwitchOwnership(vid,insertedElementsLists,lc); - return ret; -} //------------------------------------------------------------------------------ // OStoreDB::ArchiveMount::setDriveStatus() //------------------------------------------------------------------------------ @@ -3276,39 +3589,22 @@ OStoreDB::RetrieveJob::~RetrieveJob() { } //------------------------------------------------------------------------------ -// 
OStoreDB::RetrieveJob::asyncSucceed() -//------------------------------------------------------------------------------ -void OStoreDB::RetrieveJob::asyncSucceed() { - // set the request as successful (delete it). - m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob()); -} - -//------------------------------------------------------------------------------ -// OStoreDB::RetrieveJob::checkSucceed() +// OStoreDB::RetrieveJob::asyncSetSuccessful() //------------------------------------------------------------------------------ -void OStoreDB::RetrieveJob::checkSucceed() { - m_jobDelete->wait(); - m_retrieveRequest.resetValues(); - // We no more own the job (which could be gone) - m_jobOwned = false; - // Ownership will be removed from agent by caller through retrieve mount object. -} - -//------------------------------------------------------------------------------ -// OStoreDB::RetrieveJob::asyncReportSucceedForRepack() -//------------------------------------------------------------------------------ -void OStoreDB::RetrieveJob::asyncReportSucceedForRepack() -{ - m_jobSucceedForRepackReporter.reset(m_retrieveRequest.asyncReportSucceedForRepack(this->selectedCopyNb)); +void OStoreDB::RetrieveJob::asyncSetSuccessful() { + if (isRepack) { + // If the job is from a repack subrequest, we change its status (to report + // for repack success). Queueing will be done in batch in + m_jobSucceedForRepackReporter.reset(m_retrieveRequest.asyncReportSucceedForRepack(this->selectedCopyNb)); + } else { + // set the user transfer request as successful (delete it). 
+ m_jobDelete.reset(m_retrieveRequest.asyncDeleteJob()); + } } //------------------------------------------------------------------------------ -// OStoreDB::RetrieveJob::checkReportSucceedForRepack() +// OStoreDB::getNextSucceededRetrieveRequestForRepackBatch() //------------------------------------------------------------------------------ -void OStoreDB::RetrieveJob::checkReportSucceedForRepack(){ - m_jobSucceedForRepackReporter->wait(); -} - std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> OStoreDB::getNextSucceededRetrieveRequestForRepackBatch(uint64_t filesRequested, log::LogContext& lc) { std::list<std::unique_ptr<cta::objectstore::RetrieveRequest>> ret; diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index dbe000f714156b3ad1e2fae8555a83c7bbbb8a9b..d9b364fe0d7217959e085bc1b150ff0f1d2292ab 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -216,8 +216,7 @@ public: void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; public: - std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override; - std::set<cta::SchedulerDatabase::RetrieveJob*> batchSucceedRetrieveForRepack(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override; + void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override; }; friend class RetrieveMount; @@ -228,15 +227,10 @@ public: public: CTA_GENERATE_EXCEPTION_CLASS(JobNotOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); - virtual void asyncSucceed() override; - virtual void checkSucceed() override; - /** - * Allows to asynchronously report this RetrieveJob as success - * for repack. 
It will call the retrieveRequest.asyncReportSucceedForRepack(this->selectedCopyNb) method - * that will set the status of the Job as RJS_Succeeded + /** Async set job successful. Either delete (user transfer) or change status (repack) + * Wait will happen in RetrieveMount::flushAsyncSuccessReports(). */ - virtual void asyncReportSucceedForRepack() override; - virtual void checkReportSucceedForRepack() override; + virtual void asyncSetSuccessful() override; void failTransfer(const std::string& failureReason, log::LogContext& lc) override; void failReport(const std::string& failureReason, log::LogContext& lc) override; virtual ~RetrieveJob() override; @@ -256,6 +250,7 @@ public: OStoreDB::RetrieveMount *m_retrieveMount; std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete; std::unique_ptr<objectstore::RetrieveRequest::AsyncJobSucceedForRepackReporter> m_jobSucceedForRepackReporter; + objectstore::RetrieveRequest::RepackInfo m_repackInfo; }; static RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job); @@ -347,18 +342,19 @@ public: class RepackRequest: public SchedulerDatabase::RepackRequest { friend class OStoreDB; - public: - RepackRequest(const std::string &jobAddress, OStoreDB &oStoreDB) : - m_jobOwned(false), m_oStoreDB(oStoreDB), - m_repackRequest(jobAddress, m_oStoreDB.m_objectStore){} - void setJobOwned(bool b = true) { m_jobOwned = b; } - + public: + RepackRequest(const std::string &jobAddress, OStoreDB &oStoreDB) : + m_oStoreDB(oStoreDB), m_repackRequest(jobAddress, m_oStoreDB.m_objectStore){} + void addSubrequests(std::list<Subrequest>& repackSubrequests, cta::common::dataStructures::ArchiveRoute::FullMap& archiveRoutesMap, + uint64_t maxFSeqLowBound, log::LogContext& lc) override; + void expandDone() override; + void fail() override; + uint64_t getLastExpandedFSeq() override; private: - bool m_jobOwned; - uint64_t m_mountId; OStoreDB & m_oStoreDB; objectstore::RepackRequest m_repackRequest; }; + friend class 
RepackRequest; /** * A class holding a lock on the pending repack request queue. This is the first @@ -378,7 +374,7 @@ public: objectstore::ScopedExclusiveLock m_lockOnPendingRepackRequestsQueue; }; - class RepackRequestPromotionStatisticsNoLock: public SchedulerDatabase::RepackRequestStatistics { + class RepackRequestPromotionStatisticsNoLock: public SchedulerDatabase::RepackRequestStatistics {\ friend class OStoreDB; public: PromotionToToExpandResult promotePendingRequestsForExpansion(size_t requestCount, log::LogContext &lc) override { diff --git a/scheduler/RepackRequest.cpp b/scheduler/RepackRequest.cpp index 4fd5d440fc6f3099385102ba68267e7b822879fe..49c73ec07e3e57e2458f64d7ce7a6488717fd7e0 100644 --- a/scheduler/RepackRequest.cpp +++ b/scheduler/RepackRequest.cpp @@ -1,8 +1,37 @@ +/* + * The CERN Tape Archive(CTA) project + * Copyright(C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + *(at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY { + +} without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + + #include "RepackRequest.hpp" -cta::RepackRequest::RepackRequest(){} +namespace cta { const cta::common::dataStructures::RepackInfo cta::RepackRequest::getRepackInfo() const { return m_dbReq->repackInfo; } + +void RepackRequest::fail() { + m_dbReq->fail(); +} + + +} // namespace cta + diff --git a/scheduler/RepackRequest.hpp b/scheduler/RepackRequest.hpp index 836069cfc6c84a80c2d6f0f4fba465fd7fb9cf21..8a87735debb0200dcccd5d37da8cb3e84fac6f72 100644 --- a/scheduler/RepackRequest.hpp +++ b/scheduler/RepackRequest.hpp @@ -30,10 +30,9 @@ namespace cta { class RepackRequest { friend class Scheduler; public: - RepackRequest(); void expand(); const cta::common::dataStructures::RepackInfo getRepackInfo() const; - + void fail(); protected: std::unique_ptr<cta::SchedulerDatabase::RepackRequest> m_dbReq; }; // class RepackRequest diff --git a/scheduler/RetrieveJob.cpp b/scheduler/RetrieveJob.cpp index a50443aa3ca3e44a97d38b61eed7f3281f219337..ccf0d7e4f51fb0e0b3ba2f4ca7a0807eb6f3acd2 100644 --- a/scheduler/RetrieveJob.cpp +++ b/scheduler/RetrieveJob.cpp @@ -47,15 +47,8 @@ cta::RetrieveJob::RetrieveJob(RetrieveMount *mount, //------------------------------------------------------------------------------ // asyncComplete //------------------------------------------------------------------------------ -void cta::RetrieveJob::asyncComplete() { - m_dbJob->asyncSucceed(); -} - -//------------------------------------------------------------------------------ -// checkComplete -//------------------------------------------------------------------------------ -void cta::RetrieveJob::checkComplete() { - m_dbJob->checkSucceed(); +void cta::RetrieveJob::asyncSetSuccessful() { + m_dbJob->asyncSetSuccessful(); } //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveJob.hpp b/scheduler/RetrieveJob.hpp index f0cf494d17f1d6ada5c8d4608521cf74bf77c09f..8db9586fb09fde3d2ec7e2f70de73d18bd0501f0 100644 --- a/scheduler/RetrieveJob.hpp 
+++ b/scheduler/RetrieveJob.hpp @@ -79,14 +79,9 @@ public: * The checksum and the size of the transfer should already stored in the * object beforehand. Result setting and calling complete are done in 2 * different threads (disk write and reporter thread, respectively). + * Completion will be checked implicitly in RetrieveMount::flushAsyncSuccessReports() */ - virtual void asyncComplete(); - - /** - * Check that asynchronous complete is finished and cleanup the job structures - * - */ - virtual void checkComplete(); + virtual void asyncSetSuccessful(); /** * Triggers a scheduler update following the failure of the job. Retry policy will diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index 24478acf212ae72e68cc0774ee9f94b1968dbb69..754d8cfb198c825a0c938f91f8861c2cd9821fcc 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -144,10 +144,9 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc //------------------------------------------------------------------------------ // waitAndFinishSettingJobsBatchRetrieved() //------------------------------------------------------------------------------ -void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) { +void cta::RetrieveMount::flushAsyncSuccessReports(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) { std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulRetrieveJobs; //List to ensure the destruction of the retrieve jobs at the end of this method std::list<cta::SchedulerDatabase::RetrieveJob *> validatedSuccessfulDBRetrieveJobs; - std::list<cta::SchedulerDatabase::RetrieveJob *> retrieveRepackJobs; std::unique_ptr<cta::RetrieveJob> job; double waitUpdateCompletionTime=0; double jobBatchFinishingTime=0; @@ -163,21 +162,14 @@ void 
cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std:: if (!job.get()) continue; files++; bytes+=job->archiveFile.fileSize; - bool isRepack = job->m_dbJob->retrieveRequest.isRepack; - if(!isRepack){ - job->checkComplete(); - validatedSuccessfulDBRetrieveJobs.emplace_back(job->m_dbJob.get()); - } else { - retrieveRepackJobs.emplace_back(job->m_dbJob.get()); - } + validatedSuccessfulDBRetrieveJobs.emplace_back(job->m_dbJob.get()); validatedSuccessfulRetrieveJobs.emplace_back(std::move(job)); job.reset(); } waitUpdateCompletionTime=t.secs(utils::Timer::resetCounter); tl.insertAndReset("waitUpdateCompletionTime",t); // Complete the cleaning up of the jobs in the mount - m_dbMount->finishSettingJobsBatchSuccessful(validatedSuccessfulDBRetrieveJobs, logContext); - m_dbMount->batchSucceedRetrieveForRepack(retrieveRepackJobs,logContext); + m_dbMount->flushAsyncSuccessReports(validatedSuccessfulDBRetrieveJobs, logContext); jobBatchFinishingTime=t.secs(); tl.insertOrIncrement("jobBatchFinishingTime",jobBatchFinishingTime); schedulerDbTime=jobBatchFinishingTime + waitUpdateCompletionTime; diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index f2c5e54bbbb5ae1f7002904ff31f01be7704892a..753056d8949a5b5b6379f37df549dbb52d648694 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -173,7 +173,7 @@ namespace cta { * @param successfulRetrieveJobs the jobs to report * @param logContext */ - virtual void waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> > & successfulRetrieveJobs, cta::log::LogContext &logContext); + virtual void flushAsyncSuccessReports(std::queue<std::unique_ptr<cta::RetrieveJob> > & successfulRetrieveJobs, cta::log::LogContext &logContext); /** diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 7057a6b30939b34891fba046ec373dbf256ae73c..7d2e6e45e4bf5b618933fd52c99569c42e015995 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ 
-33,6 +33,7 @@ #include <iostream> #include <sstream> +#include <iomanip> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> @@ -208,13 +209,7 @@ void Scheduler::queueRetrieve( utils::Timer t; // Get the queue criteria common::dataStructures::RetrieveFileQueueCriteria queueCriteria; - if(!request.isRepack){ - queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc); - } else { - //Repack does not need policy - queueCriteria.archiveFile = m_catalogue.getArchiveFileById(request.archiveFileID); - queueCriteria.mountPolicy = common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack; - } + queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc); auto catalogueTime = t.secs(cta::utils::Timer::resetCounter); std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc); auto schedulerDbTime = t.secs(); @@ -251,13 +246,11 @@ void Scheduler::queueRetrieve( } spc.add("selectedVid", selectedVid) .add("catalogueTime", catalogueTime) - .add("schedulerDbTime", schedulerDbTime); - if(!request.isRepack){ - spc.add("policyName", queueCriteria.mountPolicy.name) + .add("schedulerDbTime", schedulerDbTime) + .add("policyName", queueCriteria.mountPolicy.name) .add("policyMaxDrives", queueCriteria.mountPolicy.maxDrivesAllowed) .add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge) .add("policyPriority", queueCriteria.mountPolicy.retrievePriority); - } lc.log(log::INFO, "Queued retrieve request"); } @@ -339,7 +332,8 @@ void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliI tl.insertAndReset("schedulerDbTime", t); log::ScopedParamContainer params(lc); params.add("tapeVid", vid) - .add("repackType", toString(repackType)); + .add("repackType", toString(repackType)) + .add("bufferURL", bufferURL); tl.addToLog(params); lc.log(log::INFO, "In Scheduler::queueRepack(): success."); } @@ -431,35 +425,80 @@ const 
std::string Scheduler::generateRetrieveDstURL(const cta::common::dataStruc // expandRepackRequest //------------------------------------------------------------------------------ void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList&, utils::Timer&, log::LogContext& lc) { - uint64_t fseq = c_defaultFseqForRepack; std::list<common::dataStructures::ArchiveFile> files; - auto vid = repackRequest->getRepackInfo().vid; + auto repackInfo = repackRequest->getRepackInfo(); + typedef cta::common::dataStructures::RepackInfo::Type RepackType; + if (repackInfo.type != RepackType::RepackOnly) { + log::ScopedParamContainer params(lc); + params.add("tapeVid", repackInfo.vid); + lc.log(log::ERR, "In Scheduler::expandRepackRequest(): failing repack request with unsupported (yet) type."); + repackRequest->fail(); + return; + } //We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the //tape where the file is is located std::list<common::dataStructures::ArchiveRoute> routes = m_catalogue.getArchiveRoutes(); //To identify the routes, we need to have both the dist instance name and the storage class name //thus, the key of the map is a pair of string - std::map<std::pair<std::string, std::string>,common::dataStructures::ArchiveRoute> mapRoutes; + cta::common::dataStructures::ArchiveRoute::FullMap archiveRoutesMap; for(auto route: routes){ //insert the route into the map to allow a quick retrieval - mapRoutes[std::make_pair(route.storageClassName,route.diskInstanceName)] = route; + archiveRoutesMap[std::make_pair(route.diskInstanceName,route.storageClassName)][route.copyNb] = route; } - while(true) { - files = m_catalogue.getFilesForRepack(vid,fseq,c_defaultMaxNbFilesForRepack); - for(auto &archiveFile : files) + uint64_t fSeq = repackRequest->m_dbReq->getLastExpandedFSeq() + 1; + cta::catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq); + 
while(archiveFilesForCatalogue.hasMore()) { + size_t filesCount = 0; + uint64_t maxAddedFSeq = 0; + std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests; + while(filesCount < c_defaultMaxNbFilesForRepack && archiveFilesForCatalogue.hasMore()) { - cta::common::dataStructures::RetrieveRequest retrieveRequest; - retrieveRequest.archiveFileID = archiveFile.archiveFileID; - retrieveRequest.diskFileInfo = archiveFile.diskFileInfo; - retrieveRequest.dstURL = generateRetrieveDstURL(archiveFile.diskFileInfo); - retrieveRequest.isRepack = true; - retrieveRequest.tapePool = mapRoutes[std::make_pair(archiveFile.storageClass,archiveFile.diskInstance)].tapePoolName; - queueRetrieve(archiveFile.diskInstance,retrieveRequest,lc); + auto archiveFile = archiveFilesForCatalogue.next(); + filesCount++; + fSeq++; + retrieveSubrequests.push_back(cta::SchedulerDatabase::RepackRequest::Subrequest()); + auto & rsr = retrieveSubrequests.back(); + rsr.archiveFile = archiveFile; + rsr.fSeq = std::numeric_limits<decltype(rsr.fSeq)>::max(); + // We have to determine which copynbs we want to rearchive, and under which fSeq we record this file. + if (repackInfo.type == RepackType::ExpandAndRepack || repackInfo.type == RepackType::RepackOnly) { + // determine which fSeq(s) (normally only one) lives on this tape. + for (auto & tc: archiveFile.tapeFiles) if (tc.second.vid == repackInfo.vid) { + rsr.copyNbsToRearchive.insert(tc.second.copyNb); + // We make the (reasonable) assumption that the archive file only has one copy on this tape. + // If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape. + // This will prevent double subrequest creation (we already have such a mechanism in case of crash and + // restart of expansion. 
+ rsr.fSeq = std::min(tc.second.fSeq, rsr.fSeq); + maxAddedFSeq = std::max(maxAddedFSeq, rsr.fSeq); + } + } + if (repackInfo.type == RepackType::ExpandAndRepack || repackInfo.type == RepackType::ExpandOnly) { + // We should not get here as the type is filtered at the beginning of the function. + // TODO: add support for expand. + throw cta::exception::Exception("In Scheduler::expandRepackRequest(): expand not yet supported."); + } + if ((rsr.fSeq == std::numeric_limits<decltype(rsr.fSeq)>::max()) || rsr.copyNbsToRearchive.empty()) { + log::ScopedParamContainer params(lc); + params.add("fileId", rsr.archiveFile.archiveFileID) + .add("repackVid", repackInfo.vid); + lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape."); + retrieveSubrequests.pop_back(); + } else { + // We found some copies to rearchive. We still have to decide which file path we are going to use. + // File path will be base URL + /<VID>/<fSeq> + std::stringstream fileBufferURL; + fileBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/" + << std::setw(9) << std::setfill('0') << rsr.fSeq; + rsr.fileBufferURL = fileBufferURL.str(); + } } - if (files.size()) { - auto & tf=files.back().tapeFiles; - fseq = std::find_if(tf.cbegin(), tf.cend(), [vid](decltype(*(tf.cbegin())) &f){ return f.second.vid == vid; })->second.fSeq + 1; - } else break; + // Note: the highest fSeq will be recorded internally in the following call. + // We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted). + // We pass this information to the db for recording in the repack request. This will allow restarting from the right + // value in case of crash. 
+ repackRequest->m_dbReq->addSubrequests(retrieveSubrequests, archiveRoutesMap, fSeq - 1, lc); + fSeq = std::max(fSeq, maxAddedFSeq + 1); } } diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 606e72b5608fa89cff363bc2a8eaffb1d666406d..9bbbc751ee50f6410e08a51133032b3d00bad3a8 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -290,7 +290,6 @@ public: /*============== Actual mount scheduling and queue status reporting ========*/ private: - const uint64_t c_defaultFseqForRepack = 1; const size_t c_defaultMaxNbFilesForRepack = 500; typedef std::pair<std::string, common::dataStructures::MountType> tpType; diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index d6c9457cf969127ab80c0fa0325a864ddf194d48..b437b1290a11e394f641522f28ca1e5b671eca44 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -25,6 +25,7 @@ #include "common/dataStructures/ArchiveFile.hpp" #include "common/dataStructures/ArchiveRequest.hpp" #include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp" +#include "common/dataStructures/ArchiveRoute.hpp" #include "common/dataStructures/DriveInfo.hpp" #include "common/dataStructures/MountType.hpp" #include "common/dataStructures/MountPolicy.hpp" @@ -357,10 +358,7 @@ public: virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; - virtual std::set<cta::SchedulerDatabase::RetrieveJob *> finishSettingJobsBatchSuccessful( - std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, log::LogContext & lc) = 0; - virtual std::set<cta::SchedulerDatabase::RetrieveJob *> batchSucceedRetrieveForRepack( - std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, cta::log::LogContext & lc) = 0; + virtual void 
flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, log::LogContext & lc) = 0; virtual ~RetrieveMount() {} uint32_t nbFilesCurrentlyOnTape; }; @@ -377,14 +375,14 @@ public: } reportType; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::RetrieveRequest retrieveRequest; - uint64_t selectedCopyNb; - virtual void asyncSucceed() = 0; - virtual void checkSucceed() = 0; - virtual void asyncReportSucceedForRepack() = 0; - virtual void checkReportSucceedForRepack() = 0; + uint32_t selectedCopyNb; + bool isRepack = false; + /** Set the job successful (async). Wait() and end of report happen in RetrieveMount::flushAsyncSuccessReports() */ + virtual void asyncSetSuccessful() = 0; virtual void failTransfer(const std::string &failureReason, log::LogContext &lc) = 0; virtual void failReport(const std::string &failureReason, log::LogContext &lc) = 0; virtual ~RetrieveJob() {} + private: }; /*============ Repack management: user side ================================*/ @@ -429,13 +427,23 @@ public: /** * A class providing the per repack request interface. It is also used to create the per file - * requests in the object store. + * subrequests in the object store. 
*/ class RepackRequest { public: cta::common::dataStructures::RepackInfo repackInfo; - uint64_t getLastExpandedFseq(); - void addFileRequestsBatch(); + virtual uint64_t getLastExpandedFSeq() = 0; + struct Subrequest { + uint64_t fSeq; + cta::common::dataStructures::ArchiveFile archiveFile; + std::set<uint32_t> copyNbsToRearchive; + std::string fileBufferURL; + }; + virtual void addSubrequests(std::list<Subrequest>& repackSubrequests, + cta::common::dataStructures::ArchiveRoute::FullMap & archiveRoutesMap, uint64_t maxFSeqLowBound, log::LogContext & lc) = 0; + virtual void expandDone() = 0; + virtual void fail() = 0; + virtual ~RepackRequest() {} }; /***/ diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 03c9ecde355500a0ff1da0bf0c2c9364df499512..efb7a06f79b7ebf746e66fc3f634cfa20597aee4 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -36,6 +36,7 @@ #include "objectstore/BackendRadosTestSwitch.hpp" #include "objectstore/RootEntry.hpp" #include "objectstore/JobQueueType.hpp" +#include "objectstore/RepackIndex.hpp" #include "tests/TestsCompileTimeSwitches.hpp" #include "common/Timer.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.hpp" @@ -565,8 +566,10 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { ASSERT_EQ(1, jobBatch.size()); retrieveJob.reset(jobBatch.front().release()); ASSERT_NE(nullptr, retrieveJob.get()); - retrieveJob->asyncComplete(); - retrieveJob->checkComplete(); + retrieveJob->asyncSetSuccessful(); + std::queue<std::unique_ptr<cta::RetrieveJob> > jobQueue; + jobQueue.push(std::move(retrieveJob)); + retrieveMount->flushAsyncSuccessReports(jobQueue, lc); jobBatch = retrieveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, jobBatch.size()); } @@ -1347,13 +1350,16 @@ TEST_P(SchedulerTest, expandRepackRequest) { setupDefaultCatalogue(); - //cta::log::StdoutLogger dummyLogger("dummy","dummy"); - cta::log::DummyLogger dummyLogger("dummy","dummy"); - log::LogContext 
lc(dummyLogger); +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); //Create an agent to represent this test process std::string agentReferenceName = "expandRepackRequestTest"; - std::unique_ptr<objectstore::AgentReference> agentReference(new objectstore::AgentReference(agentReferenceName, dummyLogger)); + std::unique_ptr<objectstore::AgentReference> agentReference(new objectstore::AgentReference(agentReferenceName, dl)); const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; @@ -1440,7 +1446,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { scheduler.waitSchedulerDbSubthreadsComplete(); { for(uint64_t i = 0; i < nbTapesToRepack ; ++i) { - scheduler.queueRepack(admin,allVid.at(i),"bufferURL",common::dataStructures::RepackInfo::Type::ExpandOnly,lc); + scheduler.queueRepack(admin,allVid.at(i),"root://repackData/buffer",common::dataStructures::RepackInfo::Type::RepackOnly,lc); } scheduler.waitSchedulerDbSubthreadsComplete(); //scheduler.waitSchedulerDbSubthreadsComplete(); @@ -1473,11 +1479,11 @@ TEST_P(SchedulerTest, expandRepackRequest) { int j = 1; for(auto retrieveJob : retrieveJobs){ //Test that the informations are correct for each file - ASSERT_EQ(retrieveJob.request.tapePool,s_tapePoolName); + //ASSERT_EQ(retrieveJob.request.tapePool,s_tapePoolName); ASSERT_EQ(retrieveJob.request.archiveFileID,archiveFileId++); ASSERT_EQ(retrieveJob.fileSize,compressedFileSize); std::stringstream ss; - ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j; ASSERT_EQ(retrieveJob.request.dstURL, ss.str()); ASSERT_EQ(retrieveJob.tapeCopies[vid].second.copyNb,1); ASSERT_EQ(retrieveJob.tapeCopies[vid].second.checksumType,checksumType); @@ -1545,7 +1551,10 @@ TEST_P(SchedulerTest, expandRepackRequest) { re.fetch(); //Get the retrieveQueueToReportToRepackForSuccess - std::string 
retrieveQueueToReportToRepackForSuccessAddress = re.getRetrieveQueueAddress(allVid.at(i-1),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); + // The queue is named after the repack request: we need to query the repack index + objectstore::RepackIndex ri(re.getRepackIndexAddress(), schedulerDB.getBackend()); + ri.fetchNoLock(); + std::string retrieveQueueToReportToRepackForSuccessAddress = re.getRetrieveQueueAddress(ri.getRepackRequestAddress(allVid.at(i-1)),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); cta::objectstore::RetrieveQueue rq(retrieveQueueToReportToRepackForSuccessAddress,schedulerDB.getBackend()); //Fetch the queue so that we can get the retrieveRequests from it @@ -1575,16 +1584,16 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Testing scheduler retrieve request ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++); std::stringstream ss; - ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j; ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str()); - ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); - ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName); + // TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + // TODO ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName); std::ostringstream diskFilePath; diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j; ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str()); //Testing the retrieve request - ASSERT_EQ(retrieveRequest.isRepack(),true); - ASSERT_EQ(retrieveRequest.getTapePool(),s_tapePoolName); + ASSERT_EQ(retrieveRequest.getRepackInfo().isRepack,true); + // TODO ASSERT_EQ(retrieveRequest.getTapePool(),s_tapePoolName); ASSERT_EQ(retrieveRequest.getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); 
ASSERT_EQ(retrieveRequest.getRetrieveFileQueueCriteria().mountPolicy,cta::common::dataStructures::MountPolicy::s_defaultMountPolicyForRepack); ASSERT_EQ(retrieveRequest.getActiveCopyNumber(),1); @@ -1622,7 +1631,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Testing RetrieveRequest common::dataStructures::RetrieveRequest schedulerRetrieveRequest = retrieveRequest->getSchedulerRequest(); - ASSERT_EQ(retrieveRequest->getTapePool(),s_tapePoolName); + // TODO ASSERT_EQ(retrieveRequest->getTapePool(),s_tapePoolName); ASSERT_EQ(retrieveRequest->getJobs().size(),1); ASSERT_EQ(retrieveRequest->getLastActiveVid(),currentVid); ASSERT_EQ(retrieveRequest->getQueueType(),cta::objectstore::JobQueueType::JobsToReportToRepackForSuccess); @@ -1639,15 +1648,15 @@ TEST_P(SchedulerTest, expandRepackRequest) { //Testing scheduler retrieve request ASSERT_EQ(schedulerRetrieveRequest.archiveFileID,archiveFileId++); std::stringstream ss; - ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j; ASSERT_EQ(schedulerRetrieveRequest.dstURL,ss.str()); - ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); - ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName); + // TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + // TODO ASSERT_EQ(schedulerRetrieveRequest.tapePool,s_tapePoolName); std::ostringstream diskFilePath; diskFilePath << "/public_dir/public_file_"<<i<<"_"<<j; ASSERT_EQ(schedulerRetrieveRequest.diskFileInfo.path,diskFilePath.str()); //Testing the retrieve request - ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); + // TODO ASSERT_EQ(schedulerRetrieveRequest.isRepack,true); //Testing the archive file associated to the retrieve request ASSERT_EQ(archiveFile.storageClass,storageClass.name); @@ -1702,7 +1711,7 @@ TEST_P(SchedulerTest, expandRepackRequest) { ASSERT_EQ(archiveFile.fileSize,archiveFileSize); ASSERT_EQ(archiveFile.storageClass,s_storageClassName); std::stringstream ss; - 
ss<<"repack://public_dir/public_file_"<<i<<"_"<<j; + ss<<"root://repackData/buffer/"<<allVid.at(i-1)<<"/"<<std::setw(9)<<std::setfill('0')<<j; ASSERT_EQ(archiveRequest.getSrcURL(),ss.str()); for(auto archiveJob : archiveRequest.dumpJobs()){ ASSERT_EQ(archiveJob.status,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser); diff --git a/scheduler/testingMocks/MockRetrieveJob.hpp b/scheduler/testingMocks/MockRetrieveJob.hpp index 94a7b71bb247b8dc968d29a35de8dbf261c90794..fb8128aadde9f806bab026ae55f6df51a9bf76ab 100644 --- a/scheduler/testingMocks/MockRetrieveJob.hpp +++ b/scheduler/testingMocks/MockRetrieveJob.hpp @@ -33,8 +33,7 @@ namespace cta { cta::PositioningMethod::ByBlock), completes(0), failures(0) { archiveFile.tapeFiles[1]; } - virtual void asyncComplete() override { completes++; } - virtual void checkComplete() override {} + virtual void asyncSetSuccessful() override { completes++; } void transferFailed(const std::string &failureReason, cta::log::LogContext&) override { failures++; }; ~MockRetrieveJob() throw() {} diff --git a/scheduler/testingMocks/MockRetrieveMount.hpp b/scheduler/testingMocks/MockRetrieveMount.hpp index 9625ad650b6dfa4463cfe5bee9e6566726ca7370..4753487bc702efd67854073720189c1b6b7b84ef 100644 --- a/scheduler/testingMocks/MockRetrieveMount.hpp +++ b/scheduler/testingMocks/MockRetrieveMount.hpp @@ -68,7 +68,7 @@ namespace cta { void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override {}; - void waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) override {}; + void flushAsyncSuccessReports(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) override {}; private: diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index 
348daddd53999848d982a42a2b2c4a28acd1c8a5..9165dbac7e6f919275202f251c2e3ddf9311117a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -46,8 +46,7 @@ namespace unitTests{ void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> batchSucceedRetrieveForRepack(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } + void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } }; class TestingRetrieveMount: public cta::RetrieveMount { diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index 7866aceb8199e07bfcbd27e4d6f10c00876b471d..92b9206f0676838abe2e4c0b0799079e42556392 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -40,8 +40,7 @@ namespace unitTests{ void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw 
std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> batchSucceedRetrieveForRepack(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } + void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } }; class TestingRetrieveMount: public cta::RetrieveMount { diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp index e6f218c0dd3db2ba87dfc5e4771b8d2c90d1b455..39d6285b2659ba40fd9a00e74e7366b5e9c4595f 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPacker.cpp @@ -115,9 +115,7 @@ void RecallReportPacker::reportTestGoingToEnd(){ //ReportSuccessful::execute //------------------------------------------------------------------------------ void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){ - if(!m_successfulRetrieveJob->retrieveRequest.isRepack){ - m_successfulRetrieveJob->asyncComplete(); - } + m_successfulRetrieveJob->asyncSetSuccessful(); parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob)); } @@ -366,7 +364,7 @@ bool RecallReportPacker::errorHappened() { //fullCheckAndFinishAsyncExecute() //------------------------------------------------------------------------------ void RecallReportPacker::fullCheckAndFinishAsyncExecute() { - 
m_retrieveMount->waitAndFinishSettingJobsBatchRetrieved(m_successfulRetrieveJobs, m_lc); + m_retrieveMount->flushAsyncSuccessReports(m_successfulRetrieveJobs, m_lc); } //------------------------------------------------------------------------------ diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp index ea231fa23226862ced06cf4004f363df03512c68..152b2f4831b1d19215b3c774b4c4bcc4a0f80b33 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallReportPackerTest.cpp @@ -52,11 +52,9 @@ protected: MockRetrieveJobExternalStats(cta::RetrieveMount & rm, int & completes, int &failures): MockRetrieveJob(rm), completesRef(completes), failuresRef(failures) {} - void asyncComplete() override { + void asyncSetSuccessful() override { completesRef++; } - - void checkComplete() override {} void transferFailed(const std::string &failureReason, cta::log::LogContext&) override { failuresRef++; diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index 68065bd4c4abe49e42263871511d1b73e7b2f5b7..4a331e51d2c971ee1be25443cbe905e5f9702b72 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -135,9 +135,7 @@ namespace unitTests void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> 
finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } - std::set<cta::SchedulerDatabase::RetrieveJob*> batchSucceedRetrieveForRepack(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } - + void flushAsyncSuccessReports(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); } }; TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) {