Commit 6a577e60 authored by Eric Cano
Browse files

Implemented OStoreDB::RetrieveMount::getNextJob()

parent cfd65df2
......@@ -49,15 +49,7 @@ public:
uint16_t totalFailures;
};
FailuresCount addJobFailure(uint16_t copyNumber, uint64_t sessionId);
// Duplication of the protobuf statuses.
// The numeric value of every enumerator is spelled out explicitly.
// NOTE(review): presumably these values must stay identical to the ones of
// the serialized job status enum — confirm against the protobuf definition.
enum class JobStatus {
AJS_LinkingToTapePool = 0,
AJS_Pending = 1,
AJS_Selected = 2,
AJS_Complete = 3,
AJS_Failed = 99
};
JobStatus getJobStatus(uint16_t copyNumber);
serializers::ArchiveJobStatus getJobStatus(uint16_t copyNumber);
// Handling of the consequences of a job status change for the entire request.
// This function returns true if the request got finished.
bool finishIfNecessary();
......
......@@ -80,6 +80,11 @@ void cta::objectstore::RetrieveToFileRequest::setPriority(uint64_t priority) {
m_payload.set_priority(priority);
}
// Returns the priority currently stored in the request payload.
// checkPayloadReadable() is invoked first, before the payload is read.
uint64_t cta::objectstore::RetrieveToFileRequest::getPriority() {
  checkPayloadReadable();
  const uint64_t storedPriority = m_payload.priority();
  return storedPriority;
}
void cta::objectstore::RetrieveToFileRequest::setCreationLog(
const objectstore::CreationLog& creationLog) {
checkPayloadWritable();
......@@ -120,4 +125,24 @@ uint64_t cta::objectstore::RetrieveToFileRequest::getSize() {
// Stores the given size in the request payload.
// checkPayloadWritable() is invoked first, before the payload is modified.
void cta::objectstore::RetrieveToFileRequest::setSize(uint64_t size) {
checkPayloadWritable();
m_payload.set_size(size);
}
\ No newline at end of file
}
// Looks up the job of this request whose copy number equals copyNb and
// returns a JobDump filled from it (copy number, tape, tape address, fseq and
// block id). The first matching job wins.
// Throws NoSuchJob when no job carries this copy number.
auto cta::objectstore::RetrieveToFileRequest::getJob(uint16_t copyNb) -> JobDump {
  checkPayloadReadable();
  for (const auto & job : m_payload.jobs()) {
    // Skip the copies we are not interested in.
    if (job.copynb() != copyNb)
      continue;
    JobDump found;
    found.copyNb = job.copynb();
    found.tape = job.tape();
    found.tapeAddress = job.tapeaddress();
    found.fseq = job.fseq();
    found.blockid = job.blockid();
    return found;
  }
  throw NoSuchJob("In objectstore::RetrieveToFileRequest::getJob(): job not found for this copyNb");
}
......@@ -35,26 +35,53 @@ public:
RetrieveToFileRequest(const std::string & address, Backend & os);
RetrieveToFileRequest(GenericObject & go);
void initialize();
// Job management ============================================================
void addJob(const cta::TapeFileLocation & tapeFileLocation,
const std::string & tapeaddress);
void setJobFailureLimits(uint16_t copyNumber,
uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries);
void setJobSelected(uint16_t copyNumber, const std::string & owner);
void setJobPending(uint16_t copyNumber);
// Plain value description of one job of the request. All members are public
// and are simply copied in and out by the code using this type.
struct JobDump {
  uint16_t copyNb;          // copy number of the job
  std::string tape;         // tape recorded for the job
  std::string tapeAddress;  // address recorded for that tape
  uint64_t fseq;            // fseq recorded for the job
  uint64_t blockid;         // block id recorded for the job
};
JobDump getJob(uint16_t copyNb);
// Pair of failure counters; this is the return type of addJobFailure().
struct FailuresCount {
// NOTE(review): presumably the failures counted for the current mount — confirm.
uint16_t failuresWithinMount;
// NOTE(review): presumably the failures counted over all attempts — confirm.
uint16_t totalFailures;
};
FailuresCount addJobFailure(uint16_t copyNumber, uint64_t sessionId);
serializers::RetrieveJobStatus getJobStatus(uint16_t copyNumber);
// Handling of the consequences of a job status. This is simpler than archival
// as one finish is enough.
void finish();
// Mark all jobs as pending mount (following their linking to a tape pool)
void setAllJobsLinkingToTapePool();
// Mark all the jobs as being deleted, in case of a cancellation
void setAllJobsFailed();
// Mark all the jobs as pending deletion from NS.
void setAllJobsPendingNSdeletion();
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
// Request management ========================================================
void setSuccessful();
void setFailed();
// ===========================================================================
void setArchiveFile(const std::string & archiveFile);
std::string getArchiveFile();
void setRemoteFile (const std::string & remoteFile);
std::string getRemoteFile();
void setPriority (uint64_t priority);
uint64_t getPriority();
void setCreationLog (const objectstore::CreationLog& creationLog);
CreationLog getCreationLog();
void setRetrieveToDirRequestAddress(const std::string & dirRequestAddress);
void setSize(uint64_t size);
uint64_t getSize();
// Plain value description of one job of the request. All members are public
// and are simply copied in and out by the code using this type.
struct JobDump {
  uint16_t copyNb;          // copy number of the job
  std::string tape;         // tape recorded for the job
  std::string tapeAddress;  // address recorded for that tape
  uint64_t fseq;            // fseq recorded for the job
  uint64_t blockid;         // block id recorded for the job
};
std::list<JobDump> dumpJobs();
};
......
......@@ -207,6 +207,20 @@ auto cta::objectstore::Tape::dumpAndFetchRetrieveRequests()
} catch (cta::exception::Exception &) {}
}
return ret;
}
// Returns one JobDump (address, copy number, size) per retrieve job recorded
// in the tape payload, in payload order.
// checkPayloadReadable() is invoked first, before the payload is read.
auto cta::objectstore::Tape::dumpJobs() -> std::list<JobDump> {
  checkPayloadReadable();
  std::list<JobDump> dump;
  for (const auto & queuedJob : m_payload.retrievejobs()) {
    JobDump entry;
    entry.copyNb = queuedJob.copynb();
    entry.address = queuedJob.address();
    entry.size = queuedJob.size();
    dump.push_back(entry);
  }
  return dump;
}
......@@ -280,6 +294,30 @@ void cta::objectstore::Tape::releaseBusy() {
m_payload.set_busy(false);
}
// Removes from the retrieve job list every entry whose address equals
// retriveToFileAddress. The relative order of the remaining entries is
// preserved. Nothing happens when no entry matches.
// checkPayloadWritable() is invoked first, before the payload is modified.
void cta::objectstore::Tape::removeJob(const std::string& retriveToFileAddress) {
  checkPayloadWritable();
  auto * jl = m_payload.mutable_retrievejobs();
  // Single pass: move every entry we keep to the front (swapping it over the
  // entries to delete), so all the entries to delete end up at the tail.
  int kept = 0;
  for (int i = 0; i < jl->size(); ++i) {
    if (jl->Get(i).address() == retriveToFileAddress)
      continue;
    if (i != kept)
      jl->SwapElements(kept, i);
    ++kept;
  }
  // The repeated field can only shrink from the end: drop the tail.
  while (jl->size() > kept)
    jl->RemoveLast();
}
......
......@@ -78,6 +78,14 @@ public:
};
JobsSummary getJobsSummary();
std::list<RetrieveRequestDump> dumpAndFetchRetrieveRequests();
// One element of the list returned by dumpJobs().
struct JobDump {
std::string address; // address recorded for the queued job
uint16_t copyNb; // copy number recorded for the queued job
uint64_t size; // size recorded for the queued job
};
std::list<JobDump> dumpJobs();
void removeJob(const std::string & retriveToFileAddress);
// -- Stored data counting ---------------------------------------------------
uint64_t getStoredData();
......
......@@ -91,7 +91,7 @@ std::unique_ptr<cta::ArchiveJob> cta::ArchiveMount::getNextJob() {
std::unique_ptr<cta::SchedulerDatabase::ArchiveJob> dbJob(m_dbMount->getNextJob().release());
if (!dbJob.get())
return std::unique_ptr<cta::ArchiveJob>(NULL);
// We have something to migrate: prepare the response
// We have something to archive: prepare the response
std::unique_ptr<cta::ArchiveJob> ret(new ArchiveJob(*this, m_ns,
dbJob->archiveFile, dbJob->remoteFile, dbJob->nameServerTapeFile));
ret->m_dbJob.reset(dbJob.release());
......
......@@ -1351,9 +1351,9 @@ auto OStoreDB::ArchiveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::
while (tp.dumpJobs().size()) {
// Get the tape pool's jobs list, and pop the first
auto jl = tp.dumpJobs();
// First take a lock on a download the job
// First take a lock on and download the job
// If the request is not around anymore, we will just move to the next
// prepare the return value
// Prepare the return value
std::unique_ptr<OStoreDB::ArchiveJob> privateRet(new OStoreDB::ArchiveJob(
jl.front().address, m_objectStore, m_agent));
privateRet->m_copyNb = jl.front().copyNb;
......@@ -1439,12 +1439,90 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
return mountInfo;
}
auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<RetrieveJob> {
throw NotImplemented("In OStoreDB::RetrieveMount::getNextJob: not implemented");
// Pops the next retrieve job from the queue of the mounted tape.
//
// The tape object is found by walking root entry -> tape pool (matched on
// mountInfo.tapePool) -> tape (matched on mountInfo.vid). Jobs are then taken
// from the front of the tape's queue until one can be locked and fetched;
// entries whose request cannot be accessed are dropped from the queue.
//
// On success the request is added to the agent's ownership, its owner is set
// to the agent, it is removed from the tape queue and the returned job is
// filled from the request (m_jobOwned is set to true).
//
// Returns a null pointer when the tape queue is empty.
// Throws NoSuchTapePool or NoSuchTape when the pool/tape cannot be found.
auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
  // Find the tape pool through the root entry.
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  auto tpl = re.dumpTapePools();
  rel.release();
  std::string tpAddress;
  for (auto tpp = tpl.begin(); tpp != tpl.end(); ++tpp) {
    if (tpp->tapePool == mountInfo.tapePool)
      tpAddress = tpp->address;
  }
  if (tpAddress.empty())
    throw NoSuchTapePool("In OStoreDB::RetrieveMount::getNextJob(): tape pool not found");
  // Find the tape inside the tape pool.
  objectstore::TapePool tp(tpAddress, m_objectStore);
  objectstore::ScopedSharedLock tplock(tp);
  tp.fetch();
  auto tl = tp.dumpTapes();
  tplock.release();
  std::string tAddress;
  for (auto tptr = tl.begin(); tptr != tl.end(); ++tptr) {
    if (tptr->vid == mountInfo.vid)
      tAddress = tptr->address;
  }
  if (tAddress.empty())
    throw NoSuchTape("In OStoreDB::RetrieveMount::getNextJob(): tape not found");
  // The tape stays exclusively locked while we pick a job from its queue.
  objectstore::Tape t(tAddress, m_objectStore);
  objectstore::ScopedExclusiveLock tlock(t);
  t.fetch();
  // Dump the tape's job list once per iteration and look at its first entry.
  for (auto jl = t.dumpJobs(); !jl.empty(); jl = t.dumpJobs()) {
    // Prepare the return value
    std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob(
      jl.front().address, m_objectStore, m_agent));
    privateRet->m_copyNb = jl.front().copyNb;
    // Take a lock on the request and download it.
    objectstore::ScopedExclusiveLock rtfrl;
    try {
      rtfrl.lock(privateRet->m_rtfr);
      privateRet->m_rtfr.fetch();
    } catch (cta::exception::Exception &) {
      // We failed to access the object. It might be missing.
      // Just pop this job from the tape queue and move to the next.
      t.removeJob(privateRet->m_rtfr.getAddressIfSet());
      // Commit in case we do not pass by again.
      t.commit();
      continue;
    }
    // Take ownership of the job: first record it in the agent...
    objectstore::ScopedExclusiveLock al(m_agent);
    m_agent.fetch();
    m_agent.addToOwnership(privateRet->m_rtfr.getAddressIfSet());
    m_agent.commit();
    al.release();
    // ...then make the ownership official (for the whole request in retrieves)
    privateRet->m_rtfr.setOwner(m_agent.getAddressIfSet());
    privateRet->m_rtfr.commit();
    // Remove the job from the tape queue
    t.removeJob(privateRet->m_rtfr.getAddressIfSet());
    // We can commit the tape now, we will only fill up memory structures
    // from here on.
    t.commit();
    privateRet->archiveFile.path = privateRet->m_rtfr.getArchiveFile();
    privateRet->remoteFile = privateRet->m_rtfr.getRemoteFile();
    objectstore::RetrieveToFileRequest::JobDump jobDump =
      privateRet->m_rtfr.getJob(privateRet->m_copyNb);
    privateRet->nameServerTapeFile.tapeFileLocation.fSeq = jobDump.fseq;
    privateRet->nameServerTapeFile.tapeFileLocation.blockId = jobDump.blockid;
    privateRet->nameServerTapeFile.tapeFileLocation.copyNb = privateRet->m_copyNb;
    privateRet->nameServerTapeFile.tapeFileLocation.vid = mountInfo.vid;
    privateRet->m_jobOwned = true;
    return std::unique_ptr<SchedulerDatabase::RetrieveJob>(privateRet.release());
  }
  // Nothing left in the queue.
  return std::unique_ptr<SchedulerDatabase::RetrieveJob>(nullptr);
}
// Not implemented yet: always throws NotImplemented. The completionTime
// argument is currently unused.
void OStoreDB::RetrieveMount::complete(time_t completionTime) {
  throw NotImplemented("In OStoreDB::RetrieveMount::complete: not implemented");
}
......@@ -1529,5 +1607,88 @@ OStoreDB::ArchiveJob::~ArchiveJob() {
}
}
// Builds a job object for the retrieve request stored at jobAddress in the
// object store os, keeping references to the object store and to the agent ag.
// The job starts as not owned (m_jobOwned == false) with copy number 0;
// RetrieveMount::getNextJob() sets the real copy number and the ownership flag.
OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress,
    objectstore::Backend& os, objectstore::Agent& ag):
  m_jobOwned(false),
  m_copyNb(0),
  m_objectStore(os),
  m_agent(ag),
  m_rtfr(jobAddress, os) {}
