diff --git a/objectstore/RetrieveRequest.cpp b/objectstore/RetrieveRequest.cpp index 3539b03cfcc3e40b0f470f2916231be8792fb8ed..008e08485fa79f5ce88a3d30292e48fe44ff5115 100644 --- a/objectstore/RetrieveRequest.cpp +++ b/objectstore/RetrieveRequest.cpp @@ -387,19 +387,9 @@ auto RetrieveRequest::asyncUpdateOwner(uint16_t copyNumber, const std::string& o retRef.m_retieveRequest.dstURL = payload.schedulerrequest().dsturl(); retRef.m_retieveRequest.requester.name = payload.schedulerrequest().requester().name(); retRef.m_retieveRequest.requester.group = payload.schedulerrequest().requester().group(); - retRef.m_archiveFile.archiveFileID = payload.archivefile().archivefileid(); - retRef.m_archiveFile.checksumType = payload.archivefile().checksumtype(); - retRef.m_archiveFile.checksumValue = payload.archivefile().checksumvalue(); - retRef.m_archiveFile.creationTime = payload.archivefile().creationtime(); - retRef.m_archiveFile.diskFileId = payload.archivefile().diskfileid(); - retRef.m_archiveFile.diskFileInfo.group = payload.archivefile().diskfileinfo().group(); - retRef.m_archiveFile.diskFileInfo.owner = payload.archivefile().diskfileinfo().owner(); - retRef.m_archiveFile.diskFileInfo.path = payload.archivefile().diskfileinfo().path(); - retRef.m_archiveFile.diskFileInfo.recoveryBlob = payload.archivefile().diskfileinfo().recoveryblob(); - retRef.m_archiveFile.diskInstance = payload.archivefile().diskinstance(); - retRef.m_archiveFile.fileSize = payload.archivefile().filesize(); - retRef.m_archiveFile.reconciliationTime = payload.archivefile().reconciliationtime(); - retRef.m_archiveFile.storageClass = payload.archivefile().storageclass(); + objectstore::ArchiveFileSerDeser af; + af.deserialize(payload.archivefile()); + retRef.m_archiveFile = af; oh.set_payload(payload.SerializePartialAsString()); return oh.SerializeAsString(); } diff --git a/scheduler/ArchiveMount.cpp b/scheduler/ArchiveMount.cpp index 93b6723504a2b0b8dd2284ccc3930ecf488ee9a0..32e7a9dfe9ad33306a3330a1f87d7eecdb1b6311 100644 --- a/scheduler/ArchiveMount.cpp +++ b/scheduler/ArchiveMount.cpp @@ -99,6 +99,7 @@ void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta:: m_catalogue.filesWrittenToTape(tapeFilesWritten); } +//------------------------------------------------------------------------------ // getNextJobBatch //------------------------------------------------------------------------------ std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested, diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index d0bdc390b1d907ba54fb6da928458d0aacdc3424..cca6848b52eb3b6d98830be579e9ea71b237e7a7 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -1936,111 +1936,6 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo( return mountInfo; } -//------------------------------------------------------------------------------ -// OStoreDB::RetrieveMount::getNextJob() -//------------------------------------------------------------------------------ -auto OStoreDB::RetrieveMount::getNextJob(log::LogContext & logContext) -> std::unique_ptr<SchedulerDatabase::RetrieveJob> { - // Find the next file to retrieve - // Get the tape pool and then tape - objectstore::RootEntry re(m_objectStore); - objectstore::ScopedSharedLock rel(re); - re.fetch(); - // First, check we should not forcibly go down. In such an occasion, we just find noting to do. - // Get drive register - { - objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore); - ScopedSharedLock drl(dr); - dr.fetch(); - auto drs = dr.getDriveState(mountInfo.drive); - if (!drs.desiredDriveState.up && drs.desiredDriveState.forceDown) { - logContext.log(log::INFO, "In OStoreDB::RetrieveMount::getNextJob(): returning no job as we are forcibly going down."); - return nullptr; - } - } - auto rql = re.dumpRetrieveQueues(); - rel.release(); - std::string rqAddress; - for (auto & rqp: rql) { - if (rqp.vid == mountInfo.vid) - rqAddress = rqp.address; - } - // The retrieve queue is gone. There is no more job. - if (!rqAddress.size()) - return nullptr; - // Try and open the retrieve queue. It could be gone by now. - try { - objectstore::RetrieveQueue rq(rqAddress, m_objectStore); - objectstore::ScopedExclusiveLock rqlock; - try { - rqlock.lock(rq); - rq.fetch(); - } catch (cta::exception::Exception & ex) { - // The queue is now absent. We can remove its reference in the root entry. - // A new queue could have been added in the mean time, and be non-empty. - // We will then fail to remove from the RootEntry (non-fatal). - // TODO: We still conclude that the queue is empty on this unlikely event. - // (cont'd): A better approach would be to retart the process of this function - // from scratch. - rel.lock(re); - re.fetch(); - try { - re.removeRetrieveQueueAndCommit(mountInfo.vid); - } catch (RootEntry::RetrieveQueueNotEmpty & ex) { - // TODO: improve: if we fail here we could retry to fetch a job. - return nullptr; - } - } - // Pop jobs until we find one actually belonging to the queue. - // Any job not really belonging is an uncommitted pop, which we will - // re-do here. - while (rq.dumpJobs().size()) { - // First take a lock on and download the job - // If the request is not around anymore, we will just move the the next - // Prepare the return value - auto job=rq.dumpJobs().front(); - std::unique_ptr<OStoreDB::RetrieveJob> privateRet(new OStoreDB::RetrieveJob( - job.address, m_objectStore, m_catalogue, m_logger, m_agentReference, *this)); - privateRet->selectedCopyNb = job.copyNb; - objectstore::ScopedExclusiveLock rrl; - try { - rrl.lock(privateRet->m_retrieveRequest); - privateRet->m_retrieveRequest.fetch(); - if(privateRet->m_retrieveRequest.getOwner() != rq.getAddressIfSet()) { - rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet()); - continue; - } - } catch (cta::exception::Exception &) { - // we failed to access the object. It might be missing. - // Just pop this job from the queue and move to the next. - rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet()); - // Commit in case we do not pass by again. - rq.commit(); - continue; - } - // Take ownership of the job - // Add to ownership - m_agentReference.addToOwnership(privateRet->m_retrieveRequest.getAddressIfSet(), m_objectStore); - // Make the ownership official - privateRet->m_retrieveRequest.setOwner(m_agentReference.getAgentAddress()); - privateRet->m_retrieveRequest.commit(); - // Remove the job from the archive queue - rq.removeJob(privateRet->m_retrieveRequest.getAddressIfSet()); - // We can commit and release the retrieve queue lock, we will only fill up - // memory structure from here on. - rq.commit(); - rqlock.release(); - privateRet->retrieveRequest = privateRet->m_retrieveRequest.getSchedulerRequest(); - privateRet->archiveFile = privateRet->m_retrieveRequest.getArchiveFile(); - privateRet->m_jobOwned = true; - privateRet->m_mountId = mountInfo.mountId; - return std::unique_ptr<SchedulerDatabase::RetrieveJob> (std::move(privateRet)); - } - return std::unique_ptr<SchedulerDatabase::RetrieveJob>(); - } catch (cta::exception::Exception & ex) { - return nullptr; - } -} - //------------------------------------------------------------------------------ // OStoreDB::RetrieveMount::getNextJobBatch() //------------------------------------------------------------------------------ diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index c4de2f82ab2c0d56f96ae426ee0183903b29030c..513a4755c8f4726ec9eeba1697bc70ecb4883528 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -172,7 +172,6 @@ public: public: const MountInfo & getMountInfo() override; std::list<std::unique_ptr<RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) override; - std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) override; void complete(time_t completionTime) override; void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override; void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override; diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index bfd2740a1fe94e890ac2254c0fc198364a38a8ac..141877d9f49be7b6240ed83fb4cccb69a1b9d9df 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -34,28 +34,28 @@ cta::RetrieveMount::RetrieveMount( } //------------------------------------------------------------------------------ -// getMountType +// getMountType() //------------------------------------------------------------------------------ cta::common::dataStructures::MountType cta::RetrieveMount::getMountType() const{ return cta::common::dataStructures::MountType::Retrieve; } //------------------------------------------------------------------------------ -// getNbFiles +// getNbFiles() //------------------------------------------------------------------------------ uint32_t cta::RetrieveMount::getNbFiles() const { return m_dbMount->nbFilesCurrentlyOnTape; } //------------------------------------------------------------------------------ -// getVid +// getVid() //------------------------------------------------------------------------------ std::string cta::RetrieveMount::getVid() const{ return m_dbMount->mountInfo.vid; } //------------------------------------------------------------------------------ -// getMountTransactionId +// getMountTransactionId() //------------------------------------------------------------------------------ std::string cta::RetrieveMount::getMountTransactionId() const{ std::stringstream id; @@ -66,25 +66,28 @@ std::string cta::RetrieveMount::getMountTransactionId() const{ } //------------------------------------------------------------------------------ -// getNextJob +// getNextJobBatch() //------------------------------------------------------------------------------ -std::unique_ptr<cta::RetrieveJob> cta::RetrieveMount::getNextJob(log::LogContext & logContext) { +std::list<std::unique_ptr<cta::RetrieveJob> > cta::RetrieveMount::getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, + log::LogContext& logContext) { if (!m_sessionRunning) - throw SessionNotRunning("In RetrieveMount::getNextJob(): trying to get job from complete/not started session"); + throw SessionNotRunning("In RetrieveMount::getNextJobBatch(): trying to get job from complete/not started session"); // Try and get a new job from the DB - std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> dbJob(m_dbMount->getNextJob(logContext).release()); - if (!dbJob.get()) - return std::unique_ptr<cta::RetrieveJob>(); - // We have something to retrieve: prepare the response - std::unique_ptr<cta::RetrieveJob> ret (new RetrieveJob(*this, - dbJob->retrieveRequest, dbJob->archiveFile, dbJob->selectedCopyNb, - PositioningMethod::ByBlock)); - ret->m_dbJob.reset(dbJob.release()); + std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested, + bytesRequested, logContext)); + std::list<std::unique_ptr<RetrieveJob>> ret; + // We prepare the response + for (auto & sdrj: dbJobBatch) { + ret.emplace_back(new RetrieveJob(*this, + sdrj->retrieveRequest, sdrj->archiveFile, sdrj->selectedCopyNb, + PositioningMethod::ByBlock)); + ret.back()->m_dbJob.reset(sdrj.release()); + } return ret; } //------------------------------------------------------------------------------ -// tapeComplete()) +// tapeComplete() //------------------------------------------------------------------------------ void cta::RetrieveMount::tapeComplete() { m_tapeRunning = false; @@ -101,7 +104,7 @@ void cta::RetrieveMount::tapeComplete() { } //------------------------------------------------------------------------------ -// diskComplete()) +// diskComplete() //------------------------------------------------------------------------------ void cta::RetrieveMount::diskComplete() { m_diskRunning = false; @@ -116,7 +119,7 @@ void cta::RetrieveMount::diskComplete() { } //------------------------------------------------------------------------------ -// abort()) +// abort() //------------------------------------------------------------------------------ void cta::RetrieveMount::abort() { diskComplete(); @@ -138,7 +141,7 @@ void cta::RetrieveMount::setTapeSessionStats(const castor::tape::tapeserver::dae } //------------------------------------------------------------------------------ -// bothSidesComplete()) +// bothSidesComplete() //------------------------------------------------------------------------------ bool cta::RetrieveMount::bothSidesComplete() { return !(m_diskRunning || m_tapeRunning); diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index 6a35c057e41ea5c6614b4c8b9a5a840ac5b2ebf3..1b6c577ac33570361e7659de0d5badb368054883 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -117,13 +117,19 @@ namespace cta { virtual bool bothSidesComplete(); CTA_GENERATE_EXCEPTION_CLASS(SessionNotRunning); + /** - * Job factory - * - * @return A unique_ptr to the next archive job or NULL if there are no more - * archive jobs left for this tape mount. - */ - virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext); + * Batch job factory + * + * @param filesRequested the number of files requested + * @param bytesRequested the number of bytes requested + * @param logContext + * @return a list of unique_ptr to the next retrieve jobs. The list is empty + * when no more jobs can be found. Will return jobs (if available) until one + * of the 2 criteria is fulfilled. + */ + virtual std::list<std::unique_ptr<RetrieveJob>> getNextJobBatch(uint64_t filesRequested, + uint64_t bytesRequested, log::LogContext &logContext); /** * Destructor. diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index eff6e30dfc2cdf7849dfa83c1a8f6a75bab477ce..89426ea1ed58d13c70b96af988323dee05bea536 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -323,7 +323,6 @@ public: virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<RetrieveJob>> getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext& logContext) = 0; - virtual std::unique_ptr<RetrieveJob> getNextJob(log::LogContext & logContext) = 0; virtual void complete(time_t completionTime) = 0; virtual void setDriveStatus(common::dataStructures::DriveStatus status, time_t completionTime) = 0; virtual void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) = 0; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 6761270966d260b9bbc34b4673fafd357d167ce9..35bcc403a48334a5bf0d5147d2b955c1f8499306 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -517,11 +517,13 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) { retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get()); std::unique_ptr<cta::RetrieveJob> retrieveJob; - retrieveJob.reset(retrieveMount->getNextJob(lc).release()); + auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(1, jobBatch.size()); + retrieveJob.reset(jobBatch.front().release()); ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get()); retrieveJob->complete(); - retrieveJob.reset(retrieveMount->getNextJob(lc).release()); - ASSERT_EQ((cta::RetrieveJob*)NULL, retrieveJob.get()); + jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(0, jobBatch.size()); } } diff --git a/scheduler/testingMocks/MockRetrieveMount.hpp b/scheduler/testingMocks/MockRetrieveMount.hpp index e0af365e77d343d2e073ad74ff04ff4da60c14c6..62f77972b8503a845f9e4c409868bd8807091811 100644 --- a/scheduler/testingMocks/MockRetrieveMount.hpp +++ b/scheduler/testingMocks/MockRetrieveMount.hpp @@ -33,15 +33,23 @@ namespace cta { ~MockRetrieveMount() throw() { } - std::unique_ptr<cta::RetrieveJob> getNextJob(log::LogContext & logContext) override { + std::list<std::unique_ptr<cta::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, + uint64_t bytesRequested, log::LogContext & logContext) override { + std::list<std::unique_ptr<cta::RetrieveJob> > ret; + // Count the attempt to get a file (even if not successful). getJobs++; - if(m_jobs.empty()) { - return std::unique_ptr<cta::RetrieveJob>(); - } else { - std::unique_ptr<cta::RetrieveJob> job = std::move(m_jobs.front()); + while (m_jobs.size()) { + ret.emplace_back(m_jobs.front().release()); m_jobs.pop_front(); - return job; + // Count the next attempt to get the file" + if (filesRequested <= 1 || bytesRequested <= ret.back()->archiveFile.fileSize) + break; + else + getJobs++; + bytesRequested -= ret.back()->archiveFile.fileSize; + filesRequested--; } + return ret; } virtual std::string getMountTransactionId() const override { diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp index f999dd375e87ccf8100ad9bd9618462ec33ea8cd..d1498733bfdd8177bdd5281bb663ce8dfdf47274 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteTaskTest.cpp @@ -42,7 +42,6 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext &) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp index f9b7d49ccdedd66bebdae25fe2a170025de34646..d9bb8cfe65841de99aa760d307447337ebe5b5e3 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/DiskWriteThreadPoolTest.cpp @@ -37,7 +37,6 @@ namespace unitTests{ class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp index 820153025128a368d4d5fe0b413ade16f9aa5ca4..bc07ee3f9045fe5d57aa127a584a192ea3917f05 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjector.cpp @@ -205,15 +205,9 @@ void RecallTaskInjector::injectBulkRecalls() { bool RecallTaskInjector::synchronousFetch() { try { - uint64_t files=0; - uint64_t bytes=0; - while(files<=m_maxFiles && bytes<=m_maxBytes) { - std::unique_ptr<cta::RetrieveJob> job=m_retrieveMount.getNextJob(m_lc); - if(!job.get()) break; - files++; - bytes+=job->archiveFile.fileSize; - m_jobs.emplace_back(job.release()); - } + auto jobsList = m_retrieveMount.getNextJobBatch(m_maxFiles, m_maxBytes, m_lc); + for (auto & j: jobsList) + m_jobs.emplace_back(j.release()); } catch (cta::exception::Exception & ex) { cta::log::ScopedParamContainer scoped(m_lc); scoped.add("transactionId", m_retrieveMount.getMountTransactionId()) @@ -289,16 +283,9 @@ void RecallTaskInjector::WorkerThread::run() break; } m_parent.m_lc.log(cta::log::DEBUG,"RecallJobInjector:run: about to call client interface"); - uint64_t files=0; - uint64_t bytes=0; - while(files<=req.filesRequested && bytes<=req.bytesRequested) { - std::unique_ptr<cta::RetrieveJob> job=m_parent.m_retrieveMount.getNextJob(m_parent.m_lc); - if(!job.get()) break; - files++; - bytes+=job->archiveFile.fileSize; - m_parent.m_jobs.emplace_back(job.release()); - } - + auto jobsList = m_parent.m_retrieveMount.getNextJobBatch(req.filesRequested, req.bytesRequested, m_parent.m_lc); + for (auto & j: jobsList) + m_parent.m_jobs.emplace_back(j.release()); LogContext::ScopedParam sp01(m_parent.m_lc, Param("transactionId", m_parent.m_retrieveMount.getMountTransactionId())); if (m_parent.m_jobs.empty()) { diff --git a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp index 93567c66b45e2bbb5f0cfc8319862418d318f3d7..71c946d5e08e8711fd3b107c3f2e2db63a38336d 100644 --- a/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp +++ b/tapeserver/castor/tape/tapeserver/daemon/RecallTaskInjectorTest.cpp @@ -132,7 +132,6 @@ namespace unitTests class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount { const MountInfo & getMountInfo() override { throw std::runtime_error("Not implemented"); } std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> > getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, cta::log::LogContext& logContext) override { throw std::runtime_error("Not implemented");} - std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob(cta::log::LogContext & logContext) override { throw std::runtime_error("Not implemented");} void complete(time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setDriveStatus(cta::common::dataStructures::DriveStatus status, time_t completionTime) override { throw std::runtime_error("Not implemented"); } void setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) override { throw std::runtime_error("Not implemented"); } @@ -168,8 +167,8 @@ namespace unitTests tapeserver::daemon::RecallTaskInjector rti(mm, tapeRead, diskWrite, trm, maxNbJobsInjectedAtOnce, blockSize, lc); ASSERT_EQ(true, rti.synchronousFetch()); - ASSERT_EQ(maxNbJobsInjectedAtOnce+1, diskWrite.m_tasks.size()); - ASSERT_EQ(maxNbJobsInjectedAtOnce+1, tapeRead.m_tasks.size()); + ASSERT_EQ(maxNbJobsInjectedAtOnce, diskWrite.m_tasks.size()); + ASSERT_EQ(maxNbJobsInjectedAtOnce, tapeRead.m_tasks.size()); rti.startThreads(); rti.requestInjection(false);