From 7ed69006c487146c3a9a43bb73fd1fc48fbdfae4 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Tue, 15 May 2018 17:35:07 +0200
Subject: [PATCH] Added logs for retrieve queue popping.

---
 scheduler/OStoreDB/OStoreDB.cpp | 66 ++++++++++++++++++++++++++-------
 1 file changed, 52 insertions(+), 14 deletions(-)

diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp
index febdba8772..3b7d1af62d 100644
--- a/scheduler/OStoreDB/OStoreDB.cpp
+++ b/scheduler/OStoreDB/OStoreDB.cpp
@@ -1908,10 +1908,10 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
         vj=validatedJobs.erase(vj);
       }
       // Before going for another round, we can release the queue and delete it if we emptied it.
-      auto remainingJobsSummary=aq.getJobsSummary();
+      auto queueSummaryAfter=aq.getJobsSummary();
       aqlock.release();
       // If the queue is empty, we can get rid of it.
-      if (!remainingJobsSummary.jobs) {
+      if (!queueSummaryAfter.jobs) {
         try {
           // The queue should be removed as it is empty.
           ScopedExclusiveLock rexl(re);
@@ -1941,6 +1941,11 @@ std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMoun
               .add("bytesBefore", beforeBytes)
               .add("filesAfter", currentFiles)
               .add("bytesAfter", currentBytes)
+              .add("queueJobsBefore", queueSummaryBefore.jobs)
+              .add("queueBytesBefore", queueSummaryBefore.bytes)
+              .add("queueJobsAfter", queueSummaryAfter.jobs)
+              .add("queueBytesAfter", queueSummaryAfter.bytes)
+              .add("queueObject", queueObject)
               .add("findQueueTime", localFindQueueTime)
               .add("lockFetchQueueTime", localLockFetchQueueTime)
               .add("emptyQueueCleanupTime", localEmptyCleanupQueueTime)
@@ -2068,6 +2073,7 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
   double asyncUpdateLaunchTime = 0;
   double jobsUpdateTime = 0;
   double queueProcessAndCommitTime = 0;
+  double queueRemovalTime = 0;
   double ownershipRemovalTime = 0;
   // Find the next files to retrieve
   // First, check we should not forcibly go down. In such an occasion, we just find noting to do.
@@ -2103,6 +2109,16 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
   uint64_t currentFiles=0;
   size_t iterationCount=0;
   while (true) {
+    double localFindQueueTime = 0;
+    double localLockFetchQueueTime = 0;
+    double localEmptyCleanupQueueTime = 0;
+    double localJobSelectionTime = 0;
+    double localOwnershipAdditionTime = 0;
+    double localAsyncLaunchTime = 0;
+    double localQueueProcessAndCommitTime = 0;
+    double localOwnershipRemovalTime = 0;
+    double localJobsUpdateTime = 0;
+    double localQueueRemovalTime = 0;
     iterationCount++;
     uint64_t beforeBytes=currentBytes;
     uint64_t beforeFiles=currentFiles;
@@ -2119,12 +2135,12 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
     // try and lock the retrieve queue. Any failure from here on means the end of the getting jobs.
     objectstore::RetrieveQueue rq(rqAddress, m_oStoreDB.m_objectStore);
     objectstore::ScopedExclusiveLock rqLock;
-    findQueueTime += t.secs(utils::Timer::resetCounter);
+    findQueueTime += localFindQueueTime = t.secs(utils::Timer::resetCounter);
     try {
       try {
         rqLock.lock(rq);
         rq.fetch();
-        lockFetchQueueTime += t.secs(utils::Timer::resetCounter);
+        lockFetchQueueTime += localLockFetchQueueTime = t.secs(utils::Timer::resetCounter);
       } catch (cta::exception::Exception & ex) {
         // The queue is now absent. We can remove its reference in the root entry.
         // A new queue could have been added in the mean time, and be non-empty.
@@ -2145,10 +2161,12 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
                 .add("Message", ex.getMessageValue());
           logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not de-referenced missing queue from root entry");
         }
-        emptyQueueCleanupTime += t.secs(utils::Timer::resetCounter);
+        emptyQueueCleanupTime += localEmptyCleanupQueueTime = t.secs(utils::Timer::resetCounter);
         continue;
       }
       // We now have the queue.
+      auto queueObject = rq.getAddressIfSet();
+      auto queueSummaryBefore = rq.getJobsSummary();
       {
         log::ScopedParamContainer params(logContext);
         params.add("vid", mountInfo.vid)
@@ -2180,20 +2198,20 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
               .add("requestedBytes", bytesRequested);
         logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): will process a set of candidate jobs.");
       }
-      jobSelectionTime += t.secs(utils::Timer::resetCounter);
+      jobSelectionTime += localJobSelectionTime = t.secs(utils::Timer::resetCounter);
       // We now have a batch of jobs to try and dequeue. Should not be empty.
       // First add the jobs to the owned list of the agent.
       std::list<std::string> addedJobs;
       for (const auto &j: candidateJobs) addedJobs.emplace_back(j->m_retrieveRequest.getAddressIfSet());
       m_oStoreDB.m_agentReference->addBatchToOwnership(addedJobs, m_oStoreDB.m_objectStore);
-      ownershipAdditionTime += t.secs(utils::Timer::resetCounter);
+      ownershipAdditionTime += localOwnershipAdditionTime = t.secs(utils::Timer::resetCounter);
       // We can now attempt to switch the ownership of the jobs. Depending on the type of failure (if any) we
       // will adapt the rest.
       // First, start the parallel updates of jobs
       std::list<std::unique_ptr<objectstore::RetrieveRequest::AsyncOwnerUpdater>> jobUpdates;
       for (const auto &j: candidateJobs) jobUpdates.emplace_back(
         j->m_retrieveRequest.asyncUpdateOwner(j->selectedCopyNb, m_oStoreDB.m_agentReference->getAgentAddress(), rqAddress));
-      asyncUpdateLaunchTime += t.secs(utils::Timer::resetCounter);
+      asyncUpdateLaunchTime += localAsyncLaunchTime = t.secs(utils::Timer::resetCounter);
       // Now run through the results of the asynchronous updates. Non-sucess results come in the form of exceptions.
       std::list<std::string> jobsToForget; // The jobs either absent or not owned, for which we should just remove references (agent).
       std::list<std::string> jobsToDequeue; // The jobs that should not be queued anymore. All of them indeed (invalid or successfully poped).
@@ -2278,17 +2296,17 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
           currentFiles--;
           currentBytes-=(*j)->archiveFile.fileSize;
         }
-        jobsUpdateTime += t.secs(utils::Timer::resetCounter);
+        jobsUpdateTime += localJobsUpdateTime = t.secs(utils::Timer::resetCounter);
         // In all cases: move to the nexts.
         ju=jobUpdates.erase(ju);
         j=candidateJobs.erase(j);
       }
       // All (most) jobs are now officially owned by our agent. We can hence remove them from the queue.
       rq.removeJobsAndCommit(jobsToDequeue);
-      ownershipRemovalTime += t.secs(utils::Timer::resetCounter);
+      queueProcessAndCommitTime += localQueueProcessAndCommitTime = t.secs(utils::Timer::resetCounter);
       if (jobsToForget.size()) {
         m_oStoreDB.m_agentReference->removeBatchFromOwnership(jobsToForget, m_oStoreDB.m_objectStore);
-        ownershipRemovalTime += t.secs(utils::Timer::resetCounter);
+        ownershipRemovalTime += localOwnershipRemovalTime = t.secs(utils::Timer::resetCounter);
       }
       // We can now add the validated jobs to the return value.
       auto vj = validatedJobs.begin();
@@ -2297,10 +2315,10 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
         vj=validatedJobs.erase(vj);
       }
       // Before going for another round, we can release the queue and delete it if we emptied it.
-      auto remainingJobs=rq.getJobsSummary().files;
+      auto queueSummaryAfter=rq.getJobsSummary();
       rqLock.release();
       // If the queue is empty, we can get rid of it.
-      if (!remainingJobs) {
+      if (!queueSummaryAfter.files) {
         try {
           // The queue should be removed as it is empty.
           ScopedExclusiveLock rexl(re);
@@ -2317,11 +2335,12 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
                 .add("Message", ex.getMessageValue());
           logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): could not delete a presumably empty queue");
         }
+        queueRemovalTime += localQueueRemovalTime = t.secs(utils::Timer::resetCounter);
       }
       // We can now summarize this round
       {
         log::ScopedParamContainer params(logContext);
-        params.add("tapepool", mountInfo.tapePool)
+        params.add("vid", mountInfo.vid)
               .add("queueObject", rq.getAddressIfSet())
               .add("filesAdded", currentFiles - beforeFiles)
               .add("bytesAdded", currentBytes - beforeBytes)
@@ -2329,6 +2348,25 @@ std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > OStoreDB::RetrieveMo
               .add("bytesBefore", beforeBytes)
               .add("filesAfter", currentFiles)
               .add("bytesAfter", currentBytes)
+              .add("queueJobsBefore", queueSummaryBefore.files)
+              .add("queueBytesBefore", queueSummaryBefore.bytes)
+              .add("queueJobsAfter", queueSummaryAfter.files)
+              .add("queueBytesAfter", queueSummaryAfter.bytes)
+              .add("queueObject", queueObject)
+              .add("findQueueTime", localFindQueueTime)
+              .add("lockFetchQueueTime", localLockFetchQueueTime)
+              .add("emptyQueueCleanupTime", localEmptyCleanupQueueTime)
+              .add("jobSelectionTime", localJobSelectionTime)
+              .add("ownershipAdditionTime", localOwnershipAdditionTime)
+              .add("asyncUpdateLaunchTime", localAsyncLaunchTime)
+              .add("jobsUpdateTime", localJobsUpdateTime)
+              .add("queueProcessAndCommitTime", localQueueProcessAndCommitTime)
+              .add("ownershipRemovalTime", localOwnershipRemovalTime)
+              .add("queueRemovalTime", localQueueRemovalTime)
+              .add("iterationTime", localFindQueueTime + localLockFetchQueueTime + localEmptyCleanupQueueTime
+                                    + localJobSelectionTime + localOwnershipAdditionTime + localAsyncLaunchTime
+                                    + localJobsUpdateTime + localQueueProcessAndCommitTime + localOwnershipRemovalTime
+                                    + localQueueRemovalTime)
               .add("iterationCount", iterationCount);
         logContext.log(log::INFO, "In RetrieveMount::getNextJobBatch(): did one round of jobs retrieval.");
       }
-- 
GitLab