Commit 0654c5cd authored by Eric Cano's avatar Eric Cano
Browse files

Created DiskReportRunner class.

This class will handle reporting like the GarbageCollector class
does for garbage collection.

Adapted the interface of scheduler and ArchiveRequests to allow delegating
reporting to the disk report runner.

This commit is not functional. We still need to:
- Implement the ToReport/Failed queues interface.
- Adapt the queueing in the scheduler/ArchiveMount
- Implement the popping of jobs to report.
- Implement the async reporter for the files.
- Develop the user interface for failed requests.
parent fb501ebd
......@@ -32,7 +32,7 @@ cta::ArchiveJob::~ArchiveJob() throw() {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
cta::ArchiveJob::ArchiveJob(ArchiveMount *mount,
catalogue::Catalogue & catalogue,
const common::dataStructures::ArchiveFile &archiveFile,
const std::string &srcURL,
......@@ -45,8 +45,8 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
//------------------------------------------------------------------------------
// asyncReportComplete
//------------------------------------------------------------------------------
void cta::ArchiveJob::asyncReportComplete() {
m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
void cta::ArchiveJob::asyncReportComplete(eos::DiskReporterFactory & reporterFactory) {
m_reporter.reset(reporterFactory.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
m_reporter->asyncReportArchiveFullyComplete();
m_reporterTimer.reset();
}
......@@ -80,7 +80,7 @@ cta::catalogue::TapeItemWrittenPointer cta::ArchiveJob::validateAndGetTapeFileWr
fileReport.fSeq = tapeFile.fSeq;
fileReport.size = archiveFile.fileSize;
fileReport.storageClassName = archiveFile.storageClass;
fileReport.tapeDrive = m_mount.getDrive();
fileReport.tapeDrive = getMount().getDrive();
fileReport.vid = tapeFile.vid;
return cta::catalogue::TapeItemWrittenPointer(fileReportUP.release());
}
......@@ -129,8 +129,8 @@ void cta::ArchiveJob::failed(const std::string &failureReason, log::LogContext
// That's all job's already done.
std::promise<void> reporterState;
utils::Timer t;
std::unique_ptr<cta::eos::DiskReporter> reporter(m_mount.createDiskReporter(fullReportURL, reporterState));
reporter->asyncReportArchiveFullyComplete();
// TODOTODO: put in the queue std::unique_ptr<cta::DiskReporter> reporter(getMount().createDiskReporter(fullReportURL, reporterState));
// TODOTODO: reporter->asyncReportArchiveFullyComplete();
try {
reporterState.get_future().get();
log::ScopedParamContainer params(lc);
......@@ -161,3 +161,11 @@ void cta::ArchiveJob::failed(const std::string &failureReason, log::LogContext
// Waits on the future obtained from m_reporterState and returns once it is ready.
// NOTE(review): m_reporterState is presumably a std::promise<void> (it is the
// reporter state handed to createDiskReporter). If so, get() rethrows any
// exception stored in the promise, and a second call on the same job would
// throw std::future_error because get_future() may only be called once — confirm.
void cta::ArchiveJob::waitForReporting() {
m_reporterState.get_future().get();
}
//------------------------------------------------------------------------------
// cta::ArchiveJob::getMount()
//------------------------------------------------------------------------------
cta::ArchiveMount& cta::ArchiveJob::getMount() {
  // The mount pointer is optional: a job built without a mount cannot serve
  // mount-related requests, so the absence is signalled with an exception.
  if (!m_mount) {
    throw exception::Exception("In ArchiveJob::getMount(): no mount set.");
  }
  return *m_mount;
}
......@@ -33,6 +33,9 @@ namespace cta {
// Forward declaration
class ArchiveMount;
namespace eos {
class DiskReporterFactory;
}
/**
* Class representing the transfer of a single copy of a remote file to tape.
......@@ -44,6 +47,7 @@ class ArchiveJob {
* constructor of ArchiveJob.
*/
friend class ArchiveMount;
friend class Scheduler;
protected:
/**
......@@ -55,7 +59,7 @@ protected:
* @param tapeFileLocation the location within the tape
*/
ArchiveJob(
ArchiveMount &mount,
ArchiveMount *mount,
catalogue::Catalogue & catalogue,
const common::dataStructures::ArchiveFile &archiveFile,
const std::string &srcURL,
......@@ -80,7 +84,7 @@ public:
/**
* Launch a report to the user.
*/
virtual void asyncReportComplete();
virtual void asyncReportComplete(eos::DiskReporterFactory & reporterFactory);
/**
* Get the report time (in seconds).
......@@ -129,7 +133,12 @@ private:
/**
* The mount that generated this job
*/
ArchiveMount &m_mount;
ArchiveMount *m_mount = nullptr;
/**
* Get access to the mount or throw exception if we do not have one
*/
ArchiveMount &getMount();
/**
* Reference to the name server
......
......@@ -114,7 +114,7 @@ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(
std::list<std::unique_ptr<ArchiveJob>> ret;
// We prepare the response
for (auto & sdaj: dbJobBatch) {
ret.emplace_back(new ArchiveJob(*this, m_catalogue,
ret.emplace_back(new ArchiveJob(this, m_catalogue,
sdaj->archiveFile, sdaj->srcURL, sdaj->tapeFile));
ret.back()->m_dbJob.reset(sdaj.release());
}
......@@ -189,7 +189,8 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A
if (jobsToReport.count(job->m_dbJob.get())) {
logContext.log(cta::log::DEBUG,
"In ArchiveMount::reportJobsBatchWritten(): archive request complete. Will launch async report to user.");
job->asyncReportComplete();
// TODOTODO: requeue instead!
// TODOTODO: job->asyncReportComplete();
} else {
logContext.log(cta::log::DEBUG,
"In ArchiveMount::reportJobsBatchWritten(): Recorded the partial migration of a file.");
......
......@@ -20,7 +20,8 @@ set (CTA_SCHEDULER_SRC_FILES
OStoreDB/MemQueues.cpp
OStoreDB/OStoreDB.cpp
OStoreDB/OStoreDBWithAgent.cpp
LabelMount.cpp)
LabelMount.cpp
DiskReportRunner.cpp)
find_package(Protobuf3 REQUIRED)
include_directories (${PROTOBUF3_INCLUDE_DIRS})
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DiskReportRunner.hpp"
#include "Scheduler.hpp"
#include "ArchiveJob.hpp"
namespace cta {
/**
 * Run one reporting pass.
 *
 * Repeatedly fetches a batch of archive jobs to report from the scheduler,
 * launches the asynchronous report for every job of the batch, then waits for
 * each report and logs its outcome. The pass ends when the scheduler returns
 * an empty batch or when the time budget is exhausted (the budget is only
 * checked between batches, so a pass can overrun it by one batch).
 *
 * NOTE(review): the jobs are not marked as reported in the scheduler database
 * here (no call to setJobBatchReported is made) — to be confirmed/added when
 * the queueing interface is implemented.
 *
 * @param lc log context used for all the messages emitted during the pass.
 */
void DiskReportRunner::runOnePass(log::LogContext& lc) {
  // We currently go for a hardcoded 500 jobs batch to report.
  // We report with a budget of 30 seconds, as long as we find something to report.
  const size_t batchSize = 500;
  const int timeBudgetSecs = 30;
  utils::Timer t;
  size_t roundCount = 0;
  while (t.secs() < timeBudgetSecs) {
    utils::Timer t2;
    auto archiveJobsToReport = m_scheduler.getNextArchiveJobsToReportBatch(batchSize, lc);
    if (archiveJobsToReport.empty()) break;
    // Only rounds that actually found something to report are counted.
    ++roundCount;
    double getJobsToReportTime = t2.secs(utils::Timer::resetCounter);
    // Launch all the reports first so that they can proceed in parallel...
    for (auto &job: archiveJobsToReport) {
      job->asyncReportComplete(m_reporterFactory);
    }
    double asyncStartReports = t2.secs(utils::Timer::resetCounter);
    size_t successfulReports = 0, failedReports = 0;
    // ...then gather the result of the reporting to client.
    for (auto &job: archiveJobsToReport) {
      try {
        job->waitForReporting();
        ++successfulReports;
        log::ScopedParamContainer params(lc);
        params.add("fileId", job->archiveFile.archiveFileID)
              .add("diskInstance", job->archiveFile.diskInstance)
              .add("diskFileId", job->archiveFile.diskFileId)
              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
              .add("reportURL", job->reportURL())
              .add("reportTime", job->reportTime());
        lc.log(cta::log::INFO,"Reported to the client a full file archival");
      } catch(cta::exception::Exception &ex) {
        ++failedReports;
        cta::log::ScopedParamContainer params(lc);
        params.add("fileId", job->archiveFile.archiveFileID)
              .add("diskInstance", job->archiveFile.diskInstance)
              .add("diskFileId", job->archiveFile.diskFileId)
              .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
              .add("reportURL", job->reportURL())
              .add("errorMessage", ex.getMessage().str());
        lc.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:");
      }
    }
    double asyncCheckJobCompletionTime = t2.secs(utils::Timer::resetCounter);
    // Per-round summary: counters and timings of the three phases.
    log::ScopedParamContainer params (lc);
    params.add("successfulReports", successfulReports)
          .add("failedReports", failedReports)
          .add("totalReports", successfulReports + failedReports)
          .add("getJobsToReportTime", getJobsToReportTime)
          .add("asyncStartReports", asyncStartReports)
          .add("asyncCheckJobCompletionTime", asyncCheckJobCompletionTime);
    lc.log(log::INFO, "In DiskReportRunner::runOnePass(): reported a batch of archive jobs.");
  }
  // Whole-pass summary.
  double clientReportingTime=t.secs();
  cta::log::ScopedParamContainer params(lc);
  params.add("roundCount", roundCount)
        .add("clientReportingTime", clientReportingTime);
  lc.log(log::INFO, "In DiskReportRunner::runOnePass(): finished one pass.");
}
} // namespace cta
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/log/LogContext.hpp"
#include "eos/DiskReporterFactory.hpp"
namespace cta {
class Scheduler;
/**
 * Runner handling the reporting of archive jobs to the disk system, in the
 * same way the GarbageCollector class handles garbage collection: the owner
 * calls runOnePass() repeatedly.
 */
class DiskReportRunner {
public:
  /**
   * Constructor.
   * @param scheduler the scheduler the jobs to report are fetched from. Only a
   * reference is kept, so the scheduler must outlive this object.
   */
  explicit DiskReportRunner(Scheduler & scheduler): m_scheduler(scheduler) {}

  /**
   * Run one reporting pass (fetch jobs to report, report them, log the result).
   * @param lc log context used for the messages of the pass.
   */
  void runOnePass(log::LogContext & lc);
private:
  /** The scheduler providing the batches of jobs to report. */
  Scheduler & m_scheduler;
  /** Factory handed to the jobs so they can create their disk reporter. */
  eos::DiskReporterFactory m_reporterFactory;
};
} // namespace cta
\ No newline at end of file
......@@ -681,6 +681,22 @@ OStoreDB::ArchiveQueueItor_t OStoreDB::getArchiveJobItor(const std::string &tape
return ArchiveQueueItor_t(m_objectStore, tapePoolName);
}
//------------------------------------------------------------------------------
// OStoreDB::getNextArchiveJobsToReportBatch()
//------------------------------------------------------------------------------
/**
 * Placeholder for the popping of a batch of archive jobs to report.
 * Not implemented yet: always throws.
 *
 * @param filesRequested maximum number of jobs requested (currently unused).
 * @param logContext log context (currently unused).
 * @throws cta::exception::Exception unconditionally.
 */
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::getNextArchiveJobsToReportBatch(
  uint64_t filesRequested, log::LogContext& logContext) {
  // The message names the function so that this stub can be told apart from
  // the other unimplemented ones when the exception surfaces in a log.
  throw cta::exception::Exception("In OStoreDB::getNextArchiveJobsToReportBatch(): TODOTODO: implement.");
}
//------------------------------------------------------------------------------
// OStoreDB::setJobBatchReported()
//------------------------------------------------------------------------------
/**
 * Placeholder for marking a batch of archive jobs as reported.
 * Not implemented yet: always throws.
 *
 * @param jobsBatch the jobs to mark as reported (currently unused).
 * @param lc log context (currently unused).
 * @throws cta::exception::Exception unconditionally.
 */
void OStoreDB::setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch,
  log::LogContext& lc) {
  // The message names the function so that this stub can be told apart from
  // the other unimplemented ones when the exception surfaces in a log.
  throw cta::exception::Exception("In OStoreDB::setJobBatchReported(): TODOTODO: implement.");
}
//------------------------------------------------------------------------------
// OStoreDB::getRetrieveQueueStatistics()
//------------------------------------------------------------------------------
......
......@@ -260,6 +260,10 @@ public:
typedef QueueItor<objectstore::RootEntry::ArchiveQueueDump, objectstore::ArchiveQueue> ArchiveQueueItor_t;
ArchiveQueueItor_t getArchiveJobItor(const std::string &tapePoolName) const;
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & logContext) override;
void setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext& lc) override;
/* === Retrieve requests handling ======================================== */
std::list<RetrieveQueueStatistics> getRetrieveQueueStatistics(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set<std::string>& vidsToConsider) override;
......
......@@ -111,6 +111,14 @@ public:
return m_OStoreDB.getRetrieveRequests();
}
// Forwards the request for a batch of archive jobs to report to the wrapped
// m_OStoreDB and returns its result unchanged.
std::list<std::unique_ptr<ArchiveJob> > getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & lc) override {
return m_OStoreDB.getNextArchiveJobsToReportBatch(filesRequested, lc);
}
// Forwards the marking of a batch of jobs as reported to the wrapped m_OStoreDB.
void setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob*>& jobsBatch, log::LogContext& lc) override {
m_OStoreDB.setJobBatchReported(jobsBatch, lc);
}
std::list<RetrieveRequestDump> getRetrieveRequestsByVid(const std::string& vid) const override {
return m_OStoreDB.getRetrieveRequestsByVid(vid);
}
......
......@@ -1061,5 +1061,27 @@ std::list<common::dataStructures::QueueAndMountSummary> Scheduler::getQueuesAndM
return ret;
}
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
// Thin wrapper: delegates the creation of the disk reporter for the given
// report URL to the scheduler's own reporter factory (m_reporterFactory) and
// returns the pointer it produced.
// NOTE(review): the returned raw pointer is presumably owned by the caller
// (ArchiveJob stores the factory's result in a smart pointer) — confirm.
eos::DiskReporter* Scheduler::createDiskReporter(std::string& URL, std::promise<void>& reporterState) {
return m_reporterFactory.createDiskReporter(URL, reporterState);
}
//------------------------------------------------------------------------------
// getNextArchiveJobsToReportBatch
//------------------------------------------------------------------------------
/**
 * Get the next batch of archive jobs to report to the client.
 *
 * Fetches up to filesRequested jobs from the scheduler database and wraps each
 * of them in a scheduler-level ArchiveJob. These jobs are created without a
 * mount (null mount pointer), as reporting does not happen in the context of a
 * mount.
 *
 * @param filesRequested maximum number of jobs to return.
 * @param logContext log context passed down to the scheduler database.
 * @return the list of jobs to report, in the order returned by the database;
 * empty when the database returned nothing.
 */
