Commit c8827ade authored by Victor Kotlyar's avatar Victor Kotlyar
Browse files

Report archive completion at flush time for a whole batch of jobs.

Changed reporting to the Catalogue to use a single batch of written files.
Changed the synchronous, job-by-job reporting to the backend into
asynchronous reporting for a batch of jobs.
Changed the synchronous reporting to the EOS MGM into asynchronous
reporting.
parent d733b4c7
......@@ -23,6 +23,7 @@ namespace cta { namespace eos {
class DiskReporter {
public:
virtual void reportArchiveFullyComplete() = 0;
virtual void asyncReportArchiveFullyComplete() = 0;
virtual ~DiskReporter() {};
};
......
......@@ -24,14 +24,15 @@
namespace cta { namespace eos {
DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL) {
DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL, std::promise<void> &reporterState) {
threading::MutexLocker ml(m_mutex);
auto regexResult = m_EosUrlRegex.exec(URL);
if (regexResult.size()) {
return new EOSReporter(regexResult[1], regexResult[2]);
return new EOSReporter(regexResult[1], regexResult[2], reporterState);
}
regexResult = m_NullRegex.exec(URL);
if (regexResult.size()) {
reporterState.set_value();
return new NullReporter();
}
throw cta::exception::Exception(
......
......@@ -23,12 +23,13 @@
#include "common/threading/Mutex.hpp"
#include <string>
#include <future>
namespace cta { namespace eos {
class DiskReporterFactory {
public:
DiskReporter * createDiskReporter(const std::string URL);
DiskReporter * createDiskReporter(const std::string URL, std::promise<void> &sreporterState);
private:
// The typical call to give report to EOS will be:
// xrdfs localhost query opaquefile "/eos/wfe/passwd?mgm.pcmd=event&mgm.fid=112&mgm.logid=cta&mgm.event=migrated&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0"
......
......@@ -16,13 +16,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <future>
#include <memory>

#include "EOSReporter.hpp"
#include "common/exception/XrootCl.hpp"
namespace cta { namespace eos {
EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue):
m_fs(hostURL), m_query(queryValue) {}
EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue, std::promise<void>& reporterState):
m_fs(hostURL), m_query(queryValue), m_reporterState(reporterState) {}
void EOSReporter::reportArchiveFullyComplete() {
......@@ -30,11 +32,48 @@ void EOSReporter::reportArchiveFullyComplete() {
XrdCl::Buffer arg (m_query.size());
arg.FromString(m_query);
XrdCl::Buffer * resp = nullptr;
const uint16_t queryTimeout = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds
XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, queryTimeout);
XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, CTA_EOS_QUERY_TIMEOUT);
delete (resp);
cta::exception::XrootCl::throwOnError(status,
"In EOSReporter::reportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()");
}
void EOSReporter::asyncReportArchiveFullyComplete() {
auto qcOpaque = XrdCl::QueryCode::OpaqueFile;
XrdCl::Buffer arg (m_query.size());
arg.FromString(m_query);
AsyncQueryHandler *handler = new AsyncQueryHandler(m_reporterState);
XrdCl::XRootDStatus status=m_fs.Query( qcOpaque, arg, handler, CTA_EOS_QUERY_TIMEOUT);
cta::exception::XrootCl::throwOnError(status,
"In EOSReporter::asyncReportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()");
}
//------------------------------------------------------------------------------
//EOSReporter::AsyncQueryHandler::AsyncQueryHandler
//------------------------------------------------------------------------------
// Keeps a reference to the caller-owned promise that will carry the query
// outcome; the promise must outlive this handler (which deletes itself once
// the response has been handled).
EOSReporter::AsyncQueryHandler::AsyncQueryHandler(std::promise<void> &handlerPromise):
m_handlerPromise(handlerPromise) {}
//------------------------------------------------------------------------------
//EOSReporter::AsyncQueryHandler::HandleResponse
//------------------------------------------------------------------------------
// XrdCl callback invoked when the asynchronous query completes.
// Translates the XRootD status into the caller's promise: the promise gets
// either a value (success) or the thrown exception (failure) — never both.
// The original code called set_value() unconditionally, so after a failure
// (set_exception() already stored) it would throw std::future_error
// (promise_already_satisfied) out of the XrdCl callback thread.
void EOSReporter::AsyncQueryHandler::HandleResponse(XrdCl::XRootDStatus *status,
                                                    XrdCl::AnyObject *response) {
  try {
    cta::exception::XrootCl::throwOnError(*status,
      "In EOSReporter::AsyncQueryHandler::HandleResponse(): failed to XrdCl::FileSystem::Query()");
    // Success: wake up the waiter.
    m_handlerPromise.set_value();
  } catch (...) {
    try {
      // Store anything thrown in the promise so the waiter rethrows it.
      m_handlerPromise.set_exception(std::current_exception());
    } catch (...) {
      // set_exception() may throw too (e.g. promise already satisfied);
      // nothing more can be done from this callback thread.
    }
  }
  // XrdCl hands over ownership of status and response; the handler is
  // fire-and-forget and deletes itself after completing the promise.
  delete response;
  delete status;
  delete this;
}
}} // namespace cta::eos
......@@ -21,15 +21,28 @@
#include "DiskReporter.hpp"
#include <XrdCl/XrdClFileSystem.hh>
namespace cta { namespace eos {
#include <future>
namespace cta { namespace eos {
const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds
class EOSReporter: public DiskReporter {
public:
EOSReporter(const std::string & hostURL, const std::string & queryValue);
EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState);
void reportArchiveFullyComplete() override;
void asyncReportArchiveFullyComplete() override;
private:
XrdCl::FileSystem m_fs;
std::string m_query;
std::promise<void> &m_reporterState;
class AsyncQueryHandler: public XrdCl::ResponseHandler {
public:
AsyncQueryHandler(std::promise<void> &handlerPromise);
virtual void HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response);
private:
std::promise<void> &m_handlerPromise;
};
};
}} // namespace cta::disk
\ No newline at end of file
}} // namespace cta::disk
......@@ -26,6 +26,7 @@ class NullReporter: public DiskReporter {
public:
NullReporter() {};
void reportArchiveFullyComplete() override {};
void asyncReportArchiveFullyComplete() override {};
};
}} // namespace cta::disk
\ No newline at end of file
......@@ -461,6 +461,51 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() {
return m_srcURL;
}
// Builds an asynchronous updater that marks the job with the given copy number
// as successfully archived in the object store. The returned object is owned
// by the caller; call wait() on it to block until the backend update is done,
// then read m_isLastJob to know whether the whole request finished.
ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSuccessful(const uint16_t copyNumber ) {
std::unique_ptr<AsyncJobSuccessfulUpdater> ret(new AsyncJobSuccessfulUpdater);
// Passing a reference to the unique pointer led to strange behaviors.
auto & retRef = *ret;
// The callback runs later, in the backend updater, on the serialized object;
// it must not touch "ret" itself (hence the plain reference captured above).
ret->m_updaterCallback=
[this,copyNumber, &retRef](const std::string &in)->std::string {
// We have a locked and fetched object, so we just need to work on its representation.
serializers::ObjectHeader oh;
oh.ParseFromString(in);
if (oh.type() != serializers::ObjectType::ArchiveRequest_t) {
std::stringstream err;
err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): wrong object type: " << oh.type();
throw cta::exception::Exception(err.str());
}
serializers::ArchiveRequest payload;
payload.ParseFromString(oh.payload());
auto * jl = payload.mutable_jobs();
// Locate the job for this copy number and mark it complete.
for (auto j=jl->begin(); j!=jl->end(); j++) {
if (j->copynb() == copyNumber) {
j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
// If any sibling job is neither complete nor failed, the request is still
// live: persist the updated payload and report "not the last job".
for (auto j2=jl->begin(); j2!=jl->end(); j2++) {
if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete &&
j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) {
retRef.m_isLastJob = false;
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
}
// Every job is complete or failed: this was the last one. Signal the
// backend (via the dedicated exception, carrying the final serialized
// state) to delete the whole request instead of rewriting it.
retRef.m_isLastJob = true;
oh.set_payload(payload.SerializePartialAsString());
throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString());
}
}
// The requested copy number does not exist in this request.
std::stringstream err;
err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): copyNb not found";
throw cta::exception::Exception(err.str());
};
// Launch the asynchronous update; the caller synchronizes through wait().
ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback));
return ret.release();
}
// Blocks until the asynchronous backend update launched by
// ArchiveRequest::asyncUpdateJobSuccessful() has finished, propagating any
// failure reported by the backend updater.
void ArchiveRequest::AsyncJobSuccessfulUpdater::wait() {
m_backendUpdater->wait();
}
std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
checkPayloadReadable();
auto jl = m_payload.jobs();
......
......@@ -76,6 +76,19 @@ public:
// An job owner updater factory. The owner MUST be previousOwner for the update to be executed.
CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner);
AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint16_t copyNumber, const std::string & owner, const std::string &previousOwner);
// An asynchronous job updating class for success.
class AsyncJobSuccessfulUpdater {
friend class ArchiveRequest;
public:
void wait();
bool m_isLastJob;
private:
std::function<std::string(const std::string &)> m_updaterCallback;
std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater;
};
AsyncJobSuccessfulUpdater * asyncUpdateJobSuccessful(uint16_t copyNumber);
// Get a job owner
std::string getJobOwner(uint16_t copyNumber);
// Request management ========================================================
......
......@@ -109,6 +109,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit);
CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock);
CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete);
/**
* A base class handling asynchronous sequence of lock exclusive, fetch, call user
......
......@@ -382,25 +382,48 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") +
au.m_name + ": "+ ex.what());
}
// Execute the user's callback. Let exceptions fly through. User knows his own exceptions.
value=au.m_update(value);
try {
// Prepare result in buffer list.
au.m_radosBufferList.clear();
au.m_radosBufferList.append(value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") +
au.m_name + ": " + ex.what());
bool updateWithDelete = false;
try {
// Execute the user's callback.
value=au.m_update(value);
} catch (AsyncUpdateWithDelete & ex) {
updateWithDelete = true;
} catch (...) {
// Let exceptions fly through. User knows his own exceptions.
throw;
}
// Launch the write
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc,
std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full(): ") + au.m_name);
throw Backend::CouldNotCommit(errnum.getMessageValue());
if(updateWithDelete) {
try {
au.m_backend.remove(au.m_name);
} catch (cta::exception::Exception &ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") +
au.m_name + ex.what());
}
// Done!
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
au.m_job.set_value();
} else {
try {
// Prepare result in buffer list.
au.m_radosBufferList.clear();
au.m_radosBufferList.append(value);
} catch (std::exception & ex) {
throw CouldNotUpdateValue(
std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") +
au.m_name + ex.what());
}
// Launch the write
librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr);
auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList);
aioc->release();
if (rc) {
cta::exception::Errnum errnum (-rc,
"In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name);
throw Backend::CouldNotCommit(errnum.getMessageValue());
}
}
} catch (...) {
ANNOTATE_HAPPENS_BEFORE(&au.m_job);
......
......@@ -329,13 +329,32 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name,
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotFetch(ex.getMessageValue());
}
// Let user's exceptions go through.
std::string postUpdateData=m_update(preUpdateData);
try {
m_backend.atomicOverwrite(m_name, postUpdateData);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotCommit(ex.getMessageValue());
std::string postUpdateData;
bool updateWithDelete = false;
try {
postUpdateData=m_update(preUpdateData);
} catch (AsyncUpdateWithDelete & ex) {
updateWithDelete = true;
} catch (...) {
// Let user's exceptions go through.
throw;
}
if(updateWithDelete) {
try {
m_backend.remove(m_name);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotCommit(ex.getMessageValue());
}
} else {
try {
m_backend.atomicOverwrite(m_name, postUpdateData);
} catch (cta::exception::Exception & ex) {
ANNOTATE_HAPPENS_BEFORE(&m_job);
throw Backend::CouldNotCommit(ex.getMessageValue());
}
}
try {
sl->release();
......
......@@ -125,6 +125,12 @@ public:
m_payloadInterpreted = false;
}
// Resets the in-memory interpretation flags so the object behaves as freshly
// constructed: not known to exist in the store, header and payload both
// marked as not yet interpreted.
void resetValues () {
m_existingObject = false;
m_headerInterpreted = false;
m_payloadInterpreted = false;
}
void setOwner(const std::string & owner) {
checkHeaderWritable();
m_header.set_owner(owner);
......
......@@ -41,28 +41,28 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
tapeFile(tapeFile) {}
//------------------------------------------------------------------------------
// complete
// asyncSetJobSucceed
//------------------------------------------------------------------------------
bool cta::ArchiveJob::complete() {
// First check that the block Id for the file has been set.
if (tapeFile.blockId ==
std::numeric_limits<decltype(tapeFile.blockId)>::max())
throw BlockIdNotSet("In cta::ArchiveJob::complete(): Block ID not set");
// Also check the checksum has been set
if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() ||
tapeFile.checksumType.empty() || tapeFile.checksumValue.empty())
throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksums not set");
// And matches
if (archiveFile.checksumType != tapeFile.checksumType ||
archiveFile.checksumValue != tapeFile.checksumValue)
throw ChecksumMismatch(std::string("In cta::ArchiveJob::complete(): checksum mismatch!")
+" Archive file checksum type: "+archiveFile.checksumType
+" Archive file checksum value: "+archiveFile.checksumValue
+" Tape file checksum type: "+tapeFile.checksumType
+" Tape file checksum value: "+tapeFile.checksumValue);
// We are good to go to record the data in the persistent storage.
// Record the data in the archiveNS. The checksum will be validated if already
// present, of inserted if not.
// Asynchronously records this job's success in the scheduler database job.
// Completion of the update is observed later via checkAndAsyncReportComplete().
void cta::ArchiveJob::asyncSetJobSucceed() {
m_dbJob->asyncSucceed();
}
//------------------------------------------------------------------------------
// checkAndReportComplete
//------------------------------------------------------------------------------
// Checks the outcome of the asynchronous job-success update and, when this
// job completed the whole request, launches the asynchronous report to the
// disk system (completion is signalled through m_reporterState).
// Returns true if and only if a report was launched.
bool cta::ArchiveJob::checkAndAsyncReportComplete() {
  // Nothing to report unless the database says this job finished the request.
  if (!m_dbJob->checkSucceed())
    return false;
  std::unique_ptr<eos::DiskReporter> diskReporter(
      m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
  diskReporter->asyncReportArchiveFullyComplete();
  return true;
}
//------------------------------------------------------------------------------
// ArchiveJob::writeToCatalogue
//------------------------------------------------------------------------------
void cta::ArchiveJob::writeToCatalogue() {
catalogue::TapeFileWritten fileReport;
fileReport.archiveFileId = archiveFile.archiveFileID;
fileReport.blockId = tapeFile.blockId;
......@@ -82,18 +82,53 @@ bool cta::ArchiveJob::complete() {
fileReport.tapeDrive = m_mount.getDrive();
fileReport.vid = tapeFile.vid;
m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport});
//m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(),
// std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile);
// We will now report the successful archival to the EOS instance.
// if TODO TODO
// We can now record the success for the job in the database.
// If this is the last job of the request, we also report the success to the client.
if (m_dbJob->succeed()) {
std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL));
reporter->reportArchiveFullyComplete();
return true;
}
return false;
}
//------------------------------------------------------------------------------
// ArchiveJob::validateAndGetTapeFileWritten
//------------------------------------------------------------------------------
// Validates the job's archive/tape file consistency, then builds the
// TapeFileWritten event used to update the catalogue in a batch.
// Throws (from validate()) if the job data is inconsistent.
cta::catalogue::TapeFileWritten cta::ArchiveJob::validateAndGetTapeFileWritten() {
  // Refuse to build a report from an inconsistent job.
  validate();
  catalogue::TapeFileWritten fileReport;
  // File identity.
  fileReport.archiveFileId = archiveFile.archiveFileID;
  fileReport.diskInstance = archiveFile.diskInstance;
  fileReport.diskFileId = archiveFile.diskFileId;
  // Disk-side metadata.
  fileReport.diskFilePath = archiveFile.diskFileInfo.path;
  fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
  fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
  fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
  fileReport.size = archiveFile.fileSize;
  fileReport.storageClassName = archiveFile.storageClass;
  // Tape-side placement of the copy.
  fileReport.vid = tapeFile.vid;
  fileReport.fSeq = tapeFile.fSeq;
  fileReport.blockId = tapeFile.blockId;
  fileReport.copyNb = tapeFile.copyNb;
  fileReport.compressedSize = tapeFile.compressedSize;
  fileReport.checksumType = tapeFile.checksumType;
  fileReport.checksumValue = tapeFile.checksumValue;
  // Context of the write.
  fileReport.tapeDrive = m_mount.getDrive();
  return fileReport;
}
//------------------------------------------------------------------------------
// ArchiveJob::validate
//------------------------------------------------------------------------------
void cta::ArchiveJob::validate(){
// First check that the block Id for the file has been set.
if (tapeFile.blockId ==
std::numeric_limits<decltype(tapeFile.blockId)>::max())
throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set");
// Also check the checksum has been set
if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() ||
tapeFile.checksumType.empty() || tapeFile.checksumValue.empty())
throw ChecksumNotSet("In cta::ArchiveJob::validate(): checksums not set");
// And matches
if (archiveFile.checksumType != tapeFile.checksumType ||
archiveFile.checksumValue != tapeFile.checksumValue)
throw ChecksumMismatch(std::string("In cta::ArchiveJob::validate(): checksum mismatch!")
+" Archive file checksum type: "+archiveFile.checksumType
+" Archive file checksum value: "+archiveFile.checksumValue
+" Tape file checksum type: "+tapeFile.checksumType
+" Tape file checksum value: "+tapeFile.checksumValue);
}
//------------------------------------------------------------------------------
......@@ -109,3 +144,10 @@ std::string cta::ArchiveJob::reportURL() {
void cta::ArchiveJob::failed(const cta::exception::Exception &ex, log::LogContext & lc) {
m_dbJob->fail(lc);
}
//------------------------------------------------------------------------------
// waitForReporting
//------------------------------------------------------------------------------
// Blocks until the reporting thread fulfils m_reporterState; get() rethrows
// any exception stored in the promise by the asynchronous reporter.
// NOTE: std::promise::get_future() may only be called once, so this must not
// be invoked more than once per job.
void cta::ArchiveJob::waitForReporting() {
m_reporterState.get_future().get();
}
......@@ -25,6 +25,7 @@
#include <stdint.h>
#include <string>
#include <future>
namespace cta {
......@@ -68,11 +69,37 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(BlockIdNotSet);
CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet);
CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch);
/**
* Indicates that the job was successful and updates the backend store
* asynchronously.
*/
virtual void asyncSetJobSucceed();
/**
* Wait if the job was updated in the backend store asynchronously.
* @return true if the archive was also sent to client asynchronously.
*/
virtual bool checkAndAsyncReportComplete();
/**
* Validate that archiveFile and tapeFile fields are set correctly for archive
* request.
* Throw appropriate exception if there is any problem.
*/
virtual void validate();
/**
* Update the catalog with the archive request.
*/
virtual void writeToCatalogue();
/**
* Indicates that the job was successful and updates the backend store
* @return true if the archive was also reported to client.
* Validate that archiveFile and tapeFile fields are set correctly for archive
* request.
* @return The tapeFileWritten event for the catalog update.
*/
virtual bool complete();
virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten();
/**
* Triggers a scheduler update following the failure of the job.
......@@ -99,13 +126,19 @@ private:
* The mount that generated this job
*/
ArchiveMount &m_mount;
/**
* Reference to the name server
*/
catalogue::Catalogue &m_catalogue;
public:
/**
* State for the asynchronous report to the client.
*/
std::promise<void> m_reporterState;
public:
CTA_GENERATE_EXCEPTION_CLASS(NotImplemented);
/**
......@@ -122,6 +155,11 @@ public:
* The file archive result for the NS
*/
common::dataStructures::TapeFile tapeFile;
/**
* Wait for the reporterState is set by the reporting thread.
*/
virtual void waitForReporting();
}; // class ArchiveJob
......
......@@ -77,8 +77,8 @@ uint32_t cta::ArchiveMount::getNbFiles() const {
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
return m_reporterFactory.createDiskReporter(URL);
cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL, std::promise<void> &reporterState) {
return m_reporterFactory.createDiskReporter(URL, reporterState);
}
//------------------------------------------------------------------------------
......@@ -93,6 +93,12 @@ std::string cta::ArchiveMount::getMountTransactionId() const {
}
//------------------------------------------------------------------------------
// updateCatalogueWithTapeFilesWritten
//------------------------------------------------------------------------------
// Records a whole batch of tape-file-written events in the CTA catalogue with
// a single call (batched counterpart of per-job catalogue reporting).
void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten) {
m_catalogue.filesWrittenToTape(tapeFilesWritten);
}
// getNextJobBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
......
......@@ -139,9 +139,17 @@ namespace cta {
/**
* Creates a disk reporter for the ArchiveJob (this is a wrapper).