diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 74d785882f0cf49e6838a774225dad525ac85cd4..051eaec752f1046ee17a84e9ab79cf58dfe111c6 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -69,17 +69,17 @@ void ArchiveQueue::commit() { ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - if (maxDriveAllowedMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() || - priorityMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() || - minArchiveRequestAgeMap.total() != (uint64_t)m_payload.pendingarchivejobs_size()) { + if (maxDriveAllowedMap.total() != (uint64_t)m_payload.archivejobs_size() || + priorityMap.total() != (uint64_t)m_payload.archivejobs_size() || + minArchiveRequestAgeMap.total() != (uint64_t)m_payload.archivejobs_size()) { // The maps counts are off: recompute them. maxDriveAllowedMap.clear(); priorityMap.clear(); minArchiveRequestAgeMap.clear(); - for (size_t i=0; i<(size_t)m_payload.pendingarchivejobs_size(); i++) { - maxDriveAllowedMap.incCount(m_payload.pendingarchivejobs(i).maxdrivesallowed()); - priorityMap.incCount(m_payload.pendingarchivejobs(i).priority()); - minArchiveRequestAgeMap.incCount(m_payload.pendingarchivejobs(i).priority()); + for (size_t i=0; i<(size_t)m_payload.archivejobs_size(); i++) { + maxDriveAllowedMap.incCount(m_payload.archivejobs(i).maxdrivesallowed()); + priorityMap.incCount(m_payload.archivejobs(i).priority()); + minArchiveRequestAgeMap.incCount(m_payload.archivejobs(i).priority()); } m_payload.set_mapsrebuildcount(m_payload.mapsrebuildcount()+1); } @@ -90,9 +90,7 @@ void ArchiveQueue::commit() { bool ArchiveQueue::isEmpty() { checkPayloadReadable(); // Check we have no archive jobs pending - if (m_payload.pendingarchivejobs_size() - || m_payload.orphanedarchivejobsnscreation_size() - || m_payload.orphanedarchivejobsnsdeletion_size()) + if (m_payload.archivejobs_size()) return false; // If we made it to here, it seems the pool is indeed empty. return true; @@ -154,7 +152,7 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) { priorityMap.incCount(jta.policy.archivePriority); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge); - if (m_payload.pendingarchivejobs_size()) { + if (m_payload.archivejobs_size()) { if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime()) m_payload.set_oldestjobcreationtime(jta.startTime); m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize); @@ -162,7 +160,7 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) { m_payload.set_archivejobstotalsize(jta.fileSize); m_payload.set_oldestjobcreationtime(jta.startTime); } - auto * j = m_payload.add_pendingarchivejobs(); + auto * j = m_payload.add_archivejobs(); j->set_address(jta.archiveRequestAddress); j->set_size(jta.fileSize); j->set_fileid(jta.archiveFileId); @@ -177,7 +175,7 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd) { auto ArchiveQueue::getJobsSummary() -> JobsSummary { checkPayloadReadable(); JobsSummary ret; - ret.files = m_payload.pendingarchivejobs_size(); + ret.files = m_payload.archivejobs_size(); ret.bytes = m_payload.archivejobstotalsize(); ret.oldestJobStartTime = m_payload.oldestjobcreationtime(); if (ret.files) { @@ -202,7 +200,7 @@ ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::lis ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); AdditionSummary ret; for (auto & jta: jobsToAdd) { - auto & jl=m_payload.pendingarchivejobs(); + auto & jl=m_payload.archivejobs(); for (auto j=jl.begin(); j!= jl.end(); j++) { if (j->address() == jta.archiveRequestAddress) goto skipInsertion; @@ -212,7 +210,7 @@ ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::lis maxDriveAllowedMap.incCount(jta.policy.maxDrivesAllowed); priorityMap.incCount(jta.policy.archivePriority); minArchiveRequestAgeMap.incCount(jta.policy.archiveMinRequestAge); - if (m_payload.pendingarchivejobs_size()) { + if (m_payload.archivejobs_size()) { if ((uint64_t)jta.startTime < m_payload.oldestjobcreationtime()) m_payload.set_oldestjobcreationtime(jta.startTime); m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + jta.fileSize); @@ -220,7 +218,7 @@ ArchiveQueue::AdditionSummary ArchiveQueue::addJobsIfNecessaryAndCommit(std::lis m_payload.set_archivejobstotalsize(jta.fileSize); m_payload.set_oldestjobcreationtime(jta.startTime); } - auto * j = m_payload.add_pendingarchivejobs(); + auto * j = m_payload.add_archivejobs(); j->set_address(jta.archiveRequestAddress); j->set_size(jta.fileSize); j->set_fileid(jta.archiveFileId); @@ -243,7 +241,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToR ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - auto * jl=m_payload.mutable_pendingarchivejobs(); + auto * jl=m_payload.mutable_archivejobs(); bool jobRemoved=false; for (auto &rrt: requestsToRemove) { bool found = false; @@ -275,7 +273,7 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& requestsToR auto ArchiveQueue::dumpJobs() -> std::list<JobDump> { checkPayloadReadable(); std::list<JobDump> ret; - auto & jl=m_payload.pendingarchivejobs(); + auto & jl=m_payload.archivejobs(); for (auto j=jl.begin(); j!=jl.end(); j++) { ret.push_back(JobDump()); JobDump & jd = ret.back(); @@ -290,8 +288,8 @@ auto ArchiveQueue::getCandidateList(uint64_t maxBytes, uint64_t maxFiles, std::s checkPayloadReadable(); CandidateJobList ret; ret.remainingBytesAfterCandidates = m_payload.archivejobstotalsize(); - ret.remainingFilesAfterCandidates = m_payload.pendingarchivejobs_size(); - for (auto & j: m_payload.pendingarchivejobs()) { + ret.remainingFilesAfterCandidates = m_payload.archivejobs_size(); + for (auto & j: m_payload.archivejobs()) { if (!archiveRequestsToSkip.count(j.address())) { ret.candidates.push_back({j.size(), j.address(), (uint16_t)j.copynb()}); ret.candidateBytes += j.size(); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 91d580f13c2875976ae65fd3ee93aea45b3b9ade..68f392a76c5d91c1e09eaa167e3f79887b656644 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -389,9 +389,7 @@ message ValueCountPair { message ArchiveQueue { required string tapepool = 10000; - repeated ArchiveJobPointer pendingarchivejobs = 10010; - repeated ArchiveJobPointer orphanedarchivejobsnscreation = 10020; - repeated ArchiveJobPointer orphanedarchivejobsnsdeletion = 10030; + repeated ArchiveJobPointer archivejobs = 10010; repeated ValueCountPair prioritymap = 10031; repeated ValueCountPair minarchiverequestagemap = 10032; repeated ValueCountPair maxdrivesallowedmap = 10033;