Commit e0710afc authored by Eric Cano's avatar Eric Cano
Browse files

Fixed report queue bugs.

Extended unit test to also simulate reporter.
Fixed issue with trimming trying to trim the wrong type of queue.
Fixed report type not properly extracted when popping request for report.
parent 5c3d1d92
......@@ -240,7 +240,7 @@ auto ContainerTraits<ArchiveQueueToReport>::getPoppingElementsCandidates(Contain
elem.archiveReportURL = "";
elem.errorReportURL = "";
elem.latestError = "";
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
elem.reportType = SchedulerDatabase::ArchiveJob::ReportType::Report;
ret.summary.files++;
}
return ret;
......@@ -280,8 +280,8 @@ auto ContainerTraits<ArchiveQueueToReport>::PopCriteria::operator-=(const Popped
return *this;
}
void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId,
log::LogContext& lc) {
void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, QueueType queueType, ScopedExclusiveLock & contLock,
const ContainerIdentifyer & cId, log::LogContext& lc) {
if (cont.isEmpty()) {
// The current implementation is done unlocked.
contLock.release();
......@@ -290,7 +290,7 @@ void ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(Container& cont, Scope
RootEntry re(cont.m_objectStore);
ScopedExclusiveLock rexl(re);
re.fetch();
re.removeArchiveQueueAndCommit(cId, QueueType::JobsToTransfer, lc);
re.removeArchiveQueueAndCommit(cId, queueType, lc);
log::ScopedParamContainer params(lc);
params.add("tapepool", cId)
.add("queueObject", cont.getAddressIfSet());
......
......@@ -162,7 +162,17 @@ public:
e->archiveReportURL = u->get()->getArchiveReportURL();
e->errorReportURL = u->get()->getArchiveErrorReportURL();
e->srcURL = u->get()->getSrcURL();
//if (u->get()->)
switch(u->get()->getJobStatus()) {
case serializers::ArchiveJobStatus::AJS_ToReportForTransfer:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::CompletionReport;
break;
case serializers::ArchiveJobStatus::AJS_ToReportForFailure:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::FailureReport;
break;
default:
e->reportType = SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired;
break;
}
} catch (...) {
ret.push_back(OpFailure<PoppedElement>());
ret.back().element = &(*e);
......@@ -175,7 +185,11 @@ public:
return ret;
}
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
trimContainerIfNeeded(cont, QueueType::JobsToTransfer, contLock, cId, lc);
}
protected:
static void trimContainerIfNeeded (Container& cont, QueueType queueType, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc);
};
......@@ -212,9 +226,17 @@ public:
static PoppedElementsBatch getPoppingElementsCandidates(Container & cont, PopCriteria & unfulfilledCriteria,
ElementsToSkipSet & elemtsToSkip, log::LogContext & lc);
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::JobsToReport, contLock, cId, lc);
}
};
template<>
class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */ };
class ContainerTraits<ArchiveQueueFailed>: public ContainerTraits<ArchiveQueueToReport> {/* Same same */
static void trimContainerIfNeeded (Container& cont, ScopedExclusiveLock & contLock, const ContainerIdentifyer & cId, log::LogContext& lc) {
ContainerTraits<ArchiveQueue>::trimContainerIfNeeded(cont, QueueType::FailedJobs, contLock, cId, lc);
}
};
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -554,6 +554,10 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
return m_srcURL;
}
objectstore::serializers::ArchiveJobStatus ArchiveRequest::AsyncJobOwnerUpdater::getJobStatus() {
return m_jobStatus;
}
ArchiveRequest::AsyncTransferSuccessfulUpdater * ArchiveRequest::asyncUpdateTransferSuccessful(const uint16_t copyNumber ) {
std::unique_ptr<AsyncTransferSuccessfulUpdater> ret(new AsyncTransferSuccessfulUpdater);
// Passing a reference to the unique pointer led to strange behaviors.
......
......@@ -136,7 +136,7 @@ std::string cta::ArchiveJob::reportType() {
return "ErrorReport";
default:
{
throw exception::Exception("In ArchiveJob::reportURL(): job status does not require reporting.");
throw exception::Exception("In ArchiveJob::reportType(): job status does not require reporting.");
}
}
}
......
......@@ -172,7 +172,8 @@ public:
enum class ReportType: uint8_t {
NoReportRequired,
CompletionReport,
FailureReport
FailureReport,
Report ///< A generic grouped type
} reportType;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::TapeFile tapeFile;
......
......@@ -205,7 +205,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
jobBatch.emplace_back(std::move(aj.front()));
aj.pop_front();
count++;
am->setJobBatchSuccessful(jobBatch, lc);
am->setJobBatchTransferred(jobBatch, lc);
}
else
done = true;
......@@ -293,7 +293,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
jobBatch.emplace_back(std::move(aj.front()));
aj.pop_front();
count++;
am->setJobBatchSuccessful(jobBatch, lc);
am->setJobBatchTransferred(jobBatch, lc);
}
else
done = true;
......
......@@ -356,7 +356,7 @@ TEST_P(SchedulerTest, archive_to_new_file) {
// ASSERT_FALSE(found);
//}
TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
using namespace cta;
Scheduler &scheduler = getScheduler();
......@@ -472,6 +472,17 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
ASSERT_EQ(0, archiveJobBatch.size());
archiveMount->complete();
}
{
// Emulate the the reporter process reporting successful transfer to tape to the disk system
auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc);
ASSERT_NE(0, jobsToReport.size());
eos::DiskReporterFactory factory;
log::TimingList timings;
utils::Timer t;
scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc);
ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size());
}
{
cta::common::dataStructures::EntryLog creationLog;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment