From 2dd5369bc7a0db47dca603d1b681df219ec354df Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Mon, 27 Aug 2018 09:07:39 +0200
Subject: [PATCH] Fixed report queue bugs.

Extended unit test to also simulate reporter.
Fixed issue with trimming trying to trim the wrong type of queue.
Fixed report type not properly extracted when popping request for report.
---
 objectstore/ArchiveQueueAlgorithms.cpp |  8 ++++----
 objectstore/ArchiveQueueAlgorithms.hpp | 28 +++++++++++++++++++++++---
 objectstore/ArchiveRequest.cpp         |  4 ++++
 scheduler/ArchiveJob.cpp               |  2 +-
 scheduler/SchedulerDatabase.hpp        |  3 ++-
 scheduler/SchedulerDatabaseTest.cpp    |  4 ++--
 scheduler/SchedulerTest.cpp            | 13 +++++++++++-
 7 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp
index 15985bad4b..bba7a55afd 100644
--- a/objectstore/ArchiveQueueAlgorithms.cpp
+++ b/objectstore/ArchiveQueueAlgorithms.cpp
@@ -240,7 +240,7 @@ auto ContainerTraits<ArchiveQueueToReport>::getPoppingElementsCandidates(Contain
     elem.archiveReportURL = "";
     elem.errorReportURL = "";
     elem.latestError = "";
-    elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
+    elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report;
     ret.summary.files++;
   }
   return ret;
@@ -280,8 +280,8 @@ auto ContainerTraits<ArchiveQueueToReport>::PopCriteria::operator-=(const Popped
   return *this;
 }
 
-void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
-    log::LogContext& lc) {
+void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock & contLock,
+    const ContainerIdentifyer & cId, log::LogContext& lc) {
   if (cont.isEmpty()) {
     // The current implementation is done unlocked.
     contLock.release();
@@ -290,7 +290,7 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope
       RootEntry re(cont.m_objectStore);
       ScopedExclusiveLock rexl(re);
       re.fetch();
-      re.removeArchiveQueueAndCommit(cId, QueueType::JobsToTransfer, lc);
+      re.removeArchiveQueueAndCommit(cId, queueType, lc);
       log::ScopedParamContainer params(lc);
       params.add("tapepool", cId)
             .add("queueObject", cont.getAddressIfSet());
diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp
index 2256614010..5ffa368e6c 100644
--- a/objectstore/ArchiveQueueAlgorithms.hpp
+++ b/objectstore/ArchiveQueueAlgorithms.hpp
@@ -162,7 +162,17 @@ public:
         e->archiveReportURL = u->get()->getArchiveReportURL();
         e->errorReportURL = u->get()->getArchiveErrorReportURL();
         e->srcURL = u->get()->getSrcURL();
-        //if (u->get()->)
+        switch(u->get()->getJobStatus()) {
+          case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
+            e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
+            break;
+          case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
+            e->reportType = SchedulerDatabase::ArchiveJob::ReportType::FailureReport;
+            break;
+          default:
+            e->reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
+            break;
+        }
       } catch (...) {
         ret.push_back(OpFailure<PoppedElement>());
         ret.back().element = &(*e);
@@ -175,7 +185,11 @@ public:
     return ret;
   }
   
-  static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
+  static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
+    trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc);
+  }
+protected:
+  static void trimContainerIfNeeded (Container& cont, QueueType queueType, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
   
 };
 
@@ -212,9 +226,17 @@ public:
   
   static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
       ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
+  
+  static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
+    ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::JobsToReport, contLock, cId, lc);
+  }
 };
 
 template<>
-class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */ };
+class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */ 
+  static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
+    ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::FailedJobs, contLock, cId, lc);
+  }
+};
 
 }} // namespace cta::objectstore
\ No newline at end of file
diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp
index 04610a8d29..119ceba720 100644
--- a/objectstore/ArchiveRequest.cpp
+++ b/objectstore/ArchiveRequest.cpp
@@ -554,6 +554,10 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
   return m_srcURL;
 }
 
+objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater::getJobStatus() {
+  return m_jobStatus;
+}
+
 ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint16_t copyNumber ) {
   std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater);  
   // Passing a reference to the unique pointer led to strange behaviors.
diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp
index 5d113c657d..bf736b9e66 100644
--- a/scheduler/ArchiveJob.cpp
+++ b/scheduler/ArchiveJob.cpp
@@ -136,7 +136,7 @@ std::string cta::ArchiveJob::reportType() {
     return "ErrorReport";
   default:
     { 
-      throw exception::Exception("In ArchiveJob::reportURL(): job status does not require reporting.");
+      throw exception::Exception("In ArchiveJob::reportType(): job status does not require reporting.");
     }
   }
 }
diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp
index 5238ff9093..5509d514fb 100644
--- a/scheduler/SchedulerDatabase.hpp
+++ b/scheduler/SchedulerDatabase.hpp
@@ -172,7 +172,8 @@ public:
     enum class ReportType: uint8_t {
       NoReportRequired,
       CompletionReport,
-      FailureReport
+      FailureReport,
+      Report ///< A generic grouped type
     } reportType;
     cta::common::dataStructures::ArchiveFile archiveFile;
     cta::common::dataStructures::TapeFile tapeFile;
diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp
index c0f01e5aea..81fdf70ad9 100644
--- a/scheduler/SchedulerDatabaseTest.cpp
+++ b/scheduler/SchedulerDatabaseTest.cpp
@@ -205,7 +205,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
       jobBatch.emplace_back(std::move(aj.front()));
       aj.pop_front();
       count++;
-      am->setJobBatchSuccessful(jobBatch, lc);
+      am->setJobBatchTransferred(jobBatch, lc);
     }
     else
       done = true;
@@ -293,7 +293,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
       jobBatch.emplace_back(std::move(aj.front()));
       aj.pop_front();
       count++;
-      am->setJobBatchSuccessful(jobBatch, lc);
+      am->setJobBatchTransferred(jobBatch, lc);
     }
     else
       done = true;
diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp
index 3951d28c9a..d40421a22e 100644
--- a/scheduler/SchedulerTest.cpp
+++ b/scheduler/SchedulerTest.cpp
@@ -356,7 +356,7 @@ TEST_P(SchedulerTest, archive_to_new_file) {
 //  ASSERT_FALSE(found);
 //}
 
-TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
+TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
   using namespace cta;
 
   Scheduler &scheduler = getScheduler();
@@ -472,6 +472,17 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
     ASSERT_EQ(0, archiveJobBatch.size());
     archiveMount->complete();
   }
+  
+  {
+    // Emulate the the reporter process reporting successful transfer to tape to the disk system
+    auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc);
+    ASSERT_NE(0, jobsToReport.size());
+    eos::DiskReporterFactory factory;
+    log::TimingList timings;
+    utils::Timer t;
+    scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc);
+    ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size());
+  }
 
   {
     cta::common::dataStructures::EntryLog creationLog;
-- 
GitLab