// Not implemented yet: always throws NotImplemented.
void OStoreDB::RetrieveJob::fail() {
  throw NotImplemented("In OStoreDB::RetrieveJob::fail: not implemented");
}
// Destructor. When the job is still owned (m_jobOwned), the request is
// re-queued entirely: the tape with the most queued bytes among the request's
// copies is selected, the request is added to that tape's queue, the tape
// becomes the owner of the request and the reference held by the previous
// owner (if any) is removed. Every failure is swallowed: a destructor must
// not throw, and this is a best-effort clean-up.
OStoreDB::RetrieveJob::~RetrieveJob() {
  if (!m_jobOwned)
    return;
  // Re-queue the job entirely if we failed to handle it.
  try {
    objectstore::ScopedExclusiveLock rtfrl(m_rtfr);
    m_rtfr.fetch();
    auto jl = m_rtfr.dumpJobs();
    // Without any tape copy there is nowhere to re-queue the request.
    if (jl.empty())
      return;
    // We now need to select the tape from which we will retrieve next. This
    // should be the tape with the most jobs already queued.
    // TODO: this will have to look at tape statuses on the long run as well
    auto selected = jl.begin();
    uint64_t bestTapeQueuedBytes = 0;
    for (auto tc = jl.begin(); tc != jl.end(); ++tc) {
      objectstore::Tape t(tc->tapeAddress, m_objectStore);
      objectstore::ScopedSharedLock tl(t);
      t.fetch();
      const auto queuedBytes = t.getJobsSummary().bytes;
      // First tape copy is always better than nothing; the following ones
      // only win when they have strictly more bytes queued.
      if (tc == jl.begin() || queuedBytes > bestTapeQueuedBytes) {
        bestTapeQueuedBytes = queuedBytes;
        selected = tc;
      }
    }
    const std::string selectedTapeAddress = selected->tapeAddress;
    // We now can enqueue the request on this most promising tape.
    {
      objectstore::Tape tp(selectedTapeAddress, m_objectStore);
      objectstore::ScopedExclusiveLock tpl(tp);
      tp.fetch();
      // Pass a copy of the selected job so that every field is initialized.
      const objectstore::RetrieveToFileRequest::JobDump jd = *selected;
      tp.addJob(jd, m_rtfr.getAddressIfSet(), m_rtfr.getSize(), m_rtfr.getPriority(), m_rtfr.getCreationLog().time);
      tp.commit();
    }
    // The request is now fully set. It belongs to the tape.
    const std::string previousOwner = m_rtfr.getOwner();
    m_rtfr.setOwner(selectedTapeAddress);
    m_rtfr.commit();
    // And remove reference from the agent (if it was owned by an agent)
    if (!previousOwner.empty()) {
      try {
        objectstore::Agent agent(previousOwner, m_objectStore);
        objectstore::ScopedExclusiveLock al(agent);
        agent.fetch();
        agent.removeFromOwnership(m_rtfr.getAddressIfSet());
        agent.commit();
      } catch (...) {}
    }
  } catch (...) {}
}
// Not implemented yet: always throws NotImplemented.
void OStoreDB::RetrieveJob::succeed() {
  throw NotImplemented("In OStoreDB::RetrieveJob::succeed: not implemented");
}
}
......@@ -21,6 +21,7 @@
#include "scheduler/SchedulerDatabase.hpp"
#include "objectstore/Agent.hpp"
#include "objectstore/ArchiveToFileRequest.hpp"
#include "objectstore/RetrieveToFileRequest.hpp"
#include "objectstore/SchedulerGlobalLock.hpp"
namespace cta {
......@@ -118,6 +119,26 @@ public:
virtual std::unique_ptr<RetrieveJob> getNextJob();
virtual void complete(time_t completionTime);
};
/* === Retrieve Job handling ============================================== */
// Object store backed implementation of SchedulerDatabase::RetrieveJob.
// Instances are created and filled by RetrieveMount (hence the friendship and
// the private constructor).
class RetrieveJob: public SchedulerDatabase::RetrieveJob {
friend class RetrieveMount;
public:
CTA_GENERATE_EXCEPTION_CLASS(JobNowOwned);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchJob);
virtual void succeed();
virtual void fail();
// When the job is still owned, the destructor attempts to re-queue the
// request on a tape; it swallows every error.
virtual ~RetrieveJob();
private:
// Arguments: address of the request, object store backend, agent.
RetrieveJob(const std::string &, objectstore::Backend &, objectstore::Agent &);
// Initialized to false; set to true by RetrieveMount::getNextJob() once the
// request has been handed to this job.
bool m_jobOwned;
// Copy number of the job picked from the tape queue.
uint16_t m_copyNb;
objectstore::Backend & m_objectStore;
objectstore::Agent & m_agent;
// The retrieve request, constructed from the job address and the backend.
objectstore::RetrieveToFileRequest m_rtfr;
std::map<std::string, std::string> m_vidToAddress; /**< Cache of tape objects
* addresses filled up at queuing time */
};
/* === Admin host handling ================================================ */
virtual void createAdminHost(const std::string& hostName,
......
......@@ -27,15 +27,15 @@ cta::RetrieveJob::~RetrieveJob() throw() {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// Builds a retrieve job attached to the mount that generated it.
// @param mount the mount that generated this job
// @param archiveFile the archive file to retrieve
// @param remotePath the remote file path
// @param nameServerTapeFile the tape file information
// @param positioningMethod the positioning method
// transferredSize starts at the maximum value of its type.
cta::RetrieveJob::RetrieveJob(RetrieveMount &mount,
  const ArchiveFile &archiveFile,
  const std::string &remotePath,
  const NameServerTapeFile &nameServerTapeFile,
  const PositioningMethod positioningMethod):
  m_mount(mount),
  archiveFile(archiveFile),
  remotePath(remotePath),
  nameServerTapeFile(nameServerTapeFile),
  positioningMethod(positioningMethod),
  transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
......
......@@ -23,9 +23,11 @@
#include "common/exception/Exception.hpp"
#include "common/remoteFS/RemotePathAndStatus.hpp"
#include "scheduler/PositioningMethod.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include <string>
#include <limits>
#include <memory>
namespace cta {
......@@ -41,14 +43,8 @@ class RetrieveJob {
* constructor of RetrieveJob.
*/
friend class RetrieveMount;
protected:
/**
* Empty constructor. TODO: to be removed in the future when we put in the reference to the owning mount;
*/
RetrieveJob(): transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
public:
/**
* Constructor. It is not public as it is generated by the RetrieveMount.
*
......@@ -58,18 +54,26 @@ protected:
* @param tapeFileLocation the location of the tape file
* @param positioningMethod the positioning method
*/
RetrieveJob(/*RetrieveMount &mount,*/
RetrieveJob(RetrieveMount &mount,
const ArchiveFile &archiveFile,
const RemotePathAndStatus &remotePathAndStatus,
const TapeFileLocation &tapeFileLocation,
const std::string &remotePath,
const NameServerTapeFile &tapeFileLocation,
const PositioningMethod positioningMethod);
private:
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> m_dbJob;
/**
* The mount that generated this job
*/
RetrieveMount &m_mount;
public:
/**
* Destructor.
*/
virtual ~RetrieveJob() throw() = 0;
virtual ~RetrieveJob() throw();
/**
* Indicates that the job was successful. The checksum and the size of the
......@@ -102,14 +106,14 @@ public:
ArchiveFile archiveFile;
/**
* The remote file information
* The remote file path
*/
RemotePathAndStatus remotePathAndStatus;
std::string remotePath;
/**
* The location of the tape file
*/
TapeFileLocation tapeFileLocation;
NameServerTapeFile nameServerTapeFile;
/**
* The positioning method
......
......@@ -21,14 +21,15 @@
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// Trivial constructor: no database mount attached, session not running.
cta::RetrieveMount::RetrieveMount():
  m_sessionRunning(false) {}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
// Takes over the database representation of the mount passed in dbMount.
// The session starts as not running.
cta::RetrieveMount::RetrieveMount(
  std::unique_ptr<SchedulerDatabase::RetrieveMount> dbMount):
  m_sessionRunning(false) {
  m_dbMount.reset(dbMount.release());
}
......@@ -64,7 +65,18 @@ std::string cta::RetrieveMount::getMountTransactionId() const throw(){
// getNextJob
//------------------------------------------------------------------------------
// Job factory: asks the database mount for its next job and wraps it in a
// cta::RetrieveJob positioned by block.
// Returns a null pointer when the database has no more job for this mount.
// Throws SessionNotRunning when the session is not running.
std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob() {
  if (!m_sessionRunning)
    throw SessionNotRunning("In RetrieveMount::getNextJob(): trying to get job from complete/not started session");
  // Try and get a new job from the DB
  std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob().release());
  if (!dbJob)
    return std::unique_ptr<cta::RetrieveJob>(nullptr);
  // We have something to retrieve: prepare the response
  std::unique_ptr<cta::RetrieveJob> ret(new RetrieveJob(*this,
    dbJob->archiveFile, dbJob->remoteFile, dbJob->nameServerTapeFile,
    PositioningMethod::ByBlock));
  // The returned job keeps the database job alive.
  ret->m_dbJob.reset(dbJob.release());
  return ret;
}
//------------------------------------------------------------------------------
......
......@@ -36,12 +36,11 @@ namespace cta {
class RetrieveMount: public TapeMount {
friend class Scheduler;
protected:
/**
* Constructor.
* Trivial constructor
*/
RetrieveMount();
/**
* Constructor.
*
......@@ -92,6 +91,7 @@ namespace cta {
*/
virtual void complete();
CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning);
/**
* Job factory
*
......@@ -111,6 +111,11 @@ namespace cta {
* The database representation of this mount.
*/
std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> m_dbMount;
/**
* Internal tracking of the session completion
*/
bool m_sessionRunning;
}; // class RetrieveMount
......
......@@ -894,14 +894,15 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->vid == m->vid && t->status.availableToRead()) {
try {
std::unique_ptr<RetrieveMount> internalRet (new RetrieveMount());