Commit 5b6b9ff5 authored by Eric Cano's avatar Eric Cano
Browse files

Fixed using of wait() instead of get() on promise for reporter.

This prevented the passing of exceptions as output.
Also integrated the promise in the reporter class.
parent 2b9dce90
......@@ -18,12 +18,17 @@
#pragma once
#include <future>
namespace cta { namespace eos {
class DiskReporter {
public:
virtual void asyncReport() = 0;
virtual void waitReport() { m_promise.get_future().get(); }
virtual ~DiskReporter() {};
protected:
std::promise<void> m_promise;
};
}} // name space cta::clientsystem
\ No newline at end of file
......@@ -24,15 +24,14 @@
namespace cta { namespace eos {
DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL, std::promise<void> &reporterState) {
DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL) {
threading::MutexLocker ml(m_mutex);
auto regexResult = m_EosUrlRegex.exec(URL);
if (regexResult.size()) {
return new EOSReporter(regexResult[1], regexResult[2], reporterState);
return new EOSReporter(regexResult[1], regexResult[2]);
}
regexResult = m_NullRegex.exec(URL);
if (regexResult.size()) {
reporterState.set_value();
return new NullReporter();
}
throw cta::exception::Exception(
......
......@@ -29,7 +29,7 @@ namespace cta { namespace eos {
class DiskReporterFactory {
public:
DiskReporter * createDiskReporter(const std::string URL, std::promise<void> &sreporterState);
DiskReporter * createDiskReporter(const std::string URL);
private:
// The typical call to give report to EOS will be:
// xrdfs localhost query opaquefile "/eos/wfe/passwd?mgm.pcmd=event&mgm.fid=112&mgm.logid=cta&mgm.event=migrated&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0"
......
......@@ -23,8 +23,8 @@
namespace cta { namespace eos {
EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue, std::promise<void>& reporterState):
m_fs(hostURL), m_query(queryValue), m_reporterState(reporterState) {}
EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue):
m_fs(hostURL), m_query(queryValue) {}
void EOSReporter::asyncReport() {
auto qcOpaque = XrdCl::QueryCode::OpaqueFile;
......@@ -43,11 +43,11 @@ void EOSReporter::HandleResponse(XrdCl::XRootDStatus *status,
try {
cta::exception::XrootCl::throwOnError(*status,
"In EOSReporter::AsyncQueryHandler::HandleResponse(): failed to XrdCl::FileSystem::Query()");
m_reporterState.set_value();
m_promise.set_value();
} catch (...) {
try {
// store anything thrown in the promise
m_reporterState.set_exception(std::current_exception());
m_promise.set_exception(std::current_exception());
} catch(...) {
// set_exception() may throw too
}
......
......@@ -28,12 +28,11 @@ const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded
class EOSReporter: public DiskReporter, public XrdCl::ResponseHandler {
public:
EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState);
EOSReporter(const std::string & hostURL, const std::string & queryValue);
void asyncReport() override;
private:
XrdCl::FileSystem m_fs;
std::string m_query;
std::promise<void> &m_reporterState;
void HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response) override;
};
......
......@@ -24,7 +24,7 @@ namespace cta { namespace eos {
class NullReporter: public DiskReporter {
public:
NullReporter() {};
NullReporter() { m_promise.set_value(); };
void asyncReport() override {};
};
......
......@@ -78,8 +78,8 @@ uint32_t cta::ArchiveMount::getNbFiles() const {
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL, std::promise<void> &reporterState) {
return m_reporterFactory.createDiskReporter(URL, reporterState);
cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
return m_reporterFactory.createDiskReporter(URL);
}
//------------------------------------------------------------------------------
......
......@@ -154,7 +154,7 @@ namespace cta {
* @param reporterState void promise to be set when the report is done asynchronously.
* @return pointer to the reporter created.
*/
eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState);
eos::DiskReporter * createDiskReporter(std::string & URL);
/**
* Update the catalog with a set of TapeFileWritten events.
......
......@@ -73,10 +73,10 @@ void cta::RetrieveJob::failed(const std::string & failureReason, log::LogContext
// That's all job's already done.
std::promise<void> reporterState;
utils::Timer t;
std::unique_ptr<cta::eos::DiskReporter> reporter(m_mount.createDiskReporter(fullReportURL, reporterState));
std::unique_ptr<cta::eos::DiskReporter> reporter(m_mount.createDiskReporter(fullReportURL));
reporter->asyncReport();
try {
reporterState.get_future().get();
reporter->waitReport();
log::ScopedParamContainer params(lc);
params.add("fileId", m_dbJob->archiveFile.archiveFileID)
.add("diskInstance", m_dbJob->archiveFile.diskInstance)
......
......@@ -159,9 +159,8 @@ void cta::RetrieveMount::waitAndFinishSettingJobsBatchRetrieved(std::queue<std::
//------------------------------------------------------------------------------
// createDiskReporter()
//------------------------------------------------------------------------------
cta::eos::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL,
std::promise<void>& reporterState) {
return m_reporterFactory.createDiskReporter(URL, reporterState);
cta::eos::DiskReporter* cta::RetrieveMount::createDiskReporter(std::string& URL) {
return m_reporterFactory.createDiskReporter(URL);
}
//------------------------------------------------------------------------------
......
......@@ -148,10 +148,9 @@ namespace cta {
/**
* Creates a disk reporter for the retrieve job (this is a wrapper).
* @param URL: report address
* @param reporterState void promise to be set when the report is done asynchronously.
* @return pointer to the reporter created.
*/
eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState);
eos::DiskReporter * createDiskReporter(std::string & URL);
/**
* Destructor.
......
......@@ -1086,19 +1086,18 @@ void Scheduler::reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob> >&
eos::DiskReporterFactory & reporterFactory, log::TimingList& timingList, utils::Timer& t,
log::LogContext& lc){
// Create the reporters
struct ReporterAndPromise {
struct JobAndReporter {
std::unique_ptr<eos::DiskReporter> reporter;
std::promise<void> promise;
ArchiveJob * archiveJob;
};
std::list<ReporterAndPromise> pendingReports;
std::list<JobAndReporter> pendingReports;
std::list<ArchiveJob *> reportedJobs;
for (auto &j: archiveJobsBatch) {
pendingReports.push_back(ReporterAndPromise());
pendingReports.push_back(JobAndReporter());
auto & current = pendingReports.back();
// We could fail to create the disk reporter or to get the report URL. This should not impact the other jobs.
try {
current.reporter.reset(reporterFactory.createDiskReporter(j->reportURL(), current.promise));
current.reporter.reset(reporterFactory.createDiskReporter(j->reportURL()));
current.reporter->asyncReport();
current.archiveJob = j.get();
} catch (cta::exception::Exception & ex) {
......@@ -1118,7 +1117,7 @@ void Scheduler::reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob> >&
timingList.insertAndReset("asyncReportLaunchTime", t);
for (auto &current: pendingReports) {
try {
current.promise.get_future().wait();
current.reporter->waitReport();
reportedJobs.push_back(current.archiveJob);
} catch (cta::exception::Exception & ex) {
// Log the error, update the request.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment