From 73e8f68fb970d50df05ee5e00594d2b368a3a24a Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Mon, 7 May 2018 12:13:34 +0200
Subject: [PATCH] Improved retrieve queue insertion algorith selection.

---
 objectstore/RetrieveQueueShard.cpp | 65 ++++++++++++++++++++++++++----
 1 file changed, 58 insertions(+), 7 deletions(-)

diff --git a/objectstore/RetrieveQueueShard.cpp b/objectstore/RetrieveQueueShard.cpp
index a560093333..96d6aee890 100644
--- a/objectstore/RetrieveQueueShard.cpp
+++ b/objectstore/RetrieveQueueShard.cpp
@@ -173,14 +173,65 @@ auto RetrieveQueueShard::getJobsSummary() -> JobsSummary {
 
 void RetrieveQueueShard::addJobsBatch(JobsToAddSet& jobsToAdd) {
   checkPayloadWritable();
-  // Decide on the best algorithm. In place insertion implies 2*k*(N/2) copies
+  // Protect following algorithms against zero-sized array.
+  if (jobsToAdd.empty()) return; 
+  // Decide on the best algorithm. In place insertion implies 3*k*(N/2) copies
   // as we move the inserted job in place by swaps from the end (N/2 on average).
+  // In practice, we can reduce the value of N by finding the last index imin 
+  // at which fseq is smaller than the lowest fseq of the data to be inserted.
+  // We then know that the first element to insert take exactly Np = N-imin-1 swaps
+  // to move in place.
+  // If we make no assumption for the other jobs to insert, they will each require 
+  // Np/2 swaps to move  in place on average.
+  // The cost can then reduced to Np + (k-1)Np/2 = 3*(k+1)Np/2
   // On the other hand, insertion through memory implies 2N+3k copies, N to copy
   // to a set, k copies to create the second set, k copies again during merge
   // and then N+k copies to copy the merged set into payload.
+  // Find Np by bisection.
   size_t N=m_payload.retrievejobs_size();
+  size_t Np=N;
+  auto fSeqLimit = jobsToAdd.begin()->fSeq;
+  if (0==N) Np=0;
+  else if (1==N) Np=m_payload.retrievejobs(0).fseq() <= fSeqLimit? 0: 1;
+  else {
+    size_t iminmin=0;
+    size_t iminmax=N-1;
+    size_t imin=N/2;
+    uint64_t iminfseq = ~0;
+    uint64_t imin1fseq = ~0;
+    while (true) {
+      iminfseq = m_payload.retrievejobs(imin).fseq();
+      if (iminfseq > fSeqLimit) {
+        iminmax=imin;
+        imin=(iminmin+iminmax)/2;
+        // Did we reach the beginning of the array?
+        if (!imin) {
+          Np=m_payload.retrievejobs(0).fseq() <= fSeqLimit? N-1: N;
+          break;
+        }
+      } else {
+        // Before comparing, make sure we are not reaching the end of the set.
+        if (imin >= N-1) {
+          Np=m_payload.retrievejobs(N-1).fseq() <= fSeqLimit? 0: 1;
+          break;
+        }
+        imin1fseq = m_payload.retrievejobs(imin+1).fseq();
+        if (imin1fseq > fSeqLimit) {
+          // We found the right imin.
+          Np=N-imin-1;
+          break;
+        } else if (imin+1 == N-1) {
+          Np=0;
+          break;
+        } else {
+          iminmin=imin;
+          imin=(iminmin+iminmax)/2;
+        }
+      }
+    }
+  }
   size_t k=jobsToAdd.size();
-  if (k*N > 2*N+3*k)
+  if (3*(k+1)*Np/2 > 2*N+3*k)
     addJobsThroughCopy(jobsToAdd);
   else
     addJobsInPlace(jobsToAdd);
@@ -212,14 +263,14 @@ void RetrieveQueueShard::addJob(const RetrieveQueue::JobToAdd& jobToAdd) {
 
 void RetrieveQueueShard::addJobsThroughCopy(JobsToAddSet& jobsToAdd) {
   checkPayloadWritable();
-  SerializedJobsToAddSet jobsMap;
+  SerializedJobsToAddSet jobsSet;
   SerializedJobsToAddSet serializedJobsToAdd;
-  SerializedJobsToAddSet::iterator i = jobsMap.begin();
+  SerializedJobsToAddSet::iterator i = jobsSet.begin();
   // Copy the request pointers in memory (in an ordered multi set)
   for (auto &j: m_payload.retrievejobs())
     // As the queue is already sorted, we hit at the right
     // location (in-order insertion).
-    i = jobsMap.insert(i, j);
+    i = jobsSet.insert(i, j);
   // Create a serialized version of the jobs to add.
   i = serializedJobsToAdd.begin();
   uint64_t totalSize = m_payload.retrievejobstotalsize();
@@ -237,10 +288,10 @@ void RetrieveQueueShard::addJobsThroughCopy(JobsToAddSet& jobsToAdd) {
     totalSize+=jobToAdd.fileSize;
   }
   // Let STL do the heavy lifting of in-order insertion.
-  jobsMap.insert(serializedJobsToAdd.begin(), serializedJobsToAdd.end());
+  jobsSet.insert(serializedJobsToAdd.begin(), serializedJobsToAdd.end());
   // Recreate the shard from the in-memory image (it's already sorted).
   m_payload.mutable_retrievejobs()->Clear();
-  for (auto &j: jobsMap)
+  for (auto &j: jobsSet)
     *m_payload.add_retrievejobs() = j;
   m_payload.set_retrievejobstotalsize(totalSize);
 }
-- 
GitLab