diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index a3190d136c65d54bc420b621505c372cae8318e9..15c60917e880eb577aba19830a407a0bddb90c6c 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -84,17 +84,19 @@ bool ArchiveQueue::checkMapsAndShardsCoherency() { bytesFromShardPointers += aqs.shardbytescount(); jobsExpectedFromShardsPointers += aqs.shardjobscount(); } + uint64_t totalBytes = m_payload.archivejobstotalsize(); + uint64_t totalJobs = m_payload.archivejobscount(); // The sum of shards should be equal to the summary - if (m_payload.archivejobstotalsize() != bytesFromShardPointers || - m_payload.archivejobscount() != jobsExpectedFromShardsPointers) + if (totalBytes != bytesFromShardPointers || + totalJobs != jobsExpectedFromShardsPointers) return false; // Check that we have coherent queue summaries ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); - if (maxDriveAllowedMap.total() != m_payload.archivejobstotalsize() || - priorityMap.total() != m_payload.archivejobstotalsize() || - minArchiveRequestAgeMap.total() != m_payload.archivejobstotalsize()) + if (maxDriveAllowedMap.total() != m_payload.archivejobscount() || + priorityMap.total() != m_payload.archivejobscount() || + minArchiveRequestAgeMap.total() != m_payload.archivejobscount()) return false; return true; } @@ -331,6 +333,7 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere ValueCountMap priorityMap(m_payload.mutable_prioritymap()); ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); while (nextJob != jobsToAdd.end() && aqsp->shardjobscount() < c_maxShardSize) { + // Update stats and global counters. maxDriveAllowedMap.incCount(nextJob->policy.maxDrivesAllowed); priorityMap.incCount(nextJob->policy.archivePriority); minArchiveRequestAgeMap.incCount(nextJob->policy.archiveMinRequestAge); @@ -342,7 +345,8 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere m_payload.set_archivejobstotalsize(nextJob->fileSize); m_payload.set_oldestjobcreationtime(nextJob->startTime); } - // Add the job to shard, update pointer counts. + m_payload.set_archivejobscount(m_payload.archivejobscount()+1); + // Add the job to shard, update pointer counts and queue summary. aqsp->set_shardjobscount(aqs.addJob(*nextJob)); aqsp->set_shardbytescount(aqsp->shardbytescount() + nextJob->fileSize); // And move to the next job @@ -364,7 +368,7 @@ void ArchiveQueue::addJobsAndCommit(std::list<JobToAdd> & jobsToAdd, AgentRefere auto ArchiveQueue::getJobsSummary() -> JobsSummary { checkPayloadReadable(); JobsSummary ret; - ret.jobs = m_payload.archivejobstotalsize(); + ret.jobs = m_payload.archivejobscount(); ret.bytes = m_payload.archivejobstotalsize(); ret.oldestJobStartTime = m_payload.oldestjobcreationtime(); if (ret.jobs) { @@ -442,8 +446,10 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov auto localJobsToRemove = jobsToRemove; // The jobs are expected to be removed from the front shards first. // Remove jobs until there are no more jobs or no more shards. - auto shardPointer = m_payload.mutable_archivequeuesshards()->begin(); - while (localJobsToRemove.size() && shardPointer != m_payload.mutable_archivequeuesshards()->end()) { + ssize_t shardIndex=0; + auto * mutableArchiveQueueShards= m_payload.mutable_archivequeuesshards(); + while (localJobsToRemove.size() && shardIndex < mutableArchiveQueueShards->size()) { + auto * shardPointer = mutableArchiveQueueShards->Mutable(shardIndex); // Get hold of the shard ArchiveQueueShard aqs(shardPointer->address(), m_objectStore); m_exclusiveLock->includeSubObject(aqs); @@ -463,31 +469,38 @@ void ArchiveQueue::removeJobsAndCommit(const std::list<std::string>& jobsToRemov priorityMap.decCount(j.priority); minArchiveRequestAgeMap.decCount(j.minArchiveRequestAge); } + // In all cases, we should update the global statistics. + m_payload.set_archivejobscount(m_payload.archivejobscount() - removalResult.jobsRemoved); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - removalResult.bytesRemoved); // If the shard is still around, we shall update its pointer's stats too. if (removalResult.jobsAfter) { - m_payload.set_archivejobscount(m_payload.archivejobscount() - removalResult.jobsRemoved); - m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() - removalResult.bytesRemoved); // Also update the shard pointers's stats. In case of mismatch, we will trigger a rebuild. shardPointer->set_shardbytescount(shardPointer->shardbytescount() - removalResult.bytesRemoved); shardPointer->set_shardjobscount(shardPointer->shardjobscount() - removalResult.jobsRemoved); - if (shardPointer->shardbytescount() != removalResult.bytesRemoved - || shardPointer->shardjobscount() != removalResult.jobsAfter) + if (shardPointer->shardbytescount() != removalResult.bytesAfter + || shardPointer->shardjobscount() != removalResult.jobsAfter) { rebuild(); + } // We will commit when exiting anyway... - shardPointer++; + shardIndex++; } else { // Shard's gone, so should the pointer. Push it to the end of the queue and // trim it. - auto shardPointerToRemove = shardPointer; - // Update the pointer for the next iteration. - shardPointer++; - while ((shardPointerToRemove + 1) != m_payload.mutable_archivequeuesshards()->end()) { - // Swap wants a pointer as a parameter, which the iterator is not, so we convert it to pointer - // with &* - shardPointerToRemove->Swap(&*++shardPointerToRemove); + for (auto i=shardIndex; i<mutableArchiveQueueShards->size()-1; i++) { + mutableArchiveQueueShards->SwapElements(i, i+1); } m_payload.mutable_archivequeuesshards()->RemoveLast(); } + // We should also trim the removed jobs from our list. + localJobsToRemove.remove_if( + [&removalResult](const std::string & ja){ + return std::count_if(removalResult.removedJobs.begin(), removalResult.removedJobs.end(), + [&ja](ArchiveQueueShard::JobInfo & j) { + return j.address == ja; + } + ); + } + ); // end of remove_if // And commit the queue (once per shard should not hurt performance). commit(); } diff --git a/objectstore/ArchiveQueueShard.cpp b/objectstore/ArchiveQueueShard.cpp index d79873629fd7be0de12418d80225e595129dad05..345e4ea9f7570648acd4e9cede8d04108bdb4c7b 100644 --- a/objectstore/ArchiveQueueShard.cpp +++ b/objectstore/ArchiveQueueShard.cpp @@ -18,6 +18,9 @@ */ #include "ArchiveQueueShard.hpp" +#include "GenericObject.hpp" +#include <google/protobuf/util/json_util.h> + namespace cta { namespace objectstore { @@ -28,6 +31,14 @@ ArchiveQueueShard::ArchiveQueueShard(Backend& os): ArchiveQueueShard::ArchiveQueueShard(const std::string& address, Backend& os): ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t>(os, address) { } +ArchiveQueueShard::ArchiveQueueShard(GenericObject& go): + ObjectOps<serializers::ArchiveQueueShard, serializers::ArchiveQueueShard_t>(go.objectStore()) { + // Here we transplant the generic object into the new object + go.transplantHeader(*this); + // And interpret the header. + getPayloadFromHeader(); +} + void ArchiveQueueShard::rebuild() { checkPayloadWritable(); uint64_t totalSize=0; @@ -37,6 +48,16 @@ void ArchiveQueueShard::rebuild() { m_payload.set_archivejobstotalsize(totalSize); } +std::string ArchiveQueueShard::dump() { + checkPayloadReadable(); + google::protobuf::util::JsonPrintOptions options; + options.add_whitespace = true; + options.always_print_primitive_fields = true; + std::string headerDump; + google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options); + return headerDump; +} + void ArchiveQueueShard::garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) { throw exception::Exception("In ArchiveQueueShard::garbageCollect(): garbage collection should not be necessary for this type of object."); } @@ -124,6 +145,7 @@ auto ArchiveQueueShard::getJobsSummary() -> JobsSummary { checkPayloadReadable(); JobsSummary ret; ret.bytes = m_payload.archivejobstotalsize(); + ret.jobs = m_payload.archivejobs_size(); return ret; } diff --git a/objectstore/ArchiveQueueShard.hpp b/objectstore/ArchiveQueueShard.hpp index 03a36c008f9ca6050bd458c162c724a38b2f8555..1a2074305da88d48db600bc8e0a110ecfb66b280 100644 --- a/objectstore/ArchiveQueueShard.hpp +++ b/objectstore/ArchiveQueueShard.hpp @@ -30,12 +30,18 @@ public: // Constructor ArchiveQueueShard(const std::string & address, Backend & os); + // Upgrader form generic object + ArchiveQueueShard(GenericObject & go); + // Forbid/hide base initializer void initialize() = delete; // Initializer void initialize(const std::string & owner); + // dumper + std::string dump(); + void garbageCollect(const std::string& presumedOwner, AgentReference& agentReference, log::LogContext& lc, cta::catalogue::Catalogue& catalogue) override; struct JobInfo { diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 63a58bd5294345d3f5924ecd9a6be340f6647547..0462e33ac7dd0335981f512c413bc533ecae0228 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -59,8 +59,6 @@ void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber, j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue); j->set_tapepool(tapepool); j->set_owner(archivequeueaddress); - // XXX This field (archivequeueaddress) is a leftover from a past layout when tape pools were static - // in the object store, and should be eventually removed. j->set_archivequeueaddress(""); j->set_totalretries(0); j->set_retrieswithinmount(0); diff --git a/objectstore/GenericObject.cpp b/objectstore/GenericObject.cpp index 5eddf92380716316ccb26d085bf8326f3512fe9c..d980678369bac1987caa442665750ca6d65a910f 100644 --- a/objectstore/GenericObject.cpp +++ b/objectstore/GenericObject.cpp @@ -25,6 +25,7 @@ #include "RootEntry.hpp" #include "SchedulerGlobalLock.hpp" #include "ArchiveQueue.hpp" +#include "ArchiveQueueShard.hpp" #include "RetrieveQueue.hpp" #include "DriveRegister.hpp" #include <stdexcept> @@ -182,6 +183,9 @@ std::string GenericObject::dump() { case serializers::ArchiveQueue_t: bodyDump = dumpWithType<cta::objectstore::ArchiveQueue>(this); break; + case serializers::ArchiveQueueShard_t: + bodyDump = dumpWithType<cta::objectstore::ArchiveQueueShard>(this); + break; case serializers::RetrieveQueue_t: bodyDump = dumpWithType<cta::objectstore::RetrieveQueue>(this); break; diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 1bd01c3ac038923dec303c2ad4b084ff8b9608e9..dd0bbaf3adcd537a97009781bc56047a65b8f817 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -342,13 +342,13 @@ public: } void setObjectLocked(ObjectOpsBase* objectOps) override { - m_objectOps->m_locksCount++; - m_objectOps->m_locksForWriteCount++; + objectOps->m_locksCount++; + objectOps->m_locksForWriteCount++; } void setObjectUnlocked(ObjectOpsBase* objectOps) override { - m_objectOps->m_locksCount--; - m_objectOps->m_locksForWriteCount--; + objectOps->m_locksCount--; + objectOps->m_locksForWriteCount--; } void lock(ObjectOpsBase & oo, uint64_t timeout_us = 0) {