std::list<std::unique_ptr<ArchiveJob> > Scheduler::getNextArchiveJobsToReportBatch(
  uint64_t filesRequested, log::LogContext& logContext) {
  // We need to go through the queues of archive jobs to report
  std::list<std::unique_ptr<ArchiveJob> > ret;
  // Get the list of jobs to report from the scheduler db
  auto dbRet = m_db.getNextArchiveJobsToReportBatch(filesRequested, logContext);
  for (auto & j: dbRet) {
    // Create the (empty) list slot first and only then allocate the job into
    // it: passing a raw "new" result to emplace_back would leak the job if the
    // allocation of the list node threw.
    ret.emplace_back();
    ret.back().reset(new ArchiveJob(nullptr, m_catalogue, j->archiveFile,
      j->srcURL, j->tapeFile));
    // Hand the ownership of the database job over to the scheduler job.
    ret.back()->m_dbJob.reset(j.release());
  }
  return ret;
}
} // namespace cta
......@@ -48,6 +48,9 @@
#include "scheduler/TapeMount.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "eos/DiskReporter.hpp"
#include "eos/DiskReporterFactory.hpp"
#include <list>
#include <map>
#include <memory>
......@@ -56,6 +59,8 @@
namespace cta {
class ArchiveJob;
/**
* Class implementing a tape resource scheduler. This class is the main entry point
* for most of the operations on both the tape file catalogue and the object store for
......@@ -312,6 +317,31 @@ public:
*/
std::list<common::dataStructures::QueueAndMountSummary> getQueuesAndMountSummaries(log::LogContext & lc);
/*============== Archive reporting support =================================*/
/**
* Batch job factory
*
* @param filesRequested the number of files requested
* @param logContext
* @return a list of unique_ptr to the next successful archive jobs to report.
* The list is empty when no more jobs can be found. Will return jobs (if
* available) up to specified number.
*/
std::list<std::unique_ptr<ArchiveJob>> getNextArchiveJobsToReportBatch(uint64_t filesRequested,
log::LogContext &logContext);
/**
* Creates a disk reporter for the ArchiveJob (this is a wrapper).
* @param URL: report address
* @param reporterState void promise to be set when the report is done asynchronously.
* @return pointer to the reporter created.
*/
eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState);
private:
/** An initialized-once factory for archive reports (indirectly used by ArchiveJobs) */
eos::DiskReporterFactory m_reporterFactory;
public:
/*============== Administrator management ==================================*/
void authorizeAdmin(const cta::common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc);
......
......@@ -176,6 +176,25 @@ public:
virtual ~ArchiveJob() {}
};
/**
* Get a set of jobs to report to the clients. This function is like
* ArchiveMount::getNextJobBatch. It is not in the context of a mount as any
* process can grab a batch of jobs to report and proceed with the reporting.
* After reporting, setJobBatchReported will be the last step of the job's lifecycle.
* @return A list of process-owned jobs to report.
*/
virtual std::list<std::unique_ptr<ArchiveJob>> getNextArchiveJobsToReportBatch(uint64_t filesRequested,
log::LogContext & logContext) = 0;
/**
* Set a batch of jobs as reported (modeled on ArchiveMount::setJobBatchSuccessful()).
* @param jobsBatch
* @param lc
*/
virtual void setJobBatchReported(std::list<cta::SchedulerDatabase::ArchiveJob *> & jobsBatch,
log::LogContext & lc) = 0;
/*============ Retrieve management: user side ============================*/
/**
......
......@@ -28,7 +28,7 @@ namespace cta {
public:
int completes;
int failures;
MockArchiveJob(cta::ArchiveMount & am, cta::catalogue::Catalogue &catalogue): cta::ArchiveJob(am,
MockArchiveJob(cta::ArchiveMount * am, cta::catalogue::Catalogue &catalogue): cta::ArchiveJob(am,
catalogue, cta::common::dataStructures::ArchiveFile(),
"", cta::common::dataStructures::TapeFile()),
completes(0), failures(0) {}
......
......@@ -98,7 +98,7 @@ namespace cta {
void createArchiveJobs(const unsigned int nbJobs) {
for(unsigned int i = 0; i < nbJobs; i++) {
m_jobs.push_back(std::unique_ptr<cta::ArchiveJob>(
new cta::MockArchiveJob(*this, ::cta::ArchiveMount::m_catalogue)));
new cta::MockArchiveJob(this, ::cta::ArchiveMount::m_catalogue)));
}
}
}; // class MockArchiveMount
......
......@@ -419,7 +419,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionGooddayRecall) {
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(&mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
......@@ -592,7 +592,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
const uint64_t archiveFileSize = 1000;
cta::MockArchiveMount mam(catalogue);
cta::MockRetrieveMount mrm;
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(&mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = 1000 + fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
......@@ -799,7 +799,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(&mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
......@@ -1010,7 +1010,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(&mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
......@@ -1158,7 +1158,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionFailtoMount) {
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(&mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
......
......@@ -41,7 +41,7 @@ namespace unitTests{
class TestingArchiveJob: public cta::ArchiveJob {
public:
TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL),
TestingArchiveJob(): cta::ArchiveJob(nullptr,
*((cta::catalogue::Catalogue *)NULL), cta::common::dataStructures::ArchiveFile(),
"", cta::common::dataStructures::TapeFile()) {
}
......
......@@ -67,7 +67,7 @@ namespace unitTests {
public:
MockArchiveJobExternalStats(cta::ArchiveMount & am, cta::catalogue::Catalogue & catalogue,
int & completes, int &failures):
MockArchiveJob(am, catalogue), completesRef(completes), failuresRef(failures) {}
MockArchiveJob(&am, catalogue), completesRef(completes), failuresRef(failures) {}
virtual void validate() override {}
virtual cta::catalogue::TapeItemWrittenPointer validateAndGetTapeFileWritten() override {
......@@ -213,12 +213,12 @@ namespace unitTests {
::testing::InSequence dummy;
std::unique_ptr<cta::ArchiveJob> job1;
{
std::unique_ptr<cta::MockArchiveJob> mockJob(new cta::MockArchiveJob(tam, *m_catalogue));
std::unique_ptr<cta::MockArchiveJob> mockJob(new cta::MockArchiveJob(&tam, *m_catalogue));
job1.reset(mockJob.release());
}
std::unique_ptr<cta::ArchiveJob> job2;
{
std::unique_ptr<cta::MockArchiveJob> mockJob(new cta::MockArchiveJob(tam, *m_catalogue));
std::unique_ptr<cta::MockArchiveJob> mockJob(new cta::MockArchiveJob(&tam, *m_catalogue));
job2.reset(mockJob.release());
}
std::unique_ptr<cta::ArchiveJob> job3;
......
......@@ -64,7 +64,7 @@ class BasicRetrieveJob: public cta::RetrieveJob {
class BasicArchiveJob: public cta::ArchiveJob {
public:
BasicArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL),
BasicArchiveJob(): cta::ArchiveJob(nullptr,
*((cta::catalogue::Catalogue *)NULL), cta::common::dataStructures::ArchiveFile(),
"", cta::common::dataStructures::TapeFile()) {
}
......
......@@ -48,7 +48,7 @@ namespace unitTests {
class TestingArchiveJob: public cta::ArchiveJob {
public:
TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL),
TestingArchiveJob(): cta::ArchiveJob(nullptr,
*((cta::catalogue::Catalogue *)NULL), cta::common::dataStructures::ArchiveFile(),
"", cta::common::dataStructures::TapeFile()) {
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment