diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 0d2565001a8acbfcf49e71adf067624bad4c3ebe..febdba87722cfa5aa078a83ab8fa0b30adb7af0e 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1648,6 +1648,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun double asyncUpdateLaunchTime = 0; double jobsUpdateTime = 0; double queueProcessAndCommitTime = 0; + double queueRemovalTime = 0; double ownershipRemovalTime = 0; // Find the next files to archive // First, check we should not forcibly go down. In such an occasion, we just find noting to do. @@ -1683,6 +1684,16 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun uint64_t currentFiles=0; size_t iterationCount=0; while (true) { + double localFindQueueTime = 0; + double localLockFetchQueueTime = 0; + double localEmptyCleanupQueueTime = 0; + double localJobSelectionTime = 0; + double localOwnershipAdditionTime = 0; + double localAsyncLaunchTime = 0; + double localQueueProcessAndCommitTime = 0; + double localOwnershipRemovalTime = 0; + double localJobsUpdateTime = 0; + double localQueueRemovalTime = 0; iterationCount++; uint64_t beforeBytes=currentBytes; uint64_t beforeFiles=currentFiles; @@ -1699,12 +1710,12 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun // try and lock the archive queue. Any failure from here on means the end of the getting jobs. objectstore::ArchiveQueue aq(aqAddress, m_oStoreDB.m_objectStore); objectstore::ScopedExclusiveLock aqlock; - findQueueTime += t.secs(utils::Timer::resetCounter); + findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter); try { try { aqlock.lock(aq); aq.fetch(); - lockFetchQueueTime += t.secs(utils::Timer::resetCounter); + lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter); } catch (cta::exception::Exception & ex) { // The queue is now absent. We can remove its reference in the root entry. // A new queue could have been added in the mean time, and be non-empty. @@ -1731,15 +1742,18 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("queueObject", aq.getAddressIfSet()); logContext.log(log::DEBUG, "In ArchiveMount::getNextJobBatch(): could not de-referenced missing queue from root entry: already done."); } - emptyQueueCleanupTime += t.secs(utils::Timer::resetCounter); + emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter); continue; } // We now have the queue. + auto queueObject = aq.getAddressIfSet(); + auto queueSummaryBefore = aq.getJobsSummary(); { log::ScopedParamContainer params(logContext); params.add("tapepool", mountInfo.tapePool) - .add("queueObject", aq.getAddressIfSet()) - .add("queueSize", aq.getJobsSummary().jobs); + .add("queueObject", queueObject) + .add("jobs", queueSummaryBefore.jobs) + .add("bytes", queueSummaryBefore.bytes); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): archive queue found."); } // The queue will give us a list of files to try and grab. We will attempt to @@ -1766,20 +1780,20 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("requestedBytes", bytesRequested); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): will process a set of candidate jobs."); } - jobSelectionTime += t.secs(utils::Timer::resetCounter); + jobSelectionTime += localJobSelectionTime = t.secs(utils::Timer::resetCounter); // We now have a batch of jobs to try and dequeue. Should not be empty. // First add the jobs to the owned list of the agent. std::list<std::string> addedJobs; for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_archiveRequest.getAddressIfSet()); m_oStoreDB.m_agentReference->addBatchToOwnership(addedJobs, m_oStoreDB.m_objectStore); - ownershipAdditionTime += t.secs(utils::Timer::resetCounter); + ownershipAdditionTime += localOwnershipAdditionTime = t.secs(utils::Timer::resetCounter); // We can now attempt to switch the ownership of the jobs. Depending on the type of failure (if any) we // will adapt the rest. // First, start the parallel updates of jobs std::list<std::unique_ptr<objectstore::ArchiveRequest::AsyncJobOwnerUpdater>> jobUpdates; for (const auto &j: candidateJobs) jobUpdates.emplace_back( j->m_archiveRequest.asyncUpdateJobOwner(j->tapeFile.copyNb, m_oStoreDB.m_agentReference->getAgentAddress(), aqAddress)); - asyncUpdateLaunchTime += t.secs(utils::Timer::resetCounter); + asyncUpdateLaunchTime += localAsyncLaunchTime = t.secs(utils::Timer::resetCounter); // Now run through the results of the asynchronous updates. Non-success results come in the form of exceptions. std::list<std::string> jobsToForget; // The jobs either absent or not owned, for which we should just remove references (agent). std::list<std::string> jobsToDequeue; // The jobs that should not be queued anymore. All of them indeed (invalid or successfully poped). @@ -1875,17 +1889,17 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun currentFiles--; currentBytes-=(*j)->archiveFile.fileSize; } - jobsUpdateTime += t.secs(utils::Timer::resetCounter); + jobsUpdateTime += localJobsUpdateTime = t.secs(utils::Timer::resetCounter); // In all cases: move to the nexts. ju=jobUpdates.erase(ju); j=candidateJobs.erase(j); } // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue. aq.removeJobsAndCommit(jobsToDequeue); - queueProcessAndCommitTime += t.secs(utils::Timer::resetCounter); + queueProcessAndCommitTime += localQueueProcessAndCommitTime = t.secs(utils::Timer::resetCounter); if (jobsToForget.size()) { m_oStoreDB.m_agentReference->removeBatchFromOwnership(jobsToForget, m_oStoreDB.m_objectStore); - ownershipRemovalTime += t.secs(utils::Timer::resetCounter); + ownershipRemovalTime += localOwnershipRemovalTime = t.secs(utils::Timer::resetCounter); } // We can now add the validated jobs to the return value. auto vj = validatedJobs.begin(); @@ -1894,10 +1908,10 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun vj=validatedJobs.erase(vj); } // Before going for another round, we can release the queue and delete it if we emptied it. - auto remainingJobs=aq.dumpJobs().size(); + auto remainingJobsSummary=aq.getJobsSummary(); aqlock.release(); // If the queue is empty, we can get rid of it. - if (!remainingJobs) { + if (!remainingJobsSummary.jobs) { try { // The queue should be removed as it is empty. ScopedExclusiveLock rexl(re); @@ -1914,6 +1928,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("Message", ex.getMessageValue()); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): could not delete a presumably empty queue"); } + queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter); } // We can now summarize this round { @@ -1926,6 +1941,20 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("bytesBefore", beforeBytes) .add("filesAfter", currentFiles) .add("bytesAfter", currentBytes) + .add("findQueueTime", localFindQueueTime) + .add("lockFetchQueueTime", localLockFetchQueueTime) + .add("emptyQueueCleanupTime", localEmptyCleanupQueueTime) + .add("jobSelectionTime", localJobSelectionTime) + .add("ownershipAdditionTime", localOwnershipAdditionTime) + .add("asyncUpdateLaunchTime", localAsyncLaunchTime) + .add("jobsUpdateTime", localJobsUpdateTime) + .add("queueProcessAndCommitTime", localQueueProcessAndCommitTime) + .add("ownershipRemovalTime", localOwnershipRemovalTime) + .add("queueRemovalTime", localQueueRemovalTime) + .add("iterationTime", localFindQueueTime + localLockFetchQueueTime + localEmptyCleanupQueueTime + + localJobSelectionTime + localOwnershipAdditionTime + localAsyncLaunchTime + + localJobsUpdateTime + localQueueProcessAndCommitTime + localOwnershipRemovalTime + + localQueueRemovalTime) .add("iterationCount", iterationCount); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): did one round of jobs retrieval."); } @@ -1974,6 +2003,7 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun .add("jobsUpdateTime", jobsUpdateTime) .add("queueProcessAndCommitTime", queueProcessAndCommitTime) .add("ownershipRemovalTime", ownershipRemovalTime) + .add("queueRemovalTime", queueRemovalTime) .add("schedulerDbTime", totalTime.secs()); logContext.log(log::INFO, "In ArchiveMount::getNextJobBatch(): jobs retrieval complete."); }