Commit 72933cef authored by Eric Cano's avatar Eric Cano
Browse files

Implemented bulk retrieve request reporting

Successful retrieve requests are asynchronously deleted as soon as they
are complete. Then from time to time (every 500 completed requests), the requests get removed from
process ownership in bulk.
parent 120b7f34
......@@ -110,7 +110,7 @@ void cta::ArchiveJob::validate(){
std::string cta::ArchiveJob::reportURL() {
  // Hand back the disk-system report URL recorded on the database-side job.
  const std::string & url = m_dbJob->archiveReportURL;
  return url;
}
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
......
......@@ -118,7 +118,7 @@ public:
* @return The URL used to report to the disk system.
*/
virtual std::string reportURL();
private:
std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> m_dbJob;
......
......@@ -143,7 +143,7 @@ void cta::ArchiveMount::reportJobsBatchWritten(std::queue<std::unique_ptr<cta::A
files++;
bytes+=job->archiveFile.fileSize;
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset(nullptr);
job.reset();
}
utils::Timer t;
// Note: former content of ReportFlush::updateCatalogueWithTapeFilesWritten
......
......@@ -2298,6 +2298,38 @@ void OStoreDB::RetrieveMount::setTapeSessionStats(const castor::tape::tapeserver
m_oStoreDB.updateDriveStatistics(driveInfo, inputs, lc);
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::castFromSchedDBJob()
//------------------------------------------------------------------------------
OStoreDB::RetrieveJob * OStoreDB::RetrieveMount::castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job) {
  // Downcast a scheduler-level retrieve job to its object-store implementation.
  // The cast can only fail if a job from a foreign SchedulerDatabase
  // implementation was handed to this mount, which is a programming error.
  OStoreDB::RetrieveJob * ret = dynamic_cast<OStoreDB::RetrieveJob *> (job);
  if (!ret) {
    std::string unexpectedType = typeid(*job).name();
    // Fix: this casts a retrieve job; the previous message wrongly said "archive job".
    throw cta::exception::Exception(std::string("In OStoreDB::RetrieveMount::castFromSchedDBJob(): unexpected retrieve job type while casting: ")+
        unexpectedType);
  }
  return ret;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::waitAndFinishSettingJobsBatchSuccessful()
//------------------------------------------------------------------------------
std::set<cta::SchedulerDatabase::RetrieveJob*> OStoreDB::RetrieveMount::finishSettingJobsBatchSuccessful(
    std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) {
  // Collect the object-store addresses of every request in the batch, then
  // drop them from this agent's ownership in one bulk operation.
  std::set<cta::SchedulerDatabase::RetrieveJob*> finishedJobs;
  std::list<std::string> requestAddressesToUnown;
  // The asynchronous reports for these jobs were already launched; here we
  // only gather them and release ownership.
  for (auto * schedJob: jobsBatch) {
    auto * storeJob = castFromSchedDBJob(schedJob);
    requestAddressesToUnown.emplace_back(storeJob->m_retrieveRequest.getAddressIfSet());
    finishedJobs.insert(schedJob);
  }
  m_oStoreDB.m_agentReference->removeBatchFromOwnership(requestAddressesToUnown, m_oStoreDB.m_objectStore);
  return finishedJobs;
}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::setDriveStatus()
//------------------------------------------------------------------------------
......@@ -2613,12 +2645,8 @@ void OStoreDB::RetrieveJob::checkSucceed() {
m_retrieveRequest.resetValues();
// We no more own the job (which could be gone)
m_jobOwned = false;
// Remove ownership form the agent
const std::string rtfrAddress = m_retrieveRequest.getAddressIfSet();
m_oStoreDB.m_agentReference->removeFromOwnership(rtfrAddress, m_oStoreDB.m_objectStore);
// Ownership will be removed from agent by caller through retrieve mount object.
}
} // namespace cta
......@@ -163,6 +163,7 @@ public:
friend class ArchiveJob;
/* === Retrieve Mount handling ============================================ */
class RetrieveJob;
class RetrieveMount: public SchedulerDatabase::RetrieveMount {
friend class TapeMountDecisionInfo;
private:
......@@ -170,10 +171,14 @@ public:
OStoreDB & m_oStoreDB;
public:
const MountInfo & getMountInfo() override;
std::list<std::unique_ptr<RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override;
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
private:
OStoreDB::RetrieveJob * castFromSchedDBJob(SchedulerDatabase::RetrieveJob * job);
public:
std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, log::LogContext& lc) override;
};
friend class RetrieveMount;
......
/*
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
......
......@@ -17,6 +17,7 @@
*/
#include "scheduler/RetrieveMount.hpp"
#include "common/Timer.hpp"
//------------------------------------------------------------------------------
// constructor
......@@ -86,6 +87,76 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
return ret;
}
//------------------------------------------------------------------------------
// waitAndFinishSettingJobsBatchRetrieved()
//------------------------------------------------------------------------------
void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) {
std::list<std::unique_ptr<cta::RetrieveJob> > validatedSuccessfulArchiveJobs;
std::list<cta::SchedulerDatabase::RetrieveJob *> validatedSuccessfulDBArchiveJobs;
std::unique_ptr<cta::RetrieveJob> job;
double waitUpdateCompletionTime=0;
double jobBatchFinishingTime=0;
double schedulerDbTime=0;
uint64_t files=0;
uint64_t bytes=0;
utils::Timer t;
try {
while (!successfulRetrieveJobs.empty()) {
job = std::move(successfulRetrieveJobs.front());
successfulRetrieveJobs.pop();
if (!job.get()) continue;
files++;
bytes+=job->archiveFile.fileSize;
job->checkComplete();
validatedSuccessfulDBArchiveJobs.emplace_back(job->m_dbJob.get());
validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
job.reset();
}
waitUpdateCompletionTime=t.secs(utils::Timer::resetCounter);
// Complete the cleaning up of the jobs in the mount
m_dbMount->finishSettingJobsBatchSuccessful(validatedSuccessfulDBArchiveJobs, logContext);
jobBatchFinishingTime=t.secs();
schedulerDbTime=jobBatchFinishingTime + waitUpdateCompletionTime;
{
cta::log::ScopedParamContainer params(logContext);
params.add("successfulRetrieveJobs", successfulRetrieveJobs.size())
.add("files", files)
.add("bytes", bytes)
.add("waitUpdateCompletionTime", waitUpdateCompletionTime)
.add("jobBatchFinishingTime", jobBatchFinishingTime)
.add("schedulerDbTime", schedulerDbTime);
logContext.log(cta::log::DEBUG,"In RetrieveMout::waitAndFinishSettingJobsBatchRetrieved(): deleted complete retrieve jobs.");
}
} catch(const cta::exception::Exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionMessageValue", e.getMessageValue());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="In RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(): got an exception";
logContext.log(cta::log::ERR, msg_error);
// Failing here does not really affect the session so we can carry on. Reported jobs are reported, non-reported ones
// will be retried.
} catch(const std::exception& e){
cta::log::ScopedParamContainer params(logContext);
params.add("exceptionWhat", e.what());
if (job.get()) {
params.add("fileId", job->archiveFile.archiveFileID)
.add("diskInstance", job->archiveFile.diskInstance)
.add("diskFileId", job->archiveFile.diskFileId)
.add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
}
const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception";
logContext.log(cta::log::ERR, msg_error);
// Failing here does not really affect the session so we can carry on. Reported jobs are reported, non-reported ones
// will be retried.
}
}
//------------------------------------------------------------------------------
// tapeComplete()
//------------------------------------------------------------------------------
......
......@@ -25,6 +25,7 @@
#include "scheduler/TapeMount.hpp"
#include <memory>
#include <queue>
namespace cta {
......@@ -131,6 +132,17 @@ namespace cta {
virtual std::list<std::unique_ptr<RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext &logContext);
/**
* Wait and complete reporting of a batch of jobs successes. The per jobs handling has
* already been launched in the background using cta::RetrieveJob::asyncComplete().
* This function will check completion of those async completes and then proceed
* with any necessary common handling.
*
* @param successfulRetrieveJobs the jobs to report
* @param logContext
*/
virtual void waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> > & successfulRetrieveJobs, cta::log::LogContext &logContext);
/**
* Destructor.
*/
......
......@@ -315,6 +315,8 @@ public:
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
virtual std::set<cta::SchedulerDatabase::RetrieveJob *> finishSettingJobsBatchSuccessful(
std::list<cta::SchedulerDatabase::RetrieveJob *> & jobsBatch, log::LogContext & lc) = 0;
virtual ~RetrieveMount() {}
uint32_t nbFilesCurrentlyOnTape;
};
......
......@@ -40,7 +40,7 @@ namespace cta {
virtual void reportJobSucceeded() {
completes++;
}
}
virtual void validate() override {}
virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten() override {
......
......@@ -67,6 +67,8 @@ namespace cta {
void setDriveStatus(cta::common::dataStructures::DriveStatus status) override {};
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override {};
void waitAndFinishSettingJobsBatchRetrieved(std::queue<std::unique_ptr<cta::RetrieveJob> >& successfulRetrieveJobs, cta::log::LogContext& logContext) override {};
private:
......
......@@ -33,6 +33,7 @@
#include "scheduler/TapeMountDummy.hpp"
#include "scheduler/Scheduler.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/testingMocks/MockRetrieveMount.hpp"
#include <memory>
......@@ -45,6 +46,8 @@ namespace unitTests{
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -40,6 +40,7 @@ namespace unitTests{
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); }
};
class TestingRetrieveMount: public cta::RetrieveMount {
......
......@@ -210,7 +210,7 @@ private:
*/
cta::threading::BlockingQueue<Report*> m_fifo;
cta::threading::Mutex m_producterProtection;
/**
......
......@@ -113,13 +113,7 @@ void RecallReportPacker::reportTestGoingToEnd(){
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
m_successfulRetrieveJob->asyncComplete();
}
//------------------------------------------------------------------------------
//ReportSuccessful::waitForAsyncExecuteFinished
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::waitForAsyncExecuteFinished(){
m_successfulRetrieveJob->checkComplete();
parent.m_successfulRetrieveJobs.push(std::move(m_successfulRetrieveJob));
}
//------------------------------------------------------------------------------
......@@ -251,17 +245,12 @@ void RecallReportPacker::WorkerThread::run(){
// as opposed to migrations where one failure fails the session.
try {
rep->execute(m_parent);
if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful)) {
reportedSuccessfully.emplace_back(std::move(rep));
cta::utils::Timer timing;
const unsigned int checkedReports = m_parent.flushCheckAndFinishAsyncExecute(reportedSuccessfully);
if(checkedReports) {
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("checkedReports", checkedReports)
.add("reportingTime", timing.secs());
m_parent.m_lc.log(cta::log::DEBUG, "After flushCheckAndFinishAsyncExecute()");
}
}
// This slightly hackish bit prevents too many calls to sub function and gettime()
// m_parent.fullCheckAndFinishAsyncExecute will execute the shared half of the
// request updates (individual, asynchronous is done in rep->execute(m_parent);
if (typeid(*rep) == typeid(RecallReportPacker::ReportSuccessful)
&& m_parent.m_successfulRetrieveJobs.size() >= m_parent.RECALL_REPORT_PACKER_FLUSH_SIZE)
m_parent.fullCheckAndFinishAsyncExecute();
} catch(const cta::exception::Exception& e){
//we get there because to tried to close the connection and it failed
//either from the catch a few lines above or directly from rep->execute
......@@ -296,15 +285,9 @@ void RecallReportPacker::WorkerThread::run(){
if (endFound) break;
}
// Make sure the last batch of reports got cleaned up.
try {
cta::utils::Timer timing;
const unsigned int checkedReports = m_parent.fullCheckAndFinishAsyncExecute(reportedSuccessfully);
if (checkedReports) {
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("checkedReports", checkedReports)
.add("reportingTime", timing.secs());
m_parent.m_lc.log(cta::log::DEBUG, "After fullCheckAndFinishAsyncExecute()");
}
m_parent.fullCheckAndFinishAsyncExecute();
} catch(const cta::exception::Exception& e){
cta::log::ScopedParamContainer params(m_parent.m_lc);
params.add("exceptionWhat", e.getMessageValue())
......@@ -361,36 +344,11 @@ bool RecallReportPacker::errorHappened() {
return m_errorHappened || (m_watchdog && m_watchdog->errorHappened());
}
//------------------------------------------------------------------------------
//flushCheckAndFinishAsyncExecute()
//------------------------------------------------------------------------------
unsigned int RecallReportPacker::flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) {
unsigned int checkedReports = 0;
if (reportedSuccessfully.size() >= RECALL_REPORT_PACKER_FLUSH_SIZE) {
while (!reportedSuccessfully.empty()) {
std::unique_ptr<Report> report=std::move(reportedSuccessfully.back());
reportedSuccessfully.pop_back();
if (!report.get()) continue;
report->waitForAsyncExecuteFinished();
checkedReports ++;
}
}
return checkedReports;
}
//------------------------------------------------------------------------------
//fullCheckAndFinishAsyncExecute()
//------------------------------------------------------------------------------
unsigned int RecallReportPacker::fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully) {
unsigned int checkedReports = 0;
while (!reportedSuccessfully.empty()) {
std::unique_ptr<Report> report=std::move(reportedSuccessfully.back());
reportedSuccessfully.pop_back();
if (!report.get()) continue;
report->waitForAsyncExecuteFinished();
checkedReports++;
}
return checkedReports;
void RecallReportPacker::fullCheckAndFinishAsyncExecute() {
// Delegates to the retrieve mount: waits for the queued jobs' asynchronous
// completions and performs the shared batch cleanup (bulk un-ownership).
m_retrieveMount->waitAndFinishSettingJobsBatchRetrieved(m_successfulRetrieveJobs, m_lc);
}
//------------------------------------------------------------------------------
......
......@@ -125,7 +125,6 @@ private:
public:
virtual ~Report(){}
virtual void execute(RecallReportPacker& packer)=0;
virtual void waitForAsyncExecuteFinished() {};
virtual bool goingToEnd() {return false;}
};
class ReportTestGoingToEnd : public Report {
......@@ -145,7 +144,6 @@ private:
ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob):
m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
void execute(RecallReportPacker& reportPacker) override;
void waitForAsyncExecuteFinished() override;
};
class ReportError : public Report {
/**
......@@ -213,6 +211,12 @@ private:
*/
cta::RetrieveMount * m_retrieveMount;
/**
* The successful reports that were pre-reported asynchronously.
* They are collected and completed regularly.
*/
std::queue<std::unique_ptr<cta::RetrieveJob> > m_successfulRetrieveJobs;
/**
* Tracking of the tape thread end
*/
......@@ -221,15 +225,7 @@ private:
/**
* Tracking of the disk thread end
*/
bool m_diskThreadComplete;
public:
/*
* Check if flush limit is reached and proceed finish procedure for async execute
*
* @param reportedSuccessfuly The successful reports to check
* @return The number of reports proceeded
*/
unsigned int flushCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully);
bool m_diskThreadComplete;
/*
* Proceed finish procedure for async execute for all reports.
......@@ -237,12 +233,12 @@ public:
* @param reportedSuccessfuly The successful reports to check
* @return The number of reports proceeded
*/
unsigned int fullCheckAndFinishAsyncExecute(std::list <std::unique_ptr<Report>> &reportedSuccessfully);
void fullCheckAndFinishAsyncExecute();
/*
* The limit for successful reports to trigger flush.
*/
const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 32;
const unsigned int RECALL_REPORT_PACKER_FLUSH_SIZE = 500;
};
}}}}
......
......@@ -135,6 +135,9 @@ namespace unitTests
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
std::set<cta::SchedulerDatabase::RetrieveJob*> finishSettingJobsBatchSuccessful(std::list<cta::SchedulerDatabase::RetrieveJob*>& jobsBatch, cta::log::LogContext& lc) override { throw std::runtime_error("Not implemented"); }
};
TEST_F(castor_tape_tapeserver_daemonTest, RecallTaskInjectorNominal) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment