diff --git a/eos/DiskReporter.hpp b/eos/DiskReporter.hpp index affaf001de8d8cf8f6cc57084ea6950e6d2c3378..72441f2232a71e804094895f99a172ce9fe538f3 100644 --- a/eos/DiskReporter.hpp +++ b/eos/DiskReporter.hpp @@ -23,6 +23,7 @@ namespace cta { namespace eos { class DiskReporter { public: virtual void reportArchiveFullyComplete() = 0; + virtual void asyncReportArchiveFullyComplete() = 0; virtual ~DiskReporter() {}; }; diff --git a/eos/DiskReporterFactory.cpp b/eos/DiskReporterFactory.cpp index a49735eb03c1e171225c726c0695e705ab4d9cfc..4be5df6936e94e57a2cfcf6f844a9472741bdb14 100644 --- a/eos/DiskReporterFactory.cpp +++ b/eos/DiskReporterFactory.cpp @@ -24,14 +24,15 @@ namespace cta { namespace eos { -DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL) { +DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL, std::promise<void> &reporterState) { threading::MutexLocker ml(m_mutex); auto regexResult = m_EosUrlRegex.exec(URL); if (regexResult.size()) { - return new EOSReporter(regexResult[1], regexResult[2]); + return new EOSReporter(regexResult[1], regexResult[2], reporterState); } regexResult = m_NullRegex.exec(URL); if (regexResult.size()) { + reporterState.set_value(); return new NullReporter(); } throw cta::exception::Exception( diff --git a/eos/DiskReporterFactory.hpp b/eos/DiskReporterFactory.hpp index 3c52eb459675a3c0838bc692c6b3a9dbb51a9d25..be3ffac3c78acfc1279e8f697da71ad95861250b 100644 --- a/eos/DiskReporterFactory.hpp +++ b/eos/DiskReporterFactory.hpp @@ -23,12 +23,13 @@ #include "common/threading/Mutex.hpp" #include <string> +#include <future> namespace cta { namespace eos { class DiskReporterFactory { public: - DiskReporter * createDiskReporter(const std::string URL); + DiskReporter * createDiskReporter(const std::string URL, std::promise<void> &reporterState); private: // The typical call to give report to EOS will be: // xrdfs localhost query opaquefile 
"/eos/wfe/passwd?mgm.pcmd=event&mgm.fid=112&mgm.logid=cta&mgm.event=migrated&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0" diff --git a/eos/EOSReporter.cpp b/eos/EOSReporter.cpp index d48967dbf9b3c651c132ba356dd7f6f901ce3571..4905da0f3933d37c979c580591bad15ed263c80e 100644 --- a/eos/EOSReporter.cpp +++ b/eos/EOSReporter.cpp @@ -16,13 +16,15 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <future> + #include "EOSReporter.hpp" #include "common/exception/XrootCl.hpp" namespace cta { namespace eos { -EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue): - m_fs(hostURL), m_query(queryValue) {} +EOSReporter::EOSReporter(const std::string& hostURL, const std::string& queryValue, std::promise<void>& reporterState): + m_fs(hostURL), m_query(queryValue), m_reporterState(reporterState) {} void EOSReporter::reportArchiveFullyComplete() { @@ -30,11 +32,48 @@ void EOSReporter::reportArchiveFullyComplete() { XrdCl::Buffer arg (m_query.size()); arg.FromString(m_query); XrdCl::Buffer * resp = nullptr; - const uint16_t queryTimeout = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds - XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, queryTimeout); + XrdCl::XRootDStatus status=m_fs.Query(qcOpaque, arg, resp, CTA_EOS_QUERY_TIMEOUT); delete (resp); cta::exception::XrootCl::throwOnError(status, "In EOSReporter::reportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()"); } +void EOSReporter::asyncReportArchiveFullyComplete() { + auto qcOpaque = XrdCl::QueryCode::OpaqueFile; + XrdCl::Buffer arg (m_query.size()); + arg.FromString(m_query); + AsyncQueryHandler *handler = new AsyncQueryHandler(m_reporterState); + XrdCl::XRootDStatus status=m_fs.Query( qcOpaque, arg, handler, CTA_EOS_QUERY_TIMEOUT); + cta::exception::XrootCl::throwOnError(status, + "In EOSReporter::asyncReportArchiveFullyComplete(): failed to XrdCl::FileSystem::Query()"); +} + 
+//------------------------------------------------------------------------------ +//EOSReporter::AsyncQueryHandler::AsyncQueryHandler +//------------------------------------------------------------------------------ +EOSReporter::AsyncQueryHandler::AsyncQueryHandler(std::promise<void> &handlerPromise): + m_handlerPromise(handlerPromise) {} + +//------------------------------------------------------------------------------ +//EOSReporter::AsyncQueryHandler::HandleResponse +//------------------------------------------------------------------------------ +void EOSReporter::AsyncQueryHandler::HandleResponse(XrdCl::XRootDStatus *status, +                                                   XrdCl::AnyObject *response) { + try { + cta::exception::XrootCl::throwOnError(*status, + "In EOSReporter::AsyncQueryHandler::HandleResponse(): failed to XrdCl::FileSystem::Query()"); + m_handlerPromise.set_value(); + } catch (...) { + try { + // store anything thrown in the promise + m_handlerPromise.set_exception(std::current_exception()); + } catch(...) { + // set_exception() may throw too + } + } + + delete response; + delete status; + delete this; + } }} // namespace cta::disk diff --git a/eos/EOSReporter.hpp b/eos/EOSReporter.hpp index c3c78de642ea5099dcf31080199e9668f13cc587..c35fe5490910891653a18aa2145809bed9ef0ed2 100644 --- a/eos/EOSReporter.hpp +++ b/eos/EOSReporter.hpp @@ -21,15 +21,28 @@ #include "DiskReporter.hpp" #include <XrdCl/XrdClFileSystem.hh> -namespace cta { namespace eos { +#include <future> +namespace cta { namespace eos { +const uint16_t CTA_EOS_QUERY_TIMEOUT = 15; // Timeout in seconds that is rounded up to the nearest 15 seconds + class EOSReporter: public DiskReporter { public: - EOSReporter(const std::string & hostURL, const std::string & queryValue); + EOSReporter(const std::string & hostURL, const std::string & queryValue, std::promise<void> &reporterState); void reportArchiveFullyComplete() override; + void asyncReportArchiveFullyComplete() override; private: XrdCl::FileSystem m_fs; std::string m_query; 
+ std::promise<void> &m_reporterState; + class AsyncQueryHandler: public XrdCl::ResponseHandler { + public: + AsyncQueryHandler(std::promise<void> &handlerPromise); + virtual void HandleResponse(XrdCl::XRootDStatus *status, + XrdCl::AnyObject *response); + private: + std::promise<void> &m_handlerPromise; + }; }; -}} // namespace cta::disk \ No newline at end of file +}} // namespace cta::disk diff --git a/eos/NullReporter.hpp b/eos/NullReporter.hpp index e4d66ebc2239ef2ffcec2e7e24b039e2f7356ce4..86d1786a5c7c923275046972f13397ec4befbad9 100644 --- a/eos/NullReporter.hpp +++ b/eos/NullReporter.hpp @@ -26,6 +26,7 @@ class NullReporter: public DiskReporter { public: NullReporter() {}; void reportArchiveFullyComplete() override {}; + void asyncReportArchiveFullyComplete() override {}; }; }} // namespace cta::disk \ No newline at end of file diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index fd781dacbac99d71afdb864ec859c15b84c52da0..2601a8f66c25067d9a454c7b89d13259c0e1cbbd 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -461,6 +461,51 @@ const std::string& ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL() { return m_srcURL; } +ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSuccessful(const uint16_t copyNumber ) { + std::unique_ptr<AsyncJobSuccessfulUpdater> ret(new AsyncJobSuccessfulUpdater); + // Passing a reference to the unique pointer led to strange behaviors. + auto & retRef = *ret; + ret->m_updaterCallback= + [this,copyNumber, &retRef](const std::string &in)->std::string { + // We have a locked and fetched object, so we just need to work on its representation. 
+ serializers::ObjectHeader oh; + oh.ParseFromString(in); + if (oh.type() != serializers::ObjectType::ArchiveRequest_t) { + std::stringstream err; + err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): wrong object type: " << oh.type(); + throw cta::exception::Exception(err.str()); + } + serializers::ArchiveRequest payload; + payload.ParseFromString(oh.payload()); + auto * jl = payload.mutable_jobs(); + for (auto j=jl->begin(); j!=jl->end(); j++) { + if (j->copynb() == copyNumber) { + j->set_status(serializers::ArchiveJobStatus::AJS_Complete); + for (auto j2=jl->begin(); j2!=jl->end(); j2++) { + if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete && + j2->status()!= serializers::ArchiveJobStatus::AJS_Failed) { + retRef.m_isLastJob = false; + oh.set_payload(payload.SerializePartialAsString()); + return oh.SerializeAsString(); + } + } + retRef.m_isLastJob = true; + oh.set_payload(payload.SerializePartialAsString()); + throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString()); + } + } + std::stringstream err; + err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): copyNb not found"; + throw cta::exception::Exception(err.str()); + }; + ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback)); + return ret.release(); +} + +void ArchiveRequest::AsyncJobSuccessfulUpdater::wait() { + m_backendUpdater->wait(); +} + std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) { checkPayloadReadable(); auto jl = m_payload.jobs(); diff --git a/objectstore/ArchiveRequest.hpp b/objectstore/ArchiveRequest.hpp index bf6a8f72d3ef8bd62a05b2001b0179c53f239d3d..d56d3d0998065c60773f53258c2052eb105fb9c2 100644 --- a/objectstore/ArchiveRequest.hpp +++ b/objectstore/ArchiveRequest.hpp @@ -76,6 +76,19 @@ public: // An job owner updater factory. The owner MUST be previousOwner for the update to be executed. 
CTA_GENERATE_EXCEPTION_CLASS(WrongPreviousOwner); AsyncJobOwnerUpdater * asyncUpdateJobOwner(uint16_t copyNumber, const std::string & owner, const std::string &previousOwner); + + // An asynchronous job updating class for success. + class AsyncJobSuccessfulUpdater { + friend class ArchiveRequest; + public: + void wait(); + bool m_isLastJob; + private: + std::function<std::string(const std::string &)> m_updaterCallback; + std::unique_ptr<Backend::AsyncUpdater> m_backendUpdater; + }; + AsyncJobSuccessfulUpdater * asyncUpdateJobSuccessful(uint16_t copyNumber); + // Get a job owner std::string getJobOwner(uint16_t copyNumber); // Request management ======================================================== diff --git a/objectstore/Backend.hpp b/objectstore/Backend.hpp index a20bd81e869970164a95b81f13eee4fb7b4d5107..7b9a5ca0db36fc4330d4553fa0175d10cb5cd337 100644 --- a/objectstore/Backend.hpp +++ b/objectstore/Backend.hpp @@ -109,6 +109,7 @@ public: CTA_GENERATE_EXCEPTION_CLASS(CouldNotUpdateValue); CTA_GENERATE_EXCEPTION_CLASS(CouldNotCommit); CTA_GENERATE_EXCEPTION_CLASS(CouldNotUnlock); + CTA_GENERATE_EXCEPTION_CLASS(AsyncUpdateWithDelete); /** * A base class handling asynchronous sequence of lock exclusive, fetch, call user diff --git a/objectstore/BackendRados.cpp b/objectstore/BackendRados.cpp index 47d890e427d7443e14287e79827dbad17be3b6a3..18f7a3187da43f0870f2cbbbce06ae46296b4565 100644 --- a/objectstore/BackendRados.cpp +++ b/objectstore/BackendRados.cpp @@ -382,25 +382,48 @@ void BackendRados::AsyncUpdater::fetchCallback(librados::completion_t completion std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to read buffer: ") + au.m_name + ": "+ ex.what()); } - // Execute the user's callback. Let exceptions fly through. User knows his own exceptions. - value=au.m_update(value); - try { - // Prepare result in buffer list. 
- au.m_radosBufferList.clear(); - au.m_radosBufferList.append(value); - } catch (std::exception & ex) { - throw CouldNotUpdateValue( - std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + - au.m_name + ": " + ex.what()); + + bool updateWithDelete = false; + try { + // Execute the user's callback. + value=au.m_update(value); + } catch (AsyncUpdateWithDelete & ex) { + updateWithDelete = true; + } catch (...) { + // Let exceptions fly through. User knows his own exceptions. + throw; } - // Launch the write - librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr); - auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList); - aioc->release(); - if (rc) { - cta::exception::Errnum errnum (-rc, - std::string("In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full(): ") + au.m_name); - throw Backend::CouldNotCommit(errnum.getMessageValue()); + + if(updateWithDelete) { + try { + au.m_backend.remove(au.m_name); + } catch (cta::exception::Exception &ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to remove value: ") + + au.m_name + ex.what()); + } + // Done! + ANNOTATE_HAPPENS_BEFORE(&au.m_job); + au.m_job.set_value(); + } else { + try { + // Prepare result in buffer list. 
+ au.m_radosBufferList.clear(); + au.m_radosBufferList.append(value); + } catch (std::exception & ex) { + throw CouldNotUpdateValue( + std::string("In In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to prepare write buffer(): ") + + au.m_name + ex.what()); + } + // Launch the write + librados::AioCompletion * aioc = librados::Rados::aio_create_completion(pThis, commitCallback, nullptr); + auto rc=au.m_backend.m_radosCtx.aio_write_full(au.m_name, aioc, au.m_radosBufferList); + aioc->release(); + if (rc) { + cta::exception::Errnum errnum (-rc, + "In BackendRados::AsyncUpdater::fetchCallback::update_lambda(): failed to launch aio_write_full()" + au.m_name); + throw Backend::CouldNotCommit(errnum.getMessageValue()); + } } } catch (...) { ANNOTATE_HAPPENS_BEFORE(&au.m_job); diff --git a/objectstore/BackendVFS.cpp b/objectstore/BackendVFS.cpp index 3cb0a2c99c6d2f1b991eea3ccb24eb0167249d9b..a8680b6565c35241b3537f2c83e96641b0cdaad2 100644 --- a/objectstore/BackendVFS.cpp +++ b/objectstore/BackendVFS.cpp @@ -329,13 +329,32 @@ BackendVFS::AsyncUpdater::AsyncUpdater(BackendVFS & be, const std::string& name, ANNOTATE_HAPPENS_BEFORE(&m_job); throw Backend::CouldNotFetch(ex.getMessageValue()); } - // Let user's exceptions go through. - std::string postUpdateData=m_update(preUpdateData); - try { - m_backend.atomicOverwrite(m_name, postUpdateData); - } catch (cta::exception::Exception & ex) { - ANNOTATE_HAPPENS_BEFORE(&m_job); - throw Backend::CouldNotCommit(ex.getMessageValue()); + + std::string postUpdateData; + bool updateWithDelete = false; + try { + postUpdateData=m_update(preUpdateData); + } catch (AsyncUpdateWithDelete & ex) { + updateWithDelete = true; + } catch (...) { + // Let user's exceptions go through. 
+ throw; + } + + if(updateWithDelete) { + try { + m_backend.remove(m_name); + } catch (cta::exception::Exception & ex) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw Backend::CouldNotCommit(ex.getMessageValue()); + } + } else { + try { + m_backend.atomicOverwrite(m_name, postUpdateData); + } catch (cta::exception::Exception & ex) { + ANNOTATE_HAPPENS_BEFORE(&m_job); + throw Backend::CouldNotCommit(ex.getMessageValue()); + } } try { sl->release(); diff --git a/objectstore/ObjectOps.hpp b/objectstore/ObjectOps.hpp index 727b1abf1ac161cb4ed7ba51236fbfe6737ec4ea..f028186f82a827354df66d5cf63a7cac344eddd5 100644 --- a/objectstore/ObjectOps.hpp +++ b/objectstore/ObjectOps.hpp @@ -125,6 +125,12 @@ public: m_payloadInterpreted = false; } + void resetValues () { + m_existingObject = false; + m_headerInterpreted = false; + m_payloadInterpreted = false; + } + void setOwner(const std::string & owner) { checkHeaderWritable(); m_header.set_owner(owner); diff --git a/scheduler/ArchiveJob.cpp b/scheduler/ArchiveJob.cpp index 10d9905f99a84db148d1769d88b3603be349b83e..de37ff600db33e566f1e239887d37165f73eb88d 100644 --- a/scheduler/ArchiveJob.cpp +++ b/scheduler/ArchiveJob.cpp @@ -41,28 +41,28 @@ cta::ArchiveJob::ArchiveJob(ArchiveMount &mount, tapeFile(tapeFile) {} //------------------------------------------------------------------------------ -// complete +// asyncSetJobSucceed //------------------------------------------------------------------------------ -bool cta::ArchiveJob::complete() { - // First check that the block Id for the file has been set. 
- if (tapeFile.blockId == - std::numeric_limits<decltype(tapeFile.blockId)>::max()) - throw BlockIdNotSet("In cta::ArchiveJob::complete(): Block ID not set"); - // Also check the checksum has been set - if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || - tapeFile.checksumType.empty() || tapeFile.checksumValue.empty()) - throw ChecksumNotSet("In cta::ArchiveJob::complete(): checksums not set"); - // And matches - if (archiveFile.checksumType != tapeFile.checksumType || - archiveFile.checksumValue != tapeFile.checksumValue) - throw ChecksumMismatch(std::string("In cta::ArchiveJob::complete(): checksum mismatch!") - +" Archive file checksum type: "+archiveFile.checksumType - +" Archive file checksum value: "+archiveFile.checksumValue - +" Tape file checksum type: "+tapeFile.checksumType - +" Tape file checksum value: "+tapeFile.checksumValue); - // We are good to go to record the data in the persistent storage. - // Record the data in the archiveNS. The checksum will be validated if already - // present, of inserted if not. 
+void cta::ArchiveJob::asyncSetJobSucceed() { + m_dbJob->asyncSucceed(); +} + +//------------------------------------------------------------------------------ +// checkAndReportComplete +//------------------------------------------------------------------------------ +bool cta::ArchiveJob::checkAndAsyncReportComplete() { + if (m_dbJob->checkSucceed()) { + std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState)); + reporter->asyncReportArchiveFullyComplete(); + return true; + } + return false; +} + +//------------------------------------------------------------------------------ +// ArchiveJob::writeToCatalogue +//------------------------------------------------------------------------------ +void cta::ArchiveJob::writeToCatalogue() { catalogue::TapeFileWritten fileReport; fileReport.archiveFileId = archiveFile.archiveFileID; fileReport.blockId = tapeFile.blockId; @@ -82,18 +82,53 @@ bool cta::ArchiveJob::complete() { fileReport.tapeDrive = m_mount.getDrive(); fileReport.vid = tapeFile.vid; m_catalogue.filesWrittenToTape (std::set<catalogue::TapeFileWritten>{fileReport}); - //m_ns.addTapeFile(SecurityIdentity(UserIdentity(std::numeric_limits<uint32_t>::max(), - // std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile); - // We will now report the successful archival to the EOS instance. - // if TODO TODO - // We can now record the success for the job in the database. - // If this is the last job of the request, we also report the success to the client. 
- if (m_dbJob->succeed()) { - std::unique_ptr<eos::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL)); - reporter->reportArchiveFullyComplete(); - return true; - } - return false; +} +//------------------------------------------------------------------------------ +// ArchiveJob::validateAndGetTapeFileWritten +//------------------------------------------------------------------------------ +cta::catalogue::TapeFileWritten cta::ArchiveJob::validateAndGetTapeFileWritten() { + validate(); + catalogue::TapeFileWritten fileReport; + fileReport.archiveFileId = archiveFile.archiveFileID; + fileReport.blockId = tapeFile.blockId; + fileReport.checksumType = tapeFile.checksumType; + fileReport.checksumValue = tapeFile.checksumValue; + fileReport.compressedSize = tapeFile.compressedSize; + fileReport.copyNb = tapeFile.copyNb; + fileReport.diskFileId = archiveFile.diskFileId; + fileReport.diskFileUser = archiveFile.diskFileInfo.owner; + fileReport.diskFileGroup = archiveFile.diskFileInfo.group; + fileReport.diskFilePath = archiveFile.diskFileInfo.path; + fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob; + fileReport.diskInstance = archiveFile.diskInstance; + fileReport.fSeq = tapeFile.fSeq; + fileReport.size = archiveFile.fileSize; + fileReport.storageClassName = archiveFile.storageClass; + fileReport.tapeDrive = m_mount.getDrive(); + fileReport.vid = tapeFile.vid; + return fileReport; +} + +//------------------------------------------------------------------------------ +// ArchiveJob::validate +//------------------------------------------------------------------------------ +void cta::ArchiveJob::validate(){ + // First check that the block Id for the file has been set. 
+ if (tapeFile.blockId == + std::numeric_limits<decltype(tapeFile.blockId)>::max()) + throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set"); + // Also check the checksum has been set + if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || + tapeFile.checksumType.empty() || tapeFile.checksumValue.empty()) + throw ChecksumNotSet("In cta::ArchiveJob::validate(): checksums not set"); + // And matches + if (archiveFile.checksumType != tapeFile.checksumType || + archiveFile.checksumValue != tapeFile.checksumValue) + throw ChecksumMismatch(std::string("In cta::ArchiveJob::validate(): checksum mismatch!") + +" Archive file checksum type: "+archiveFile.checksumType + +" Archive file checksum value: "+archiveFile.checksumValue + +" Tape file checksum type: "+tapeFile.checksumType + +" Tape file checksum value: "+tapeFile.checksumValue); } //------------------------------------------------------------------------------ @@ -109,3 +144,10 @@ std::string cta::ArchiveJob::reportURL() { void cta::ArchiveJob::failed(const cta::exception::Exception &ex, log::LogContext & lc) { m_dbJob->fail(lc); } + +//------------------------------------------------------------------------------ +// waitForReporting +//------------------------------------------------------------------------------ +void cta::ArchiveJob::waitForReporting() { + m_reporterState.get_future().get(); +} diff --git a/scheduler/ArchiveJob.hpp b/scheduler/ArchiveJob.hpp index c042fe42f92f488d5949a624f0148541d65aac49..d94afd831da7e09e2ab34307928e344036c782fa 100644 --- a/scheduler/ArchiveJob.hpp +++ b/scheduler/ArchiveJob.hpp @@ -25,6 +25,7 @@ #include <stdint.h> #include <string> +#include <future> namespace cta { @@ -68,11 +69,37 @@ public: CTA_GENERATE_EXCEPTION_CLASS(BlockIdNotSet); CTA_GENERATE_EXCEPTION_CLASS(ChecksumNotSet); CTA_GENERATE_EXCEPTION_CLASS(ChecksumMismatch); + + /** + * Indicates that the job was successful and updates the backend store + * asynchronously. 
+ */ + virtual void asyncSetJobSucceed(); + + /** + * Wait if the job was updated in the backend store asynchronously. + * @return true if the archive was also sent to client asynchronously. + */ + virtual bool checkAndAsyncReportComplete(); + + /** + * Validate that archiveFile and tapeFile fields are set correctly for archive + * request. + * Throw appropriate exception if there is any problem. + */ + virtual void validate(); + + /** + * Update the catalog with the archive request. + */ + virtual void writeToCatalogue(); + /** - * Indicates that the job was successful and updates the backend store - * @return true if the archive was also reported to client. + * Validate that archiveFile and tapeFile fields are set correctly for archive + * request. + * @return The tapeFileWritten event for the catalog update. */ - virtual bool complete(); + virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten(); /** * Triggers a scheduler update following the failure of the job. @@ -99,13 +126,19 @@ private: * The mount that generated this job */ ArchiveMount &m_mount; - + /** * Reference to the name server */ catalogue::Catalogue &m_catalogue; -public: + /** + * State for the asynchronous report to the client. + */ + std::promise<void> m_reporterState; + +public: + CTA_GENERATE_EXCEPTION_CLASS(NotImplemented); /** @@ -122,6 +155,11 @@ public: * The file archive result for the NS */ common::dataStructures::TapeFile tapeFile; + + /** + * Wait for the reporterState is set by the reporting thread. 
+ */ + virtual void waitForReporting(); }; // class ArchiveJob diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index e3e429ec2ba25d34750fa117fb8f71456e63d19a..93b6723504a2b0b8dd2284ccc3930ecf488ee9a0 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -77,8 +77,8 @@ uint32_t cta::ArchiveMount::getNbFiles() const { //------------------------------------------------------------------------------ // createDiskReporter //------------------------------------------------------------------------------ -cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) { - return m_reporterFactory.createDiskReporter(URL); +cta::eos::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL, std::promise<void> &reporterState) { + return m_reporterFactory.createDiskReporter(URL, reporterState); } //------------------------------------------------------------------------------ @@ -93,6 +93,12 @@ std::string cta::ArchiveMount::getMountTransactionId() const { } //------------------------------------------------------------------------------ +// updateCatalogueWithTapeFilesWritten +//------------------------------------------------------------------------------ +void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten) { + m_catalogue.filesWrittenToTape(tapeFilesWritten); +} + // getNextJobBatch //------------------------------------------------------------------------------ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested, diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 9f9bde38c29979f185421b16a27615aeb6f0d4a3..f6584249d153294ded5d2d712535c731167071f0 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -139,9 +139,17 @@ namespace cta { /** * Creates a disk reporter for the ArchiveJob (this is a wrapper). 
* @param URL: report address - * @return poitner to the reporter created. + * @param reporterState void promise to be set when the report is done asynchronously. + * @return pointer to the reporter created. */ - eos::DiskReporter * createDiskReporter(std::string & URL); + eos::DiskReporter * createDiskReporter(std::string & URL, std::promise<void> &reporterState); + + /** + * Update the catalog with a set of TapeFileWritten events. + * + * @param tapeFilesWritten The set of report events for the catalog update. + */ + void updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten); /** * Destructor. @@ -159,7 +167,7 @@ namespace cta { * A reference to the file catalogue. */ catalogue::Catalogue & m_catalogue; - + /** * Internal tracking of the session completion */ diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index 934c9ea25f5f32f91f891adc8e0b36c8c311addc..bc1148533ce6df3496c7bf17f7a52c0d39f05a7d 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -2255,24 +2255,26 @@ void OStoreDB::ArchiveJob::bumpUpTapeFileCount(uint64_t newFileCount) { } //------------------------------------------------------------------------------ -// OStoreDB::ArchiveJob::succeed() +// OStoreDB::ArchiveJob::asyncSucceed() //------------------------------------------------------------------------------ -bool OStoreDB::ArchiveJob::succeed() { - // Lock the request and set the job as successful. 
- objectstore::ScopedExclusiveLock atfrl(m_archiveRequest); - m_archiveRequest.fetch(); - std::string atfrAddress = m_archiveRequest.getAddressIfSet(); - bool lastJob=m_archiveRequest.setJobSuccessful(tapeFile.copyNb); - if (lastJob) { - m_archiveRequest.remove(); - } else { - m_archiveRequest.commit(); +void OStoreDB::ArchiveJob::asyncSucceed() { + m_jobUpdate.reset(m_archiveRequest.asyncUpdateJobSuccessful(tapeFile.copyNb)); +} + +//------------------------------------------------------------------------------ +// OStoreDB::ArchiveJob::checkSucceed() +//------------------------------------------------------------------------------ +bool OStoreDB::ArchiveJob::checkSucceed() { + m_jobUpdate->wait(); + if (m_jobUpdate->m_isLastJob) { + m_archiveRequest.resetValues(); } // We no more own the job (which could be gone) m_jobOwned = false; // Remove ownership from agent - m_agentReference.removeFromOwnership(atfrAddress, m_objectStore); - return lastJob; + const std::string atfrAddress = m_archiveRequest.getAddressIfSet(); + m_agentReference.removeFromOwnership(atfrAddress, m_objectStore); + return m_jobUpdate->m_isLastJob; } //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index ae73b7a016b686dca7f81c3c083ac0060996e8a0..feb9706f77b13604e213dd241201f9548ff71d5e 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -140,8 +140,9 @@ public: public: CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned); CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob); - bool succeed() override; void fail(log::LogContext & lc) override; + void asyncSucceed() override; + bool checkSucceed() override; void bumpUpTapeFileCount(uint64_t newFileCount) override; ~ArchiveJob() override; private: @@ -156,6 +157,7 @@ public: objectstore::AgentReference & m_agentReference; objectstore::ArchiveRequest m_archiveRequest; ArchiveMount & m_archiveMount; + 
std::unique_ptr<objectstore::ArchiveRequest::AsyncJobSuccessfulUpdater> m_jobUpdate; }; /* === Retrieve Mount handling ============================================ */ diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 217c0f251d93aaf80df0d752fae8027cd0898f18..eff6e30dfc2cdf7849dfa83c1a8f6a75bab477ce 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -185,9 +185,11 @@ public: std::string archiveReportURL; cta::common::dataStructures::ArchiveFile archiveFile; cta::common::dataStructures::TapeFile tapeFile; - /// Indicates a success to the DB. If this is the last job, return true. - virtual bool succeed() = 0; virtual void fail(log::LogContext & lc) = 0; + /// Indicates a success to the DB. + virtual void asyncSucceed() = 0; + /// Check a succeed job status. If this is the last job, return true. + virtual bool checkSucceed() = 0; virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0; virtual ~ArchiveJob() {} }; diff --git a/scheduler/SchedulerDatabaseTest.cpp b/scheduler/SchedulerDatabaseTest.cpp index e923a331a262589e5e3e0c223b225bdf7805c2e7..04bed21469f34c5d89d4efdd6429227f10f6b6b9 100644 --- a/scheduler/SchedulerDatabaseTest.cpp +++ b/scheduler/SchedulerDatabaseTest.cpp @@ -205,7 +205,8 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { if (aj.size()) { count++; //std::cout << aj->archiveFile.diskFileInfo.path << std::endl; - aj.front()->succeed(); + aj.front()->asyncSucceed(); + aj.front()->checkSucceed(); } else done = true; @@ -291,7 +292,8 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) { if (aj.size()) { count++; //std::cout << aj->archiveFile.diskFileInfo.path << std::endl; - aj.front()->succeed(); + aj.front()->asyncSucceed(); + aj.front()->checkSucceed(); } else done = true; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index eef9864aaac663ce5abb49e5d62e4abe2e0a7896..6761270966d260b9bbc34b4673fafd357d167ce9 100644 --- 
a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -459,7 +459,10 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { archiveJob->tapeFile.checksumValue = "1234abcd"; archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize; archiveJob->tapeFile.copyNb = 1; - archiveJob->complete(); + archiveJob->validate(); + archiveJob->writeToCatalogue(); + archiveJob->asyncSetJobSucceed(); + archiveJob->checkAndAsyncReportComplete(); archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); ASSERT_EQ(0, archiveJobBatch.size()); archiveMount->complete(); diff --git a/scheduler/testingMocks/MockArchiveJob.hpp b/scheduler/testingMocks/MockArchiveJob.hpp index 96375a20f783eb64d7011643fc2f2334cfadcc2f..53df1ac01bf2e807945c6fb93ced91301aee72d9 100644 --- a/scheduler/testingMocks/MockArchiveJob.hpp +++ b/scheduler/testingMocks/MockArchiveJob.hpp @@ -33,14 +33,43 @@ namespace cta { completes(0), failures(0) {} ~MockArchiveJob() throw() {} - - bool complete() override { + + void failed(const cta::exception::Exception& ex, log::LogContext & lc) override { + failures++; + } + + virtual void asyncSetJobSucceed() override { completes++; + } + virtual bool checkAndAsyncReportComplete() override { return false; + } + virtual void validate() override {} + virtual void writeToCatalogue() override {} + virtual catalogue::TapeFileWritten validateAndGetTapeFileWritten() override { + catalogue::TapeFileWritten fileReport; + fileReport.archiveFileId = archiveFile.archiveFileID; + fileReport.blockId = tapeFile.blockId; + fileReport.checksumType = tapeFile.checksumType; + fileReport.checksumValue = tapeFile.checksumValue; + fileReport.compressedSize = tapeFile.compressedSize; + fileReport.copyNb = tapeFile.copyNb; + fileReport.diskFileId = archiveFile.diskFileId; + fileReport.diskFileUser = archiveFile.diskFileInfo.owner; + fileReport.diskFileGroup = archiveFile.diskFileInfo.group; + fileReport.diskFilePath = archiveFile.diskFileInfo.path; + 
fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob; + fileReport.diskInstance = archiveFile.diskInstance; + fileReport.fSeq = tapeFile.fSeq; + fileReport.size = archiveFile.fileSize; + fileReport.storageClassName = archiveFile.storageClass; + fileReport.tapeDrive = "dummy"; + fileReport.vid = tapeFile.vid; + return fileReport; } - - void failed(const cta::exception::Exception& ex, log::LogContext & lc) override { + virtual void failed(const cta::exception::Exception& ex) { failures++; } + virtual void retry() {} }; } diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp index c5ad2e4609977047f1b73d3889c3f431b02b6064..0e004b22eeafe02fe01561efe3d5811b770b2cbe 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.cpp @@ -24,6 +24,7 @@ #include "castor/tape/tapeserver/daemon/MigrationReportPacker.hpp" #include "castor/tape/tapeserver/daemon/TaskWatchDog.hpp" #include "castor/tape/tapeserver/drive/DriveInterface.hpp" +#include "catalogue/TapeFileWritten.hpp" #include <memory> #include <numeric> @@ -205,59 +206,130 @@ void MigrationReportPacker::ReportFlush::execute(MigrationReportPacker& reportPa reportPacker.m_lc.log(cta::log::INFO,"Received a flush report from tape, but had no file to report to client. Doing nothing."); return; } - std::unique_ptr<cta::ArchiveJob> job; - try{ - while(!reportPacker.m_successfulArchiveJobs.empty()) { - // Get the next job to report and make sure we will not attempt to process it twice. 
- job = std::move(reportPacker.m_successfulArchiveJobs.front()); - reportPacker.m_successfulArchiveJobs.pop(); - if (!job.get()) continue; - cta::log::ScopedParamContainer params(reportPacker.m_lc); - params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); - if (job->complete()) { - params.add("reportURL", job->reportURL()); - reportPacker.m_lc.log(cta::log::INFO,"Reported to the client a full file archival"); - } else { - reportPacker.m_lc.log(cta::log::INFO, "Recorded the partial migration of a file"); - } - job.reset(nullptr); - } - reportPacker.m_lc.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape"); - } catch(const cta::exception::Exception& e){ - cta::log::ScopedParamContainer params(reportPacker.m_lc); - params.add("exceptionMessageValue", e.getMessageValue()); - if (job.get()) { - params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) - .add("reportURL", job->reportURL()); - } - const std::string msg_error="An exception was caught trying to call reportMigrationResults"; - reportPacker.m_lc.log(cta::log::ERR, msg_error); - throw failedMigrationRecallResult(msg_error); - } catch(const std::exception& e){ - cta::log::ScopedParamContainer params(reportPacker.m_lc); - params.add("exceptionWhat", e.what()); - if (job.get()) { - params.add("fileId", job->archiveFile.archiveFileID) - .add("diskInstance", job->archiveFile.diskInstance) - .add("diskFileId", job->archiveFile.diskFileId) - .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); - } - const std::string msg_error="An std::exception was caught trying to call reportMigrationResults"; - reportPacker.m_lc.log(cta::log::ERR, msg_error); - 
throw failedMigrationRecallResult(msg_error); - } + proceedJobsBatch(reportPacker,std::move(reportPacker.m_successfulArchiveJobs), reportPacker.m_lc); } else { // This is an abnormal situation: we should never flush after an error! reportPacker.m_lc.log(cta::log::ALERT,"Received a flush after an error: sending file errors to client"); } } +//------------------------------------------------------------------------------ +//ReportFlush::proceedJobsBatch +//------------------------------------------------------------------------------ +void MigrationReportPacker::ReportFlush::proceedJobsBatch(const MigrationReportPacker& reportPacker, std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, cta::log::LogContext &logContext){ + std::set<cta::catalogue::TapeFileWritten> tapeFilesWritten; + std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs; + std::unique_ptr<cta::ArchiveJob> job; + try{ + while(!successfulArchiveJobs.empty()) { + // Get the next job to report and make sure we will not attempt to process it twice. 
+ job = std::move(successfulArchiveJobs.front()); + successfulArchiveJobs.pop(); + if (!job.get()) continue; + tapeFilesWritten.insert(job->validateAndGetTapeFileWritten()); + validatedSuccessfulArchiveJobs.emplace_back(std::move(job)); + job.reset(nullptr); + } + + updateCatalogueWithTapeFilesWritten(reportPacker, tapeFilesWritten, logContext); + asyncUpdateBackendWithJobsSucceeded(validatedSuccessfulArchiveJobs); + checkAndAsyncReportCompletedJobs(validatedSuccessfulArchiveJobs, logContext); + + logContext.log(cta::log::INFO,"Reported to the client that a batch of files was written on tape"); + } catch(const cta::exception::Exception& e){ + cta::log::ScopedParamContainer params(logContext); + params.add("exceptionMessageValue", e.getMessageValue()); + if (job.get()) { + params.add("fileId", job->archiveFile.archiveFileID) + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path) + .add("reportURL", job->reportURL()); + } + const std::string msg_error="An exception was caught trying to call reportMigrationResults"; + logContext.log(cta::log::ERR, msg_error); + throw failedMigrationRecallResult(msg_error); + } catch(const std::exception& e){ + cta::log::ScopedParamContainer params(logContext); + params.add("exceptionWhat", e.what()); + if (job.get()) { + params.add("fileId", job->archiveFile.archiveFileID) + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); + } + const std::string msg_error="An std::exception was caught trying to call reportMigrationResults"; + logContext.log(cta::log::ERR, msg_error); + throw failedMigrationRecallResult(msg_error); + } +} + +//------------------------------------------------------------------------------ +//ReportFlush::asyncUpdateBackendWithJobsSucceeded 
+//------------------------------------------------------------------------------ +void MigrationReportPacker::ReportFlush::asyncUpdateBackendWithJobsSucceeded( + const std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs) { + for (const auto &job: validatedSuccessfulArchiveJobs){ + job->asyncSetJobSucceed(); + } +} + +//------------------------------------------------------------------------------ +//ReportFlush::checkAndAsyncReportCompletedJobs +//------------------------------------------------------------------------------ +void MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs( + std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs, + cta::log::LogContext &logContext) { + std::list<std::unique_ptr <cta::ArchiveJob> > reportedArchiveJobs; + + for (auto &job: validatedSuccessfulArchiveJobs){ + cta::log::ScopedParamContainer params(logContext); + params.add("fileId", job->archiveFile.archiveFileID) + .add("diskInstance", job->archiveFile.diskInstance) + .add("diskFileId", job->archiveFile.diskFileId) + .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path); + logContext.log(cta::log::DEBUG, + "In MigrationReportPacker::ReportFlush::checkAndAsyncReportCompletedJobs()" + " check for async backend update finished"); + if(job->checkAndAsyncReportComplete()) { + params.add("reportURL", job->reportURL()); + reportedArchiveJobs.emplace_back(std::move(job)); + logContext.log(cta::log::INFO,"Sent to the client a full file archival"); + } else { + logContext.log(cta::log::INFO, "Recorded the partial migration of a file"); + } + } + + for (const auto &job: reportedArchiveJobs){ + try { + job->waitForReporting(); // should not be a deadWait as soon as we have a timeout on the xroot query + cta::log::ScopedParamContainer params(logContext); + params.add("reportURL", job->reportURL()); + logContext.log(cta::log::INFO,"Reported to the client a full file archival"); + } catch(cta::exception::Exception &ex) 
{ + cta::log::ScopedParamContainer params(logContext); + params.add("reportURL", job->reportURL()); + params.add("errorMessage", ex.getMessage().str()); + logContext.log(cta::log::ERR,"Unsuccessful report to the client a full file archival:"); + } catch(...) { + throw; + } + } +} + +//------------------------------------------------------------------------------ +//ReportFlush::updateCatalogueWithTapeFilesWritten +//------------------------------------------------------------------------------ +void MigrationReportPacker::ReportFlush::updateCatalogueWithTapeFilesWritten( + const MigrationReportPacker &reportPacker, + const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten, + cta::log::LogContext &logContext) { + reportPacker.m_archiveMount->updateCatalogueWithTapeFilesWritten(tapeFilesWritten); + cta::log::ScopedParamContainer params(logContext); + params.add("tapeFilesWritten", tapeFilesWritten.size()); + logContext.log(cta::log::INFO,"Catalog updated for batch of jobs"); +} + //------------------------------------------------------------------------------ //reportTapeFull()::execute //------------------------------------------------------------------------------ diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp index dd014536c7a2e4ff5e068f3294ef558c2311164c..1bd7a6f2e051a7591bb4b1c5fca61a4b5457f381 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPacker.hpp @@ -165,6 +165,18 @@ private: ReportFlush(drive::compressionStats compressStats):m_compressStats(compressStats){} void execute(MigrationReportPacker& reportPacker) override; + void proceedJobsBatch(const MigrationReportPacker& reportPacker, + std::queue<std::unique_ptr<cta::ArchiveJob> > successfulArchiveJobs, + cta::log::LogContext &log); + void asyncUpdateBackendWithJobsSucceeded( + const 
std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs); + void checkAndAsyncReportCompletedJobs( + std::list<std::unique_ptr<cta::ArchiveJob> > &validatedSuccessfulArchiveJobs, + cta::log::LogContext &logContext); + void updateCatalogueWithTapeFilesWritten( + const MigrationReportPacker &reportPacker, + const std::set<cta::catalogue::TapeFileWritten> &tapeFilesWritten, + cta::log::LogContext &logContext); }; class ReportTapeFull: public Report { public: diff --git a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp index 687e28a600292ed152befe6cd5e77479ffcd3d4a..908f5e8c7f5578a55bc23da328a63d95b0e1f05a 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/MigrationReportPackerTest.cpp @@ -69,16 +69,42 @@ namespace unitTests { int & completes, int &failures): MockArchiveJob(am, catalogue), completesRef(completes), failuresRef(failures) {} - bool complete() override { + virtual void asyncSetJobSucceed() override { completesRef++; + } + + virtual bool checkAndAsyncReportComplete() override { return false; } + virtual void validate() override {} + virtual void writeToCatalogue() override {} + virtual cta::catalogue::TapeFileWritten validateAndGetTapeFileWritten() override { + cta::catalogue::TapeFileWritten fileReport; + fileReport.archiveFileId = archiveFile.archiveFileID; + fileReport.blockId = tapeFile.blockId; + fileReport.checksumType = tapeFile.checksumType; + fileReport.checksumValue = tapeFile.checksumValue; + fileReport.compressedSize = tapeFile.compressedSize; + fileReport.copyNb = tapeFile.copyNb; + fileReport.diskFileId = archiveFile.diskFileId; + fileReport.diskFileUser = archiveFile.diskFileInfo.owner; + fileReport.diskFileGroup = archiveFile.diskFileInfo.group; + fileReport.diskFilePath = archiveFile.diskFileInfo.path; + fileReport.diskFileRecoveryBlob = 
archiveFile.diskFileInfo.recoveryBlob; + fileReport.diskInstance = archiveFile.diskInstance; + fileReport.fSeq = tapeFile.fSeq; + fileReport.size = archiveFile.fileSize; + fileReport.storageClassName = archiveFile.storageClass; + fileReport.tapeDrive = std::string("testDrive"); + fileReport.vid = tapeFile.vid; + return fileReport; + } + void failed(const cta::exception::Exception& ex, cta::log::LogContext & lc) override { failuresRef++; } - private: int & completesRef; int & failuresRef; @@ -86,7 +112,29 @@ namespace unitTests { TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerNominal) { cta::MockArchiveMount tam(*m_catalogue); + + const std::string vid1 = "VTEST001"; + const std::string vid2 = "VTEST002"; + const std::string logicalLibraryName = "logical_library_name"; + const std::string tapePoolName = "tape_pool_name"; + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = true; + const bool fullValue = false; + const std::string createTapeComment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin = cta::common::dataStructures::SecurityIdentity("admin","localhost"); + m_catalogue->createLogicalLibrary(admin, logicalLibraryName, "Create logical library"); + m_catalogue->createTapePool(admin, tapePoolName, 2, true, "Create tape pool"); + m_catalogue->createTape(admin, vid1, logicalLibraryName, tapePoolName, capacityInBytes, + disabledValue, fullValue, createTapeComment); + + cta::common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = "disk_instance"; + storageClass.name = "storage_class"; + storageClass.nbCopies = 1; + storageClass.comment = "Create storage class"; + m_catalogue->createStorageClass(admin, storageClass); + ::testing::InSequence dummy; std::unique_ptr<cta::ArchiveJob> job1; int job1completes(0), job1failures(0); @@ -95,6 +143,25 @@ namespace unitTests { new MockArchiveJobExternalStats(tam, *m_catalogue, job1completes, 
job1failures)); job1.reset(mockJob.release()); } + job1->archiveFile.archiveFileID=1; + job1->archiveFile.diskInstance="disk_instance"; + job1->archiveFile.diskFileId="diskFileId1"; + job1->archiveFile.diskFileInfo.path="filePath1"; + job1->archiveFile.diskFileInfo.owner="testUser1"; + job1->archiveFile.diskFileInfo.group="testGroup1"; + job1->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob1"; + job1->archiveFile.fileSize=1024; + job1->archiveFile.checksumType="md5"; + job1->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + job1->archiveFile.storageClass="storage_class"; + job1->tapeFile.vid="VTEST001"; + job1->tapeFile.fSeq=1; + job1->tapeFile.blockId=256; + job1->tapeFile.compressedSize=768; + job1->tapeFile.copyNb=1; + job1->tapeFile.checksumType="md5"; + job1->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + std::unique_ptr<cta::ArchiveJob> job2; int job2completes(0), job2failures(0); { @@ -102,7 +169,25 @@ namespace unitTests { new MockArchiveJobExternalStats(tam, *m_catalogue, job2completes, job2failures)); job2.reset(mockJob.release()); } - + job2->archiveFile.archiveFileID=2; + job2->archiveFile.diskInstance="disk_instance"; + job2->archiveFile.diskFileId="diskFileId2"; + job2->archiveFile.diskFileInfo.path="filePath2"; + job2->archiveFile.diskFileInfo.owner="testUser2"; + job2->archiveFile.diskFileInfo.group="testGroup2"; + job2->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2"; + job2->archiveFile.fileSize=1024; + job2->archiveFile.checksumType="md5"; + job2->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + job2->archiveFile.storageClass="storage_class"; + job2->tapeFile.vid="VTEST001"; + job2->tapeFile.fSeq=2; + job2->tapeFile.blockId=512; + job2->tapeFile.compressedSize=768; + job2->tapeFile.copyNb=1; + job2->tapeFile.checksumType="md5"; + job2->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + cta::log::StringLogger 
log("castor_tape_tapeserver_daemon_MigrationReportPackerNominal",cta::log::DEBUG); cta::log::LogContext lc(log); tapeserver::daemon::MigrationReportPacker mrp(&tam,lc); @@ -170,9 +255,31 @@ namespace unitTests { ASSERT_EQ(1, job3failures); } - TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerOneByteFile) { + TEST_F(castor_tape_tapeserver_daemon_MigrationReportPackerTest, MigrationReportPackerBadFile) { cta::MockArchiveMount tam(*m_catalogue); + + const std::string vid1 = "VTEST001"; + const std::string vid2 = "VTEST002"; + const std::string logicalLibraryName = "logical_library_name"; + const std::string tapePoolName = "tape_pool_name"; + const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000; + const bool disabledValue = true; + const bool fullValue = false; + const std::string createTapeComment = "Create tape"; + cta::common::dataStructures::SecurityIdentity admin = cta::common::dataStructures::SecurityIdentity("admin","localhost"); + m_catalogue->createLogicalLibrary(admin, logicalLibraryName, "Create logical library"); + m_catalogue->createTapePool(admin, tapePoolName, 2, true, "Create tape pool"); + m_catalogue->createTape(admin, vid1, logicalLibraryName, tapePoolName, capacityInBytes, + disabledValue, fullValue, createTapeComment); + + cta::common::dataStructures::StorageClass storageClass; + storageClass.diskInstance = "disk_instance"; + storageClass.name = "storage_class"; + storageClass.nbCopies = 1; + storageClass.comment = "Create storage class"; + m_catalogue->createStorageClass(admin, storageClass); + ::testing::InSequence dummy; std::unique_ptr<cta::ArchiveJob> migratedBigFile; int migratedBigFileCompletes(0), migratedBigFileFailures(0); @@ -196,9 +303,62 @@ namespace unitTests { migratedNullFile.reset(mockJob.release()); } - migratedBigFile->archiveFile.fileSize=100000; - migratedFileSmall->archiveFile.fileSize=1; - migratedNullFile->archiveFile.fileSize=0; + 
migratedBigFile->archiveFile.archiveFileID=4; + migratedBigFile->archiveFile.diskInstance="disk_instance"; + migratedBigFile->archiveFile.diskFileId="diskFileId2"; + migratedBigFile->archiveFile.diskFileInfo.path="filePath2"; + migratedBigFile->archiveFile.diskFileInfo.owner="testUser2"; + migratedBigFile->archiveFile.diskFileInfo.group="testGroup2"; + migratedBigFile->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2"; + migratedBigFile->archiveFile.fileSize=100000; + migratedBigFile->archiveFile.checksumType="md5"; + migratedBigFile->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + migratedBigFile->archiveFile.storageClass="storage_class"; + migratedBigFile->tapeFile.vid="VTEST001"; + migratedBigFile->tapeFile.fSeq=1; + migratedBigFile->tapeFile.blockId=256; + migratedBigFile->tapeFile.compressedSize=768; + migratedBigFile->tapeFile.copyNb=1; + migratedBigFile->tapeFile.checksumType="md5"; + migratedBigFile->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + + migratedFileSmall->archiveFile.archiveFileID=5; + migratedFileSmall->archiveFile.diskInstance="disk_instance"; + migratedFileSmall->archiveFile.diskFileId="diskFileId3"; + migratedFileSmall->archiveFile.diskFileInfo.path="filePath3"; + migratedFileSmall->archiveFile.diskFileInfo.owner="testUser2"; + migratedFileSmall->archiveFile.diskFileInfo.group="testGroup2"; + migratedFileSmall->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2"; + migratedFileSmall->archiveFile.fileSize=1; + migratedFileSmall->archiveFile.checksumType="md5"; + migratedFileSmall->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + migratedFileSmall->archiveFile.storageClass="storage_class"; + migratedFileSmall->tapeFile.vid="VTEST001"; + migratedFileSmall->tapeFile.fSeq=2; + migratedFileSmall->tapeFile.blockId=512; + migratedFileSmall->tapeFile.compressedSize=1; + migratedFileSmall->tapeFile.copyNb=1; + migratedFileSmall->tapeFile.checksumType="md5"; + 
migratedFileSmall->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + + migratedNullFile->archiveFile.archiveFileID=6; + migratedNullFile->archiveFile.diskInstance="disk_instance"; + migratedNullFile->archiveFile.diskFileId="diskFileId4"; + migratedNullFile->archiveFile.diskFileInfo.path="filePath4"; + migratedNullFile->archiveFile.diskFileInfo.owner="testUser2"; + migratedNullFile->archiveFile.diskFileInfo.group="testGroup2"; + migratedNullFile->archiveFile.diskFileInfo.recoveryBlob="recoveryBlob2"; + migratedNullFile->archiveFile.fileSize=0; + migratedNullFile->archiveFile.checksumType="md5"; + migratedNullFile->archiveFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; + migratedNullFile->archiveFile.storageClass="storage_class"; + migratedNullFile->tapeFile.vid="VTEST001"; + migratedNullFile->tapeFile.fSeq=3; + migratedNullFile->tapeFile.blockId=768; + migratedNullFile->tapeFile.compressedSize=0; + migratedNullFile->tapeFile.copyNb=1; + migratedNullFile->tapeFile.checksumType="md5"; + migratedNullFile->tapeFile.checksumValue="b170288bf1f61b26a648358866f4d6c6"; cta::log::StringLogger log("castor_tape_tapeserver_daemon_MigrationReportPackerOneByteFile",cta::log::DEBUG); cta::log::LogContext lc(log); @@ -216,10 +376,12 @@ namespace unitTests { mrp.waitThread(); std::string temp = log.getLog(); - ASSERT_NE(std::string::npos, temp.find("Reported to the client that a batch of files was written on tape")); - ASSERT_EQ(1, tam.completes); - ASSERT_EQ(1, migratedBigFileCompletes); - ASSERT_EQ(1, migratedFileSmallCompletes); - ASSERT_EQ(1, migratedNullFileCompletes); + ASSERT_NE(std::string::npos, temp.find("TapeFileWrittenEvent is invalid")); + ASSERT_NE(std::string::npos, temp.find("Successfully closed client's session " + "after the failed report MigrationResult")); + ASSERT_EQ(0, tam.completes); + ASSERT_EQ(0, migratedBigFileCompletes); + ASSERT_EQ(0, migratedFileSmallCompletes); + ASSERT_EQ(0, migratedNullFileCompletes); } }