Commit 1005113d authored by Eric Cano's avatar Eric Cano
Browse files

Preliminary implementation of Archive requests. Intermediate commit.

parent 4b1cf1bf
......@@ -19,6 +19,7 @@ add_library (CTAObjectStore SHARED
AgentWatchdog.cpp
TapePool.cpp
Tape.cpp
ArchiveToFileRequest.cpp
DriveRegister.cpp
BackendVFS.cpp
BackendRados.cpp
......
......@@ -195,10 +195,49 @@ message Tape {
repeated RetrievalJobPointer retrievaljobs = 4305;
}
// ------------- Jobs ----------------------------------------------------------
// The status of the individual archive jobs. The jobs are materialised
// by table entries in the ArchiveToFileRequest.
// This life cycle represented by the following enum
enum ArchiveJobStatus {
AJS_LinkingToTapePool = 0;
AJS_Pending = 1;
AJS_Selected = 2;
AJS_Complete = 3;
AJS_Failed = 99;
}
message ArchiveJobEntry {
required uint32 copynb = 4400;
required string tapepool = 4401;
required string tapepooladdress = 4402;
required ArchiveJobStatus status = 4403;
required uint32 totalretries = 4404;
required uint32 retrieswithinmount = 4405;
}
message ArchiveToFileRequest {
required string remotefile = 4500;
required string archivefile = 4501;
repeated ArchiveJobEntry routes = 4502;
required uint64 priority = 4503;
required CreationLog log = 4504;
optional string archivetodiraddress = 4505;
}
message ArchivalToDirRequest {
required string archivedir = 4600;
repeated string filerequestsaddresses = 4601;
required uint64 priority = 4602;
required CreationLog log = 4603;
}
// ------------- Drives handling ----------------------------------------------
message DriveRegister {
repeated string drivenames = 5000;
repeated string drivenames = 7000;
}
......
......@@ -35,7 +35,6 @@ cta::ArchiveRequest::~ArchiveRequest() throw() {
//------------------------------------------------------------------------------
cta::ArchiveRequest::ArchiveRequest(
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
UserRequest(priority, user, creationTime) {
const CreationLog &creationLog):
UserRequest(priority, creationLog) {
}
......@@ -51,8 +51,7 @@ public:
*/
ArchiveRequest(
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime = time(NULL));
const CreationLog & creationLog);
private:
......
......@@ -38,9 +38,8 @@ cta::ArchiveToDirRequest::ArchiveToDirRequest(
const std::string &archiveDir,
const std::list<ArchiveToFileRequest> &archiveToFileRequests,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
ArchiveRequest(priority, user, creationTime),
const CreationLog & creationLog):
ArchiveRequest(priority, creationLog),
m_archiveDir(archiveDir),
m_archiveToFileRequests(archiveToFileRequests) {
}
......
......@@ -60,8 +60,7 @@ public:
const std::string &archiveDir,
const std::list<ArchiveToFileRequest> &archiveToFileRequests,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime = time(NULL));
const CreationLog & creationLog);
/**
* Returns the full path of the destination archive directory.
......
......@@ -38,9 +38,8 @@ cta::ArchiveToFileRequest::ArchiveToFileRequest(
const std::string &archiveFile,
const std::map<uint16_t, std::string> &copyNbToPoolMap,
const uint64_t priority,
const SecurityIdentity &requester,
const time_t creationTime):
ArchiveRequest(priority, requester, creationTime),
const CreationLog & creationLog):
ArchiveRequest(priority, creationLog),
m_remoteFile(remoteFile),
m_archiveFile(archiveFile),
m_copyNbToPoolMap(copyNbToPoolMap) {
......
......@@ -61,8 +61,7 @@ public:
const std::string &archiveFile,
const std::map<uint16_t, std::string> &copyNbToPoolMap,
const uint64_t priority,
const SecurityIdentity &requester,
const time_t creationTime = time(NULL));
const CreationLog & creationLog);
/**
* Returns the URL of the source remote file to be archived.
......
......@@ -39,9 +39,8 @@ cta::ArchiveToTapeCopyRequest::ArchiveToTapeCopyRequest(
const uint16_t copyNb,
const std::string tapePoolName,
const uint64_t priority,
const SecurityIdentity &requester,
const time_t creationTime):
ArchiveRequest(priority, requester, creationTime),
const CreationLog &creationLog):
ArchiveRequest(priority, creationLog),
m_remoteFile(remoteFile),
m_archiveFile(archiveFile),
m_copyNb(copyNb),
......
......@@ -58,9 +58,8 @@ public:
const std::string &archiveFile,
const uint16_t copyNb,
const std::string tapePoolName,
const uint64_t priority,
const SecurityIdentity &requester,
const time_t creationTime = time(NULL));
const uint64_t priority,
const CreationLog & creationLog);
/**
* Returns the URL of the source remote file to be archived.
......
......@@ -218,7 +218,7 @@ void cta::MockSchedulerDatabase::queue(const ArchiveToFileRequest &rqst) {
copyNb,
tapePoolName,
rqst.getPriority(),
rqst.getRequester()));
rqst.getCreationLog()));
}
}
......@@ -227,15 +227,16 @@ void cta::MockSchedulerDatabase::queue(const ArchiveToFileRequest &rqst) {
//------------------------------------------------------------------------------
void cta::MockSchedulerDatabase::queue(const ArchiveToTapeCopyRequest &rqst) {
char *zErrMsg = 0;
const SecurityIdentity &requester = rqst.getRequester();
const CreationLog &log = rqst.getCreationLog();
std::ostringstream query;
query << "INSERT INTO ARCHIVETOTAPECOPYREQUEST(STATE, REMOTEFILE,"
" ARCHIVEFILE, TAPEPOOL, COPYNB, PRIORITY, UID, GID, CREATIONTIME) VALUES("
" ARCHIVEFILE, TAPEPOOL, COPYNB, PRIORITY, UID, GID, HOST, CREATIONTIME) VALUES("
<< "'PENDING_NS_CREATION','" << rqst.getRemoteFile() << "','" <<
rqst.getArchiveFile() << "','" << rqst.getTapePoolName() << "'," <<
rqst.getCopyNb() << "," << rqst.getPriority() << "," <<
requester.getUser().uid << "," << requester.getUser().gid << ","
<< (int)time(NULL) << ");";
log.user.uid << "," << log.user.gid << ","
<< " '" << log.host << "', "
<< (int)log.time << ");";
if(SQLITE_OK != sqlite3_exec(m_dbHandle, query.str().c_str(), 0, 0,
&zErrMsg)) {
std::ostringstream msg;
......@@ -260,7 +261,7 @@ std::map<cta::TapePool, std::list<cta::ArchiveToTapeCopyRequest> >
std::ostringstream query;
std::map<TapePool, std::list<ArchiveToTapeCopyRequest> > rqsts;
query << "SELECT STATE, REMOTEFILE, ARCHIVEFILE, TAPEPOOL, COPYNB,"
" PRIORITY, UID, GID, CREATIONTIME FROM ARCHIVETOTAPECOPYREQUEST"
" PRIORITY, UID, GID, HOST, CREATIONTIME FROM ARCHIVETOTAPECOPYREQUEST"
" ORDER BY ARCHIVEFILE;";
sqlite3_stmt *s = NULL;
const int rc = sqlite3_prepare(m_dbHandle, query.str().c_str(), -1, &s, 0 );
......@@ -290,18 +291,18 @@ std::map<cta::TapePool, std::list<cta::ArchiveToTapeCopyRequest> >
const uint64_t priority = sqlite3_column_int(statement.get(),
idx("PRIORITY"));
const UserIdentity requester(requesterUid, requesterGid);
const std::string requesterHost = "requester_host";
const SecurityIdentity requesterAndHost(requester, requesterHost);
const std::string requesterHost =
(char *)sqlite3_column_text(statement.get(),idx("HOST"));
const time_t creationTime = sqlite3_column_int(statement.get(),
idx("CREATIONTIME"));
const CreationLog log(requester, requesterHost, creationTime, "");
rqsts[tapePool].push_back(ArchiveToTapeCopyRequest(
remoteFile,
archiveFile,
copyNb,
tapePoolName,
priority,
requesterAndHost,
creationTime
log
));
}
return rqsts;
......@@ -366,7 +367,7 @@ std::list<cta::ArchiveToTapeCopyRequest> cta::MockSchedulerDatabase::
std::ostringstream query;
std::list<ArchiveToTapeCopyRequest> rqsts;
query << "SELECT STATE, REMOTEFILE, ARCHIVEFILE, TAPEPOOL, COPYNB,"
" PRIORITY, UID, GID, CREATIONTIME FROM ARCHIVETOTAPECOPYREQUEST"
" PRIORITY, UID, GID, HOST, CREATIONTIME FROM ARCHIVETOTAPECOPYREQUEST"
" WHERE TAPEPOOL='" << tapePoolName << "' ORDER BY ARCHIVEFILE;";
sqlite3_stmt *s = NULL;
const int rc = sqlite3_prepare(m_dbHandle, query.str().c_str(), -1, &s, 0 );
......@@ -396,18 +397,18 @@ std::list<cta::ArchiveToTapeCopyRequest> cta::MockSchedulerDatabase::
const uint64_t priority = sqlite3_column_int(statement.get(),
idx("PRIORITY"));
const UserIdentity requester(requesterUid, requesterGid);
const std::string requesterHost = "requester_host";
const SecurityIdentity requesterAndHost(requester, requesterHost);
const std::string requesterHost =
(char *)sqlite3_column_text(statement.get(),idx("HOST"));
const time_t creationTime = sqlite3_column_int(statement.get(),
idx("CREATIONTIME"));
const CreationLog log(requester, requesterHost, creationTime, "");
rqsts.push_back(ArchiveToTapeCopyRequest(
remoteFile,
archiveFile,
copyNb,
tapePoolName,
priority,
requesterAndHost,
creationTime
log
));
}
return rqsts;
......
......@@ -21,10 +21,13 @@
#include "objectstore/RootEntry.hpp"
#include "objectstore/TapePool.hpp"
#include "objectstore/Tape.hpp"
#include "objectstore/ArchiveToFileRequest.hpp"
#include "common/exception/Exception.hpp"
#include "scheduler/AdminHost.hpp"
#include "scheduler/AdminUser.hpp"
#include "scheduler/ArchivalRoute.hpp"
#include "scheduler/ArchiveRequest.hpp"
#include "scheduler/ArchiveToFileRequest.hpp"
#include "scheduler/LogicalLibrary.hpp"
#include "scheduler/StorageClass.hpp"
#include "scheduler/TapePool.hpp"
......@@ -390,8 +393,40 @@ void OStoreDB::deleteLogicalLibrary(const SecurityIdentity& requester,
re.commit();
}
void OStoreDB::queue(const ArchiveToFileRequest& rqst) {
void OStoreDB::queue(const cta::ArchiveToFileRequest& rqst) {
throw exception::Exception("Not Implemented");
// In order to post the job, construct it first.
objectstore::ArchiveToFileRequest atfr(m_agent->nextId("ArchiveToFileRequest"), m_objectStore);
atfr.initialize();
atfr.setArchiveFile(rqst.getArchiveFile());
atfr.setRemoteFile(rqst.getRemoteFile());
atfr.setPriority(rqst.getPriority());
atfr.setLog(rqst.getCreationLog());
// We will need to identity tapepools is order to construct the request
RootEntry re(m_objectStore);
ScopedSharedLock rel(re);
re.fetch();
auto & cl = rqst.getCopyNbToPoolMap();
for (auto copy=cl.begin(); copy != cl.end(); copy++) {
std::string tpaddr = re.getTapePoolAddress(copy->second);
atfr.addJob(copy->first, copy->second, tpaddr);
}
auto jl = atfr.dumpJobs();
if (!jl.size()) {
throw ArchiveRequestHasNoCopies("In OStoreDB::queue: the archive to file request has no copy");
}
// We successfully prepared the object. Time to create it and plug it to
// the tree.
atfr.setOwner(m_agent->getAddressIfSet());
atfr.insert();
ScopedExclusiveLock atfrl(atfr);
// We can now plug the request onto its tape pools
for (auto j=jl.begin(); j!=jl.end(); j++) {
objectstore::TapePool tp(j->tapePoolAddress, m_objectStore);
ScopedExclusiveLock tpl(tp);
tp.fetch();
//tp.addJob(j);
}
}
void OStoreDB::queue(const ArchiveToDirRequest& rqst) {
......
......@@ -121,6 +121,7 @@ public:
virtual void deleteLogicalLibrary(const SecurityIdentity& requester, const std::string& name);
/* === Archival requests handling ======================================== */
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestHasNoCopies);
virtual void queue(const ArchiveToFileRequest& rqst);
virtual void queue(const ArchiveToDirRequest& rqst);
......
......@@ -40,9 +40,8 @@ cta::RetrieveFromTapeCopyRequest::RetrieveFromTapeCopyRequest(
const TapeCopyLocation &tapeCopy,
const std::string &remoteFile,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
RetrieveRequest(priority, user, creationTime),
const CreationLog & creationLog):
RetrieveRequest(priority, creationLog),
m_archiveFile(archiveFile),
m_copyNb(copyNb),
m_tapeCopy(tapeCopy),
......
......@@ -61,8 +61,7 @@ public:
const TapeCopyLocation &tapeCopy,
const std::string &remoteFile,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime = time(NULL));
const CreationLog &creationLog);
/**
* Returns the full path of the source archive file.
......
......@@ -35,7 +35,6 @@ cta::RetrieveRequest::~RetrieveRequest() throw() {
//------------------------------------------------------------------------------
cta::RetrieveRequest::RetrieveRequest(
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
UserRequest(priority, user, creationTime) {
const CreationLog & creationLog):
UserRequest(priority, creationLog) {
}
......@@ -50,8 +50,7 @@ public:
*/
RetrieveRequest(
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime = time(NULL));
const CreationLog &creationLog);
}; // class RetrieveRequest
......
......@@ -36,9 +36,8 @@ cta::RetrieveToDirRequest::~RetrieveToDirRequest() throw() {
cta::RetrieveToDirRequest::RetrieveToDirRequest(
const std::string &remoteDir,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
RetrieveRequest(priority, user, creationTime),
const CreationLog & creationLog):
RetrieveRequest(priority, creationLog),
m_remoteDir(remoteDir) {
}
......
......@@ -55,8 +55,7 @@ public:
RetrieveToDirRequest(
const std::string &remoteDir,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime = time(NULL));
const CreationLog & creationLog);
/**
* Returns the URL of the destination remote directory.
......
......@@ -39,9 +39,8 @@ cta::RetrieveToFileRequest::RetrieveToFileRequest(
const std::list<cta::TapeCopyLocation> &tapeCopies,
const std::string &remoteFile,
const uint64_t priority,
const SecurityIdentity &user,
const time_t creationTime):
RetrieveRequest(priority, user, creationTime),
const CreationLog & creationLog):
RetrieveRequest(priority, creationLog),
m_archiveFile(archiveFile),
m_tapeCopies(tapeCopies),
m_remoteFile(remoteFile) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment