Commit 7f9bc693 authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Changes RetrieveMount reference to a pointer

Because ToReport queues need to construct RetrieveJob objects which are
not associated with a mount
parent 55df6ca9
......@@ -1042,16 +1042,14 @@ getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logCo
criteria.files = filesRequested;
auto jobs = rqtrAlgo.popNextBatch(queueList.front().vid, objectstore::QueueType::JobsToReport, criteria, logContext);
if(jobs.elements.empty()) continue;
#if 0
for(auto &j : jobs.elements)
{
std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), *this));
std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), *this, nullptr));
rj->archiveFile = j.archiveFile;
rj->retrieveRequest = j.rr;
rj->selectedCopyNb = j.copyNb;
ret.emplace_back(std::move(rj));
}
#endif
return ret;
}
}
......@@ -1863,7 +1861,7 @@ getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContex
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> ret;
for(auto &j : jobs.elements)
{
std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), m_oStoreDB, *this));
std::unique_ptr<OStoreDB::RetrieveJob> rj(new OStoreDB::RetrieveJob(j.retrieveRequest->getAddressIfSet(), m_oStoreDB, this));
rj->archiveFile = j.archiveFile;
rj->retrieveRequest = j.rr;
rj->selectedCopyNb = j.copyNb;
......@@ -2356,12 +2354,6 @@ OStoreDB::ArchiveJob::~ArchiveJob() {
}
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::RetrieveJob()
//------------------------------------------------------------------------------
OStoreDB::RetrieveJob::RetrieveJob(const std::string& jobAddress, OStoreDB & oStoreDB, OStoreDB::RetrieveMount& rm):
m_jobOwned(false), m_oStoreDB(oStoreDB), m_retrieveRequest(jobAddress, m_oStoreDB.m_objectStore), m_retrieveMount(rm) { }
#if 0
//------------------------------------------------------------------------------
// OStoreDB::RetrieveJob::fail()
......
......@@ -220,13 +220,20 @@ public:
void failTransfer(const std::string& failureReason, log::LogContext& lc) override;
void failReport(const std::string& failureReason, log::LogContext& lc) override;
virtual ~RetrieveJob() override;
//private:
// This can't be private any more as it has to be instantiated for queues to report as well as queues
// to transfer, i.e. it must be possible to instantiate retrieve jobs independent from a mount
RetrieveJob(const std::string &jobAddress, OStoreDB &oStoreDB, RetrieveMount *rm) :
m_jobOwned(false), m_oStoreDB(oStoreDB),
m_retrieveRequest(jobAddress, m_oStoreDB.m_objectStore),
m_retrieveMount(rm) { }
private:
RetrieveJob(const std::string &, OStoreDB &, RetrieveMount &);
bool m_jobOwned;
uint64_t m_mountId;
OStoreDB & m_oStoreDB;
objectstore::RetrieveRequest m_retrieveRequest;
OStoreDB::RetrieveMount &m_retrieveMount;
OStoreDB::RetrieveMount *m_retrieveMount;
std::unique_ptr<objectstore::RetrieveRequest::AsyncJobDeleter> m_jobDelete;
};
......
......@@ -32,7 +32,7 @@ cta::RetrieveJob::~RetrieveJob() throw() {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::RetrieveJob::RetrieveJob(RetrieveMount &mount,
cta::RetrieveJob::RetrieveJob(RetrieveMount *mount,
const common::dataStructures::RetrieveRequest &retrieveRequest,
const common::dataStructures::ArchiveFile & archiveFile,
const uint64_t selectedCopyNb,
......
......@@ -41,6 +41,7 @@ class RetrieveJob {
* constructor of RetrieveJob.
*/
friend class RetrieveMount;
friend class Scheduler;
public:
/**
......@@ -52,7 +53,7 @@ public:
* @param tapeFileLocation the location of the tape file
* @param positioningMethod the positioning method
*/
RetrieveJob(RetrieveMount &mount,
RetrieveJob(RetrieveMount *mount,
const common::dataStructures::RetrieveRequest &retrieveRequest,
const common::dataStructures::ArchiveFile & archiveFile,
const uint64_t selectedCopyNb,
......@@ -64,7 +65,7 @@ private:
/**
* The mount that generated this job
*/
RetrieveMount &m_mount;
RetrieveMount *m_mount;
public:
......
......@@ -79,7 +79,7 @@ std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatc
std::list<std::unique_ptr<RetrieveJob>> ret;
// We prepare the response
for (auto & sdrj: dbJobBatch) {
ret.emplace_back(new RetrieveJob(*this,
ret.emplace_back(new RetrieveJob(this,
sdrj->retrieveRequest, sdrj->archiveFile, sdrj->selectedCopyNb,
PositioningMethod::ByBlock));
ret.back()->m_dbJob.reset(sdrj.release());
......
......@@ -1079,6 +1079,24 @@ std::list<std::unique_ptr<ArchiveJob> > Scheduler::getNextArchiveJobsToReportBat
return ret;
}
//------------------------------------------------------------------------------
// getNextRetrieveJobsToReportBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<RetrieveJob>> Scheduler::
getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext)
{
// We need to go through the queues of retrieve jobs to report
std::list<std::unique_ptr<RetrieveJob>> ret;
// Get the list of jobs to report from the scheduler db
auto dbRet = m_db.getNextRetrieveJobsToReportBatch(filesRequested, logContext);
for (auto &j : dbRet) {
ret.emplace_back(new RetrieveJob(nullptr, j->retrieveRequest, j->archiveFile, j->selectedCopyNb, PositioningMethod::ByFSeq));
ret.back()->m_dbJob.reset(j.release());
}
return ret;
}
//------------------------------------------------------------------------------
// reportArchiveJobsBatch
//------------------------------------------------------------------------------
......
......@@ -61,6 +61,7 @@
namespace cta {
class ArchiveJob;
class RetrieveJob;
/**
* Class implementing a tape resource scheduler. This class is the main entry point
......@@ -333,6 +334,21 @@ public:
void reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob>> & archiveJobsBatch,
eos::DiskReporterFactory & reporterFactory, log::TimingList&, utils::Timer &, log::LogContext &);
/* ============================== Retrieve reporting support ============================== */
/*!
* Batch job factory
*
* @param filesRequested the number of files requested
* @param logContext
*
* @returns A list of unique_ptr to the next successful archive jobs to report. The list
* is empty when no more jobs can be found. Will return jobs (if available) up
* to specified number.
*/
std::list<std::unique_ptr<RetrieveJob>> getNextRetrieveJobsToReportBatch(uint64_t filesRequested,
log::LogContext &logContext);
public:
/*============== Administrator management ==================================*/
void authorizeAdmin(const cta::common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc);
......
......@@ -553,10 +553,6 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
retrieveJob->checkComplete();
jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, jobBatch.size());
//std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::
//getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext)
}
}
......@@ -762,6 +758,9 @@ TEST_P(SchedulerTest, archive_and_retrieve_failure) {
}
// Then the request should be gone
ASSERT_EQ(0, retrieveMount->getNextJobBatch(1,1,lc).size());
// and the failure should be reported on the jobs to report queue
auto retrieveJobToReportList = scheduler.getNextRetrieveJobsToReportBatch(1,lc);
ASSERT_EQ(1, retrieveJobToReportList.size());
}
}
......
......@@ -27,7 +27,7 @@ namespace cta {
public:
int completes;
int failures;
MockRetrieveJob(RetrieveMount & rm): cta::RetrieveJob(rm,
MockRetrieveJob(RetrieveMount & rm): cta::RetrieveJob(&rm,
cta::common::dataStructures::RetrieveRequest(),
cta::common::dataStructures::ArchiveFile(), 1,
cta::PositioningMethod::ByBlock), completes(0), failures(0) {
......
......@@ -58,7 +58,7 @@ namespace unitTests{
class TestingRetrieveJob: public cta::RetrieveJob {
public:
TestingRetrieveJob(cta::RetrieveMount & rm): cta::RetrieveJob(rm,
TestingRetrieveJob(cta::RetrieveMount & rm): cta::RetrieveJob(&rm,
cta::common::dataStructures::RetrieveRequest(),
cta::common::dataStructures::ArchiveFile(), 1,
cta::PositioningMethod::ByBlock) {}
......
......@@ -51,7 +51,7 @@ namespace unitTests{
class TestingRetrieveJob: public cta::RetrieveJob {
public:
TestingRetrieveJob(): cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
TestingRetrieveJob(): cta::RetrieveJob(nullptr,
cta::common::dataStructures::RetrieveRequest(),
cta::common::dataStructures::ArchiveFile(), 1,
cta::PositioningMethod::ByBlock) {}
......
......@@ -56,7 +56,7 @@ int test = RAO_TEST;
class BasicRetrieveJob: public cta::RetrieveJob {
public:
BasicRetrieveJob() : cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
BasicRetrieveJob() : cta::RetrieveJob(nullptr,
cta::common::dataStructures::RetrieveRequest(),
cta::common::dataStructures::ArchiveFile(), 1,
cta::PositioningMethod::ByBlock) {}
......
......@@ -40,7 +40,7 @@ namespace unitTests {
class TestingRetrieveJob: public cta::RetrieveJob {
public:
TestingRetrieveJob() : cta::RetrieveJob(*((cta::RetrieveMount *)NULL),
TestingRetrieveJob() : cta::RetrieveJob(nullptr,
cta::common::dataStructures::RetrieveRequest(),
cta::common::dataStructures::ArchiveFile(), 1,
cta::PositioningMethod::ByBlock) {}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment