Commit c4632479 authored by Eric Cano's avatar Eric Cano
Browse files

Used RetrieveMount::getNextJobBatch() instead of RetrieveMount::getNextJob().

Removed all instances of RetrieveMount::getNextJob() and siblings.
Fixed incomplete deserialization of archive file in RetrieveRequest::asyncUpdateOwner().
parent a770a044
......@@ -387,19 +387,9 @@ auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& o
retRef.m_retieveRequest.dstURL = payload.schedulerrequest().dsturl();
retRef.m_retieveRequest.requester.name = payload.schedulerrequest().requester().name();
retRef.m_retieveRequest.requester.group = payload.schedulerrequest().requester().group();
retRef.m_archiveFile.archiveFileID = payload.archivefile().archivefileid();
retRef.m_archiveFile.checksumType = payload.archivefile().checksumtype();
retRef.m_archiveFile.checksumValue = payload.archivefile().checksumvalue();
retRef.m_archiveFile.creationTime = payload.archivefile().creationtime();
retRef.m_archiveFile.diskFileId = payload.archivefile().diskfileid();
retRef.m_archiveFile.diskFileInfo.group = payload.archivefile().diskfileinfo().group();
retRef.m_archiveFile.diskFileInfo.owner = payload.archivefile().diskfileinfo().owner();
retRef.m_archiveFile.diskFileInfo.path = payload.archivefile().diskfileinfo().path();
retRef.m_archiveFile.diskFileInfo.recoveryBlob = payload.archivefile().diskfileinfo().recoveryblob();
retRef.m_archiveFile.diskInstance = payload.archivefile().diskinstance();
retRef.m_archiveFile.fileSize = payload.archivefile().filesize();
retRef.m_archiveFile.reconciliationTime = payload.archivefile().reconciliationtime();
retRef.m_archiveFile.storageClass = payload.archivefile().storageclass();
objectstore::ArchiveFileSerDeser af;
af.deserialize(payload.archivefile());
retRef.m_archiveFile = af;
oh.set_payload(payload.SerializePartialAsString());
return oh.SerializeAsString();
}
......
......@@ -99,6 +99,7 @@ void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::
m_catalogue.filesWrittenToTape(tapeFilesWritten);
}
//------------------------------------------------------------------------------
// getNextJobBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
......
......@@ -1936,111 +1936,6 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
return mountInfo;
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getNextJob()
//------------------------------------------------------------------------------
// Pops the next retrieve job for this mount's tape (vid) from the object
// store's retrieve queue, takes ownership of it on behalf of this agent, and
// returns it. Returns nullptr/empty when there is nothing to do: the drive is
// being forced down, the queue for this vid is gone, or the queue is empty.
// NOTE(review): superseded by getNextJobBatch() in this commit.
auto OStoreDB::RetrieveMount::getNextJob(log::LogContext & logContext) -> std::unique_ptr<SchedulerDatabase::RetrieveJob> {
  // Find the next file to retrieve
  // Get the tape pool and then tape
  objectstore::RootEntry re(m_objectStore);
  objectstore::ScopedSharedLock rel(re);
  re.fetch();
  // First, check we should not forcibly go down. In such an occasion, we just find nothing to do.
  // Get drive register
  {
    objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
    ScopedSharedLock drl(dr);
    dr.fetch();
    auto drs = dr.getDriveState(mountInfo.drive);
    // A drive that is desired down AND forced down must stop picking up work.
    if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) {
      logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJob(): returning no job as we are forcibly going down.");
      return nullptr;
    }
  }
  // Snapshot the queue list, then release the root entry lock early: from here
  // on we only need the address of our vid's queue.
  auto rql = re.dumpRetrieveQueues();
  rel.release();
  std::string rqAddress;
  for (auto & rqp: rql) {
    if (rqp.vid == mountInfo.vid)
      rqAddress = rqp.address;
  }
  // The retrieve queue is gone. There is no more job.
  if (!rqAddress.size())
    return nullptr;
  // Try and open the retrieve queue. It could be gone by now.
  try {
    objectstore::RetrieveQueue rq(rqAddress, m_objectStore);
    objectstore::ScopedExclusiveLock rqlock;
    try {
      rqlock.lock(rq);
      rq.fetch();
    } catch (cta::exception::Exception & ex) {
      // The queue is now absent. We can remove its reference in the root entry.
      // A new queue could have been added in the mean time, and be non-empty.
      // We will then fail to remove from the RootEntry (non-fatal).
      // TODO: We still conclude that the queue is empty on this unlikely event.
      // (cont'd): A better approach would be to restart the process of this function
      // from scratch.
      rel.lock(re);
      re.fetch();
      try {
        re.removeRetrieveQueueAndCommit(mountInfo.vid);
      } catch (RootEntry::RetrieveQueueNotEmpty & ex) {
        // TODO: improve: if we fail here we could retry to fetch a job.
        return nullptr;
      }
      // NOTE(review): on successful removal we fall through to the loop below
      // with an unfetched rq; presumably rq.dumpJobs() then throws and the
      // outer catch returns nullptr — confirm this is the intended path.
    }
    // Pop jobs until we find one actually belonging to the queue.
    // Any job not really belonging is an uncommitted pop, which we will
    // re-do here.
    while (rq.dumpJobs().size()) {
      // First take a lock on and download the job
      // If the request is not around anymore, we will just move to the next
      // Prepare the return value
      auto job=rq.dumpJobs().front();
      std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob(
        job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this));
      privateRet->selectedCopyNb = job.copyNb;
      objectstore::ScopedExclusiveLock rrl;
      try {
        rrl.lock(privateRet->m_retrieveRequest);
        privateRet->m_retrieveRequest.fetch();
        // An entry whose request is no longer owned by this queue is a stale
        // (uncommitted) pop: drop it from the queue and try the next one.
        if(privateRet->m_retrieveRequest.getOwner() != rq.getAddressIfSet()) {
          rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
          continue;
        }
      } catch (cta::exception::Exception &) {
        // we failed to access the object. It might be missing.
        // Just pop this job from the queue and move to the next.
        rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
        // Commit in case we do not pass by again.
        rq.commit();
        continue;
      }
      // Take ownership of the job
      // Add to ownership
      m_agentReference.addToOwnership(privateRet->m_retrieveRequest.getAddressIfSet(), m_objectStore);
      // Make the ownership official
      privateRet->m_retrieveRequest.setOwner(m_agentReference.getAgentAddress());
      privateRet->m_retrieveRequest.commit();
      // Remove the job from the archive queue
      rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet());
      // We can commit and release the retrieve queue lock, we will only fill up
      // memory structure from here on.
      rq.commit();
      rqlock.release();
      privateRet->retrieveRequest = privateRet->m_retrieveRequest.getSchedulerRequest();
      privateRet->archiveFile = privateRet->m_retrieveRequest.getArchiveFile();
      privateRet->m_jobOwned = true;
      privateRet->m_mountId = mountInfo.mountId;
      return std::unique_ptr<SchedulerDatabase::RetrieveJob> (std::move(privateRet));
    }
    // Queue drained without a usable job.
    return std::unique_ptr<SchedulerDatabase::RetrieveJob>();
  } catch (cta::exception::Exception & ex) {
    // Any object store failure at this level is treated as "no job available".
    return nullptr;
  }
}
//------------------------------------------------------------------------------
// OStoreDB::RetrieveMount::getNextJobBatch()
//------------------------------------------------------------------------------
......
......@@ -172,7 +172,6 @@ public:
public:
const MountInfo & getMountInfo() override;
std::list<std::unique_ptr<RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override;
std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) override;
void complete(time_t completionTime) override;
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override;
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override;
......
......@@ -34,28 +34,28 @@ cta::RetrieveMount::RetrieveMount(
}
//------------------------------------------------------------------------------
// getMountType
// getMountType()
//------------------------------------------------------------------------------
cta::common::dataStructures::MountType cta::RetrieveMount::getMountType() const{
  // A RetrieveMount, by construction, always represents a retrieve session.
  using cta::common::dataStructures::MountType;
  return MountType::Retrieve;
}
//------------------------------------------------------------------------------
// getNbFiles
// getNbFiles()
//------------------------------------------------------------------------------
uint32_t cta::RetrieveMount::getNbFiles() const {
  // Delegate to the scheduler-DB side of the mount, which tracks the count.
  const auto currentFileCount = m_dbMount->nbFilesCurrentlyOnTape;
  return currentFileCount;
}
//------------------------------------------------------------------------------
// getVid
// getVid()
//------------------------------------------------------------------------------
std::string cta::RetrieveMount::getVid() const{
  // The volume identifier was captured in the mount info at mount creation.
  const std::string & volumeId = m_dbMount->mountInfo.vid;
  return volumeId;
}
//------------------------------------------------------------------------------
// getMountTransactionId
// getMountTransactionId()
//------------------------------------------------------------------------------
std::string cta::RetrieveMount::getMountTransactionId() const{
std::stringstream id;
......@@ -66,25 +66,28 @@ std::string cta::RetrieveMount::getMountTransactionId() const{
}
//------------------------------------------------------------------------------
// getNextJob
// getNextJobBatch()
//------------------------------------------------------------------------------
std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob(log::LogContext & logContext) {
std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested,
log::LogContext& logContext) {
if (!m_sessionRunning)
throw SessionNotRunning("In RetrieveMount::getNextJob(): trying to get job from complete/not started session");
throw SessionNotRunning("In RetrieveMount::getNextJobBatch(): trying to get job from complete/not started session");
// Try and get a new job from the DB
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob(logContext).release());
if (!dbJob.get())
return std::unique_ptr<cta::RetrieveJob>();
// We have something to retrieve: prepare the response
std::unique_ptr<cta::RetrieveJob> ret (new RetrieveJob(*this,
dbJob->retrieveRequest, dbJob->archiveFile, dbJob->selectedCopyNb,
PositioningMethod::ByBlock));
ret->m_dbJob.reset(dbJob.release());
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested,
bytesRequested, logContext));
std::list<std::unique_ptr<RetrieveJob>> ret;
// We prepare the response
for (auto & sdrj: dbJobBatch) {
ret.emplace_back(new RetrieveJob(*this,
sdrj->retrieveRequest, sdrj->archiveFile, sdrj->selectedCopyNb,
PositioningMethod::ByBlock));
ret.back()->m_dbJob.reset(sdrj.release());
}
return ret;
}
//------------------------------------------------------------------------------
// tapeComplete())
// tapeComplete()
//------------------------------------------------------------------------------
void cta::RetrieveMount::tapeComplete() {
m_tapeRunning = false;
......@@ -101,7 +104,7 @@ void cta::RetrieveMount::tapeComplete() {
}
//------------------------------------------------------------------------------
// diskComplete())
// diskComplete()
//------------------------------------------------------------------------------
void cta::RetrieveMount::diskComplete() {
m_diskRunning = false;
......@@ -116,7 +119,7 @@ void cta::RetrieveMount::diskComplete() {
}
//------------------------------------------------------------------------------
// abort())
// abort()
//------------------------------------------------------------------------------
void cta::RetrieveMount::abort() {
diskComplete();
......@@ -138,7 +141,7 @@ void cta::RetrieveMount::setTapeSessionStats(const castor::tape::tapeserver::dae
}
//------------------------------------------------------------------------------
// bothSidesComplete())
// bothSidesComplete()
//------------------------------------------------------------------------------
bool cta::RetrieveMount::bothSidesComplete() {
return !(m_diskRunning || m_tapeRunning);
......
......@@ -117,13 +117,19 @@ namespace cta {
virtual bool bothSidesComplete();
CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning);
/**
* Job factory
*
* @return A unique_ptr to the next archive job or NULL if there are no more
* archive jobs left for this tape mount.
*/
virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext);
* Batch job factory
*
* @param filesRequested the number of files requested
* @param bytesRequested the number of bytes requested
* @param logContext
* @return a list of unique_ptr to the next retrieve jobs. The list is empty
* when no more jobs can be found. Will return jobs (if available) until one
* of the 2 criteria is fulfilled.
*/
virtual std::list<std::unique_ptr<RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext &logContext);
/**
* Destructor.
......
......@@ -323,7 +323,6 @@ public:
virtual const MountInfo & getMountInfo() = 0;
virtual std::list<std::unique_ptr<RetrieveJob>> getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext& logContext) = 0;
virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) = 0;
virtual void complete(time_t completionTime) = 0;
virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0;
virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0;
......
......@@ -517,11 +517,13 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get());
std::unique_ptr<cta::RetrieveJob> retrieveJob;
retrieveJob.reset(retrieveMount->getNextJob(lc).release());
auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(1, jobBatch.size());
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
retrieveJob->complete();
retrieveJob.reset(retrieveMount->getNextJob(lc).release());
ASSERT_EQ((cta::RetrieveJob*)NULL, retrieveJob.get());
jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, jobBatch.size());
}
}
......
......@@ -33,15 +33,23 @@ namespace cta {
~MockRetrieveMount() throw() {
}
std::unique_ptr<cta::RetrieveJob> getNextJob(log::LogContext & logContext) override {
std::list<std::unique_ptr<cta::RetrieveJob> > getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext & logContext) override {
std::list<std::unique_ptr<cta::RetrieveJob> > ret;
// Count the attempt to get a file (even if not successful).
getJobs++;
if(m_jobs.empty()) {
return std::unique_ptr<cta::RetrieveJob>();
} else {
std::unique_ptr<cta::RetrieveJob> job = std::move(m_jobs.front());
while (m_jobs.size()) {
ret.emplace_back(m_jobs.front().release());
m_jobs.pop_front();
return job;
// Count the next attempt to get the file"
if (filesRequested <= 1 || bytesRequested <= ret.back()->archiveFile.fileSize)
break;
else
getJobs++;
bytesRequested -= ret.back()->archiveFile.fileSize;
filesRequested--;
}
return ret;
}
virtual std::string getMountTransactionId() const override {
......
......@@ -42,7 +42,6 @@ namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext &) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......
......@@ -37,7 +37,6 @@ namespace unitTests{
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......
......@@ -205,15 +205,9 @@ void RecallTaskInjector::injectBulkRecalls() {
bool RecallTaskInjector::synchronousFetch()
{
try {
uint64_t files=0;
uint64_t bytes=0;
while(files<=m_maxFiles && bytes<=m_maxBytes) {
std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob(m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
m_jobs.emplace_back(job.release());
}
auto jobsList = m_retrieveMount.getNextJobBatch(m_maxFiles, m_maxBytes, m_lc);
for (auto & j: jobsList)
m_jobs.emplace_back(j.release());
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer scoped(m_lc);
scoped.add("transactionId", m_retrieveMount.getMountTransactionId())
......@@ -289,16 +283,9 @@ void RecallTaskInjector::WorkerThread::run()
break;
}
m_parent.m_lc.log(cta::log::DEBUG,"RecallJobInjector:run: about to call client interface");
uint64_t files=0;
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob(m_parent.m_lc);
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
m_parent.m_jobs.emplace_back(job.release());
}
auto jobsList = m_parent.m_retrieveMount.getNextJobBatch(req.filesRequested, req.bytesRequested, m_parent.m_lc);
for (auto & j: jobsList)
m_parent.m_jobs.emplace_back(j.release());
LogContext::ScopedParam sp01(m_parent.m_lc, Param("transactionId", m_parent.m_retrieveMount.getMountTransactionId()));
if (m_parent.m_jobs.empty()) {
......
......@@ -132,7 +132,6 @@ namespace unitTests
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); }
std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");}
std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");}
void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); }
void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); }
......@@ -168,8 +167,8 @@ namespace unitTests
tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, maxNbJobsInjectedAtOnce, blockSize, lc);
ASSERT_EQ(true, rti.synchronousFetch());
ASSERT_EQ(maxNbJobsInjectedAtOnce+1, diskWrite.m_tasks.size());
ASSERT_EQ(maxNbJobsInjectedAtOnce+1, tapeRead.m_tasks.size());
ASSERT_EQ(maxNbJobsInjectedAtOnce, diskWrite.m_tasks.size());
ASSERT_EQ(maxNbJobsInjectedAtOnce, tapeRead.m_tasks.size());
rti.startThreads();
rti.requestInjection(false);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment