Commit 02a5de96 authored by Eric Cano's avatar Eric Cano
Browse files

Added archive report URL support in front end, object store and scheduler.

parent 70c26eaa
......@@ -45,6 +45,7 @@ bool ArchiveRequest::operator==(const ArchiveRequest &rhs) const {
&& diskFileInfo==rhs.diskFileInfo
&& diskpoolName==rhs.diskpoolName
&& diskpoolThroughput==rhs.diskpoolThroughput
&& archiveReportURL==rhs.archiveReportURL
&& creationLog==rhs.creationLog;
}
......@@ -69,6 +70,7 @@ std::ostream &operator<<(std::ostream &os, const ArchiveRequest &obj) {
<< " diskFileInfo=" << obj.diskFileInfo
<< " diskpoolName=" << obj.diskpoolName
<< " diskpoolThroughput=" << obj.diskpoolThroughput
<< " archiveReportURL=" << obj.archiveReportURL
<< " creationLog=" << obj.creationLog << ")";
return os;
}
......
......@@ -59,6 +59,7 @@ struct ArchiveRequest {
DiskFileInfo diskFileInfo;
std::string diskpoolName;
uint64_t diskpoolThroughput;
std::string archiveReportURL;
EntryLog creationLog;
}; // struct ArchiveRequest
......
......@@ -19,7 +19,7 @@ cmake_minimum_required (VERSION 2.6)
find_package (xrootdclient REQUIRED)
include_directories (${XROOTD_INCLUDE_DIR} ${CMAKE_SOURCE_DIR})
add_library (ctaclientsystem
add_library (ctadisk
DiskReporter.cpp
DiskReporterFactory.cpp
EOSReporter.cpp)
......@@ -20,10 +20,12 @@
#include "EOSReporter.hpp"
#include "NullReporter.hpp"
#include "common/exception/Exception.hpp"
#include "common/threading/MutexLocker.hpp"
namespace cta { namespace disk {
DiskReporter* DiskReporterFactory::createDiskReporter(const std::string URL) {
threading::MutexLocker ml(m_mutex);
auto regexResult = m_EosUrlRegex.exec(URL);
if (regexResult.size()) {
return new EOSReporter(regexResult[1], regexResult[2]);
......
......@@ -20,6 +20,7 @@
#include "DiskReporter.hpp"
#include "common/utils/Regex.hpp"
#include "common/threading/Mutex.hpp"
#include <string>
......@@ -36,5 +37,7 @@ private:
// XrdCl::FileSystem(XrdCl::URL("eoserver.cern.ch")).Query("/eos/wfe/passwd?mgm.pcmd=event&mgm.fid=112&mgm.logid=cta&mgm.event=migrated&mgm.workflow=default&mgm.path=/eos/wfe/passwd&mgm.ruid=0&mgm.rgid=0");
cta::utils::Regex m_EosUrlRegex{"^eosQuery://([^/]+)(/.*)$"};
cta::utils::Regex m_NullRegex{"^$"};
/// This mutex ensures we do not use the regexes in parallel.
cta::threading::Mutex m_mutex;
};
}} // namespace cta::disk
\ No newline at end of file
......@@ -205,6 +205,23 @@ uint64_t cta::objectstore::ArchiveRequest::getDiskpoolThroughput() {
return m_payload.diskpoolthroughput();
}
//------------------------------------------------------------------------------
// setArchiveReportURL
//------------------------------------------------------------------------------
void ArchiveRequest::setArchiveReportURL(const std::string& URL) {
checkPayloadWritable();
m_payload.set_archivereporturl(URL);
}
//------------------------------------------------------------------------------
// getArviveReportURL
//------------------------------------------------------------------------------
std::string ArchiveRequest::getArchiveReportURL() {
checkPayloadReadable();
return m_payload.archivereporturl();
}
//------------------------------------------------------------------------------
// setDiskpoolThroughput
//------------------------------------------------------------------------------
......
......@@ -76,6 +76,9 @@ public:
void setDiskpoolThroughput(const uint64_t diskpoolThroughput);
uint64_t getDiskpoolThroughput();
void setArchiveReportURL(const std::string &URL);
std::string getArchiveReportURL();
void setRequester(const cta::common::dataStructures::UserIdentity &requester);
cta::common::dataStructures::UserIdentity getRequester();
......
......@@ -326,6 +326,7 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
ar.setMountPolicy(mp);
ar.setDiskpoolName("");
ar.setDiskpoolThroughput(666);
ar.setArchiveReportURL("");
ar.setRequester(cta::common::dataStructures::UserIdentity("user0", "group0"));
ar.setSrcURL("root://eoseos/myFile");
ar.setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
......
......@@ -333,6 +333,7 @@ message ArchiveRequest {
required DiskFileInfo diskfileinfo = 9040;
required string diskfileid = 9050;
required string diskinstance = 9055;
required string archivereporturl = 9057;
required uint64 filesize = 9060;
required User requester = 9070;
required string srcurl = 9080;
......
......@@ -18,6 +18,7 @@
#include "scheduler/ArchiveJob.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "disk/DiskReporterFactory.hpp"
#include <limits>
//------------------------------------------------------------------------------
......@@ -85,8 +86,12 @@ void cta::ArchiveJob::complete() {
// std::numeric_limits<uint32_t>::max()), ""), archiveFile.fileId, nameServerTapeFile);
// We will now report the successful archival to the EOS instance.
// if TODO TODO
// We can now record the success for the job in the database
m_dbJob->succeed();
// We can now record the success for the job in the database.
// If this is the last job of the request, we also report the success to the client.
if (m_dbJob->succeed()) {
std::unique_ptr<disk::DiskReporter> reporter(m_mount.createDiskReporter(m_dbJob->archiveReportURL));
reporter->reportArchiveFullyComplete();
}
}
//------------------------------------------------------------------------------
......
......@@ -110,7 +110,7 @@ public:
/**
* The remote file information
*/
std::string srcURL;
std::string srcURL;
/**
* The file archive result for the NS
......
......@@ -74,6 +74,13 @@ uint32_t cta::ArchiveMount::getNbFiles() const {
return m_dbMount->nbFilesCurrentlyOnTape;
}
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
cta::disk::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
return m_reporterFactory.createDiskReporter(URL);
}
//------------------------------------------------------------------------------
// getMountTransactionId
//------------------------------------------------------------------------------
......
......@@ -23,6 +23,7 @@
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/TapeMount.hpp"
#include "catalogue/Catalogue.hpp"
#include "disk/DiskReporterFactory.hpp"
#include <memory>
#include <atomic>
......@@ -125,6 +126,13 @@ namespace cta {
*/
uint32_t getNbFiles() const override;
/**
* Creates a disk reporter for the ArchiveJob (this is a wrapper).
* @param URL: report address
* @return poitner to the reporter created.
*/
disk::DiskReporter * createDiskReporter(std::string & URL);
/**
* Destructor.
*/
......@@ -147,6 +155,9 @@ namespace cta {
*/
std::atomic<bool> m_sessionRunning;
private:
/** An initialized-once factory for archive reports (indirectly used by ArchiveJobs) */
disk::DiskReporterFactory m_reporterFactory;
}; // class ArchiveMount
} // namespace cta
......@@ -24,7 +24,7 @@ add_library (ctascheduler SHARED
${CTA_SCHEDULER_SRC_FILES})
install (TARGETS ctascheduler DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
target_link_libraries (ctascheduler ctacommon ctaobjectstore protobuf ctautils)
target_link_libraries (ctascheduler ctacommon ctaobjectstore protobuf ctautils ctadisk)
#add_library (ctaschedulerutils SHARED
# _old_prototype_DummyScheduler.cpp)
......
......@@ -303,6 +303,7 @@ void OStoreDB::queueArchive(const std::string &instanceName, const cta::common::
aReq.setMountPolicy(criteria.mountPolicy);
aReq.setDiskpoolName(request.diskpoolName);
aReq.setDiskpoolThroughput(request.diskpoolThroughput);
aReq.setArchiveReportURL(request.archiveReportURL);
aReq.setRequester(request.requester);
aReq.setSrcURL(request.srcURL);
aReq.setEntryLog(request.creationLog);
......@@ -632,6 +633,7 @@ std::list<cta::common::dataStructures::ArchiveJob>
ret.back().instanceName = osar.getArchiveFile().diskInstance;
ret.back().request.requester = osar.getRequester();
ret.back().request.srcURL = osar.getSrcURL();
ret.back().request.archiveReportURL = osar.getArchiveReportURL();
ret.back().request.storageClass = osar.getArchiveFile().storageClass;
}
return ret;
......@@ -687,6 +689,7 @@ std::map<std::string, std::list<common::dataStructures::ArchiveJob> >
ret[tpp.tapePool].back().instanceName = osar.getArchiveFile().diskInstance;
ret[tpp.tapePool].back().request.requester = osar.getRequester();
ret[tpp.tapePool].back().request.srcURL = osar.getSrcURL();
ret[tpp.tapePool].back().request.archiveReportURL = osar.getArchiveReportURL();
ret[tpp.tapePool].back().request.storageClass = osar.getArchiveFile().storageClass;
}
}
......@@ -1583,6 +1586,7 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
aql.release();
privateRet->archiveFile = privateRet->m_archiveRequest.getArchiveFile();
privateRet->srcURL = privateRet->m_archiveRequest.getSrcURL();
privateRet->archiveReportURL = privateRet->m_archiveRequest.getArchiveReportURL();
privateRet->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
privateRet->tapeFile.copyNb = job.copyNb;
privateRet->tapeFile.vid = mountInfo.vid;
......@@ -1974,12 +1978,13 @@ void OStoreDB::ArchiveJob::bumpUpTapeFileCount(uint64_t newFileCount) {
//------------------------------------------------------------------------------
// OStoreDB::ArchiveJob::succeed()
//------------------------------------------------------------------------------
void OStoreDB::ArchiveJob::succeed() {
bool OStoreDB::ArchiveJob::succeed() {
// Lock the request and set the job as successful.
objectstore::ScopedExclusiveLock atfrl(m_archiveRequest);
m_archiveRequest.fetch();
std::string atfrAddress = m_archiveRequest.getAddressIfSet();
if (m_archiveRequest.setJobSuccessful(tapeFile.copyNb)) {
bool lastJob=m_archiveRequest.setJobSuccessful(tapeFile.copyNb);
if (lastJob) {
m_archiveRequest.remove();
} else {
m_archiveRequest.commit();
......@@ -1992,6 +1997,7 @@ void OStoreDB::ArchiveJob::succeed() {
ag.fetch();
ag.removeFromOwnership(atfrAddress);
ag.commit();
return lastJob;
}
//------------------------------------------------------------------------------
......
......@@ -99,7 +99,7 @@ public:
public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
void succeed() override;
bool succeed() override;
void fail() override;
void bumpUpTapeFileCount(uint64_t newFileCount) override;
~ArchiveJob() override;
......
......@@ -186,9 +186,11 @@ public:
class ArchiveJob {
public:
std::string srcURL;
std::string archiveReportURL;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::TapeFile tapeFile;
virtual void succeed() = 0;
/// Indicates a success to the DB. If this is the last job, return true.
virtual bool succeed() = 0;
virtual void fail() = 0;
virtual void bumpUpTapeFileCount(uint64_t newFileCount) = 0;
virtual ~ArchiveJob() {}
......
......@@ -23,7 +23,6 @@
#include "common/archiveRoutes/ArchiveRoute.hpp"
#include "common/make_unique.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/ArchiveRequest.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/MountRequest.hpp"
#include "scheduler/OStoreDB/OStoreDBFactory.hpp"
......
......@@ -2029,6 +2029,7 @@ void XrdCtaFile::xCom_archive() {
optional<std::string> recoveryblob = getOptionStringValue("", "--recoveryblob", true, false);
optional<std::string> diskpool = getOptionStringValue("", "--diskpool", true, false);
optional<uint64_t> throughput = getOptionUint64Value("", "--throughput", true, false);
optional<std::string> archiveReportURL = getOptionStringValue("", "-–reportURL", false, true, "");
checkOptions(help.str());
cta::common::dataStructures::UserIdentity originator;
originator.name=user.value();
......@@ -2049,6 +2050,7 @@ void XrdCtaFile::xCom_archive() {
request.requester=originator;
request.srcURL=srcurl.value();
request.storageClass=storageclass.value();
request.archiveReportURL=archiveReportURL.value();
uint64_t archiveFileId = m_scheduler->queueArchive(m_cliIdentity.username, request);
cmdlineOutput << "<eos::wfe::path::fxattr:sys.archiveFileId>" << archiveFileId << std::endl;
logRequestAndSetCmdlineResult(cta::common::dataStructures::FrontendReturnCode::ok, cmdlineOutput.str());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment