diff --git a/objectstore/ArchiveQueueAlgorithms.cpp b/objectstore/ArchiveQueueAlgorithms.cpp index 15985bad4b467dc6bec27939c7085461f5e5096a..bba7a55afd797778149e1400995a416c2ed5cdee 100644 --- a/objectstore/ArchiveQueueAlgorithms.cpp +++ b/objectstore/ArchiveQueueAlgorithms.cpp @@ -240,7 +240,7 @@ auto ContainerTraits<ArchiveQueueToReport>::getPoppingElementsCandidates(Contain elem.archiveReportURL = ""; elem.errorReportURL = ""; elem.latestError = ""; - elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired; + elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report; ret.summary.files++; } return ret; @@ -280,8 +280,8 @@ auto ContainerTraits<ArchiveQueueToReport>::PopCriteria::operator-=(const Popped return *this; } -void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, - log::LogContext& lc) { +void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock & contLock, + const ContainerIdentifyer & cId, log::LogContext& lc) { if (cont.isEmpty()) { // The current implementation is done unlocked. contLock.release(); @@ -290,7 +290,7 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope RootEntry re(cont.m_objectStore); ScopedExclusiveLock rexl(re); re.fetch(); - re.removeArchiveQueueAndCommit(cId, QueueType::JobsToTransfer, lc); + re.removeArchiveQueueAndCommit(cId, queueType, lc); log::ScopedParamContainer params(lc); params.add("tapepool", cId) .add("queueObject", cont.getAddressIfSet()); diff --git a/objectstore/ArchiveQueueAlgorithms.hpp b/objectstore/ArchiveQueueAlgorithms.hpp index 2256614010a2378f9dfb5c720ceed8850112e78c..5ffa368e6c58311ea400b0f734d0e12d4a6f8d14 100644 --- a/objectstore/ArchiveQueueAlgorithms.hpp +++ b/objectstore/ArchiveQueueAlgorithms.hpp @@ -162,7 +162,17 @@ public: e->archiveReportURL = u->get()->getArchiveReportURL(); e->errorReportURL = u->get()->getArchiveErrorReportURL(); e->srcURL = u->get()->getSrcURL(); - //if (u->get()->) + switch(u->get()->getJobStatus()) { + case serializers::ArchiveJobStatus::AJS_ToReportForTransfer: + e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport; + break; + case serializers::ArchiveJobStatus::AJS_ToReportForFailure: + e->reportType = SchedulerDatabase::ArchiveJob::ReportType::FailureReport; + break; + default: + e->reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired; + break; + } } catch (...) { ret.push_back(OpFailure<PoppedElement>()); ret.back().element = &(*e); @@ -175,7 +185,11 @@ public: return ret; } - static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc); + static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) { + trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc); + } +protected: + static void trimContainerIfNeeded (Container& cont, QueueType queueType, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc); }; @@ -212,9 +226,17 @@ public: static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria, ElementsToSkipSet & elemtsToSkip, log::LogContext & lc); + + static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) { + ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::JobsToReport, contLock, cId, lc); + } }; template<> -class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */ }; +class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */ + static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) { + ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::FailedJobs, contLock, cId, lc); + } +}; }} // namespace cta::objectstore \ No newline at end of file diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 04610a8d29c61daace713996a51ffeb559566eb3..119ceba7209732509fa0782619f19e957a1583c1 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -554,6 +554,10 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() { return m_srcURL; } +objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater::getJobStatus() { + return m_jobStatus; +} + ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint16_t copyNumber ) { std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater); // Passing a reference to the unique pointer led to strange behaviors. diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 5d113c657d2fbceb1de87f8d1263e892ba55a74e..bf736b9e66c30d303ace5ffc8a5440692a34be9d 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -136,7 +136,7 @@ std::string cta::ArchiveJob::reportType() { return "ErrorReport"; default: { - throw exception::Exception("In ArchiveJob::reportURL(): job status does not require reporting."); + throw exception::Exception("In ArchiveJob::reportType(): job status does not require reporting."); } } } diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 5238ff9093e2d02583855a2fca0ee3cd6a03cee5..5509d514fbed6caffb814afd7bb86a43e2cef8ae 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -172,7 +172,8 @@ public: enum class ReportType: uint8_t { NoReportRequired, CompletionReport, - FailureReport + FailureReport, + Report ///< A generic grouped type } reportType; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::TapeFile tapeFile; diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp index c0f01e5aead4cc34e767ca0228caa57b64fb75fe..81fdf70ad95b6f49944c9fa3caefa548c469b6ed 100644 --- a/scheduler/SchedulerDatabaseTest.cpp +++ b/scheduler/SchedulerDatabaseTest.cpp @@ -205,7 +205,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { jobBatch.emplace_back(std::move(aj.front())); aj.pop_front(); count++; - am->setJobBatchSuccessful(jobBatch, lc); + am->setJobBatchTransferred(jobBatch, lc); } else done = true; @@ -293,7 +293,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { jobBatch.emplace_back(std::move(aj.front())); aj.pop_front(); count++; - am->setJobBatchSuccessful(jobBatch, lc); + am->setJobBatchTransferred(jobBatch, lc); } else done = true; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 3951d28c9ad7890afb36de195608333a11941686..d40421a22e8657b7ead04290fc57af4b50268c6f 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -356,7 +356,7 @@ TEST_P(SchedulerTest, archive_to_new_file) { // ASSERT_FALSE(found); //} -TEST_P(SchedulerTest, archive_and_retrieve_new_file) { +TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) { using namespace cta; Scheduler &scheduler = getScheduler(); @@ -472,6 +472,17 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); } + + { + // Emulate the the reporter process reporting successful transfer to tape to the disk system + auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); + ASSERT_NE(0, jobsToReport.size()); + eos::DiskReporterFactory factory; + log::TimingList timings; + utils::Timer t; + scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc); + ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size()); + } { cta::common::dataStructures::EntryLog creationLog;