Commit 7abc73f7 authored by Cristina Moraru's avatar Cristina Moraru
Browse files

Extend RecallTaskInjector to perform RAO query



This patch extends RecallTaskInjector to perform a
Recommended Access Query to the tape drive to reorder
a set of jobs. This patch also adds a unit test for the
newly added RAO functionality
Signed-off-by: default avatarCristina Moraru <cristina-gabriela.moraru@cern.ch>
parent 0ad2455c
......@@ -41,6 +41,7 @@ castor::tape::tapeserver::daemon::DataTransferConfig::DataTransferConfig()
maxFilesBeforeFlush(0),
nbDiskThreads(0),
useLbp(false),
useRAO(false),
externalEncryptionKeyScript("") {}
//------------------------------------------------------------------------------
......@@ -85,6 +86,8 @@ castor::tape::tapeserver::daemon::DataTransferConfig
"TapeServer", "XrootTimeout", 0, log);
const std::string useLBP = castorConf.getConfEntString(
"TapeServer", "UseLogicalBlockProtection", "no", log);
const std::string useRAO = castorConf.getConfEntString(
"TapeServer", "UseRecommendedAccessOrder", "no", log);
if (!strcasecmp(useLBP.c_str(), "yes") || !strcmp(useLBP.c_str(), "1")) {
config.useLbp = true;
......@@ -92,6 +95,12 @@ castor::tape::tapeserver::daemon::DataTransferConfig
config.useLbp = false;
}
if (!strcasecmp(useRAO.c_str(), "yes") || !strcmp(useRAO.c_str(), "1")) {
config.useRAO = true;
} else {
config.useRAO = false;
}
config.externalEncryptionKeyScript = castorConf.getConfEntString("TapeServer",
"ExternalEncryptionKeyScript", "");
......
......@@ -112,6 +112,12 @@ struct DataTransferConfig {
*/
bool useLbp;
/**
* The boolean variable describing to use on not to use Recommended
* Access Order
*/
bool useRAO;
/**
* The path to the operator provided encyption control script (or empty string)
*/
......
......@@ -203,7 +203,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
TapeReadSingleThread trst(*drive, m_mc, tsr, m_volInfo,
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc,rrp,
m_castorConf.useLbp, m_castorConf.externalEncryptionKeyScript);
m_castorConf.useLbp, m_castorConf.useRAO, m_castorConf.externalEncryptionKeyScript);
DiskWriteThreadPool dwtp(m_castorConf.nbDiskThreads,
rrp,
rwd,
......@@ -219,10 +219,18 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
trst.setTaskInjector(&rti);
rrp.setWatchdog(rwd);
rti.setDriveInterface(trst.getDriveReference());
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
cta::utils::Timer timer;
if (rti.synchronousInjection()) { //adapt the recall task injector (starting from synchronousInjection)
// The RecallTaskInjector and the TapeReadSingleThread share the promise
if (m_castorConf.useRAO) {
rti.initRAO();
}
if (rti.synchronousFetch()) { //adapt the recall task injector (starting from synchronousFetch)
// We got something to recall. Time to start the machinery
trst.setWaitForInstructionsTime(timer.secs());
rwd.startThread();
......
......@@ -658,6 +658,222 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
"mountTotalReadRetries=\"25\" mountTotalWriteRetries=\"25\" mountWriteTransients=\"10\""));
}
TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
std::string vid = s_vid;
// cta::MountType::Enum mountType = cta::MountType::RETRIEVE;
// 3) Prepare the necessary environment (logger, plus system wrapper),
castor::tape::System::mockWrapper mockSys;
mockSys.delegateToFake();
mockSys.disableGMockCallsCounting();
mockSys.fake.setupForVirtualDriveSLC6();
//delete is unnecessary
//pointer with ownership will be passed to the application,
//which will do the delete
mockSys.fake.m_pathToDrive["/dev/nst0"] = new castor::tape::tapeserver::drive::FakeDrive;
// 4) Create the scheduler
auto & catalogue = getCatalogue();
auto & scheduler = getScheduler();
// Always use the same requester
const cta::common::dataStructures::SecurityIdentity requester;
// List to remember the path of each remote file so that the existance of the
// files can be tested for at the end of the test
std::list<std::string> remoteFilePaths;
// 5) Create the environment for the migration to happen (library + tape)
const std::string libraryComment = "Library comment";
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
libraryComment);
{
auto libraries = catalogue.getLogicalLibraries();
ASSERT_EQ(1, libraries.size());
ASSERT_EQ(s_libraryName, libraries.front().name);
ASSERT_EQ(libraryComment, libraries.front().comment);
}
const uint64_t capacityInBytes = 12345678;
const std::string tapeComment = "Tape comment";
bool notDisabled = false;
bool notFull = false;
catalogue.createTape(s_adminOnAdminHost, s_vid, s_libraryName, s_tapePoolName, cta::nullopt, capacityInBytes,
notDisabled, notFull, tapeComment);
int MAX_RECALLS = 62;
int MAX_BULK_RECALLS = 27;
std::vector<int> expectedOrder;
std::vector<std::string> expectedFseqOrderLog;
// 6) Prepare files for reading by writing them to the mock system
{
// Label the tape
castor::tape::tapeFile::LabelSession ls(*mockSys.fake.m_pathToDrive["/dev/nst0"],
s_vid, false);
mockSys.fake.m_pathToDrive["/dev/nst0"]->rewind();
// And write to it
castor::tape::tapeserver::daemon::VolumeInfo volInfo;
volInfo.vid=s_vid;
castor::tape::tapeFile::WriteSession ws(*mockSys.fake.m_pathToDrive["/dev/nst0"],
volInfo , 0, true, false);
// Write a few files on the virtual tape and modify the archive name space
// so that it is in sync
uint8_t data[1000];
size_t archiveFileSize=sizeof(data);
castor::tape::SCSI::Structures::zeroStruct(&data);
int fseq;
for (fseq=1; fseq <= MAX_RECALLS ; fseq ++) {
// Create a path to a remote destination file
std::ostringstream remoteFilePath;
remoteFilePath << "file://" << m_tmpDir << "/test" << fseq;
remoteFilePaths.push_back(remoteFilePath.str());
// Create an archive file entry in the archive namespace
cta::catalogue::TapeFileWritten tapeFileWritten;
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
tapeFileWritten.blockId = wf.getBlockId();
// Write the data (one block)
wf.write(data, archiveFileSize);
// Close the file
wf.close();
// Create file entry in the archive namespace
tapeFileWritten.archiveFileId=fseq;
tapeFileWritten.checksumType="ADLER32";
tapeFileWritten.checksumValue=cta::utils::getAdler32String(data, archiveFileSize);
tapeFileWritten.vid=volInfo.vid;
tapeFileWritten.size=archiveFileSize;
tapeFileWritten.fSeq=fseq;
tapeFileWritten.copyNb=1;
tapeFileWritten.compressedSize=archiveFileSize; // No compression
tapeFileWritten.diskInstance = s_diskInstance;
tapeFileWritten.diskFileId = fseq;
tapeFileWritten.diskFilePath = remoteFilePath.str();
tapeFileWritten.diskFileUser = s_userName;
tapeFileWritten.diskFileGroup = "someGroup";
tapeFileWritten.diskFileRecoveryBlob = "B106";
tapeFileWritten.storageClassName = s_storageClassName;
tapeFileWritten.tapeDrive = "drive0";
catalogue.filesWrittenToTape(std::list<cta::catalogue::TapeFileWritten>{tapeFileWritten});
// Schedule the retrieval of the file
std::string diskInstance="disk_instance";
cta::common::dataStructures::RetrieveRequest rReq;
rReq.archiveFileID=fseq;
rReq.requester.name = s_userName;
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq, logContext);
expectedOrder.push_back(fseq);
bool apply_rao = false;
bool add_expected = false;
if (MAX_BULK_RECALLS < 10) {
if (expectedOrder.size() % MAX_BULK_RECALLS == 0 ||
fseq % MAX_RECALLS == 0) {
add_expected = true;
}
}
else if (MAX_BULK_RECALLS >= 30) {
if ((expectedOrder.size() % 30 == 0) ||
(((fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) &&
(expectedOrder.size() >= 10))) {
apply_rao = true;
add_expected = true;
}
else if (((fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) &&
(expectedOrder.size() < 10)) {
add_expected = true;
}
}
else if ((fseq % MAX_BULK_RECALLS == 0) || (fseq % MAX_RECALLS == 0)) {
if (expectedOrder.size() >= 10)
apply_rao = true;
add_expected = true;
}
if (apply_rao) {
std::reverse(expectedOrder.begin(), expectedOrder.end());
}
if (add_expected) {
std::stringstream expectedLogLine;
std::copy(expectedOrder.begin(), expectedOrder.end(),
std::ostream_iterator<int>(expectedLogLine, " "));
expectedFseqOrderLog.push_back(expectedLogLine.str());
expectedOrder.clear();
}
}
}
// 6) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
cta::common::dataStructures::DriveInfo driveInfo;
driveInfo.driveName=driveConfig.unitName;
driveInfo.logicalLibrary=driveConfig.rawLibrarySlot;
// We need to create the drive in the registry before being able to put it up.
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down);
scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false);
// 7) Create the data transfer session
DataTransferConfig castorConf;
castorConf.bufsz = 1024*1024; // 1 MB memory buffers
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = MAX_BULK_RECALLS - 1;
castorConf.nbDiskThreads = 1;
castorConf.useRAO = true;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
// 8) Run the data transfer session
sess.execute();
// 9) Check the session git the correct VID
ASSERT_EQ(s_vid, sess.getVid());
// 10) Check the remote files exist and have the correct size
for(auto & path: remoteFilePaths) {
struct stat statBuf;
bzero(&statBuf, sizeof(statBuf));
const int statRc = stat(path.substr(7).c_str(), &statBuf); //remove the "file://" for stat-ing
ASSERT_EQ(0, statRc);
ASSERT_EQ(1000, statBuf.st_size); //same size of data
}
// 10) Check logs
std::string logToCheck = logger.getLog();
logToCheck += "";
ASSERT_NE(std::string::npos, logToCheck.find("firmwareVersion=\"123A\" serialNumber=\"123456\" "
"mountTotalCorrectedReadErrors=\"5\" mountTotalReadBytesProcessed=\"4096\" "
"mountTotalUncorrectedReadErrors=\"1\" mountTotalNonMediumErrorCounts=\"2\""));
ASSERT_NE(std::string::npos, logToCheck.find("firmwareVersion=\"123A\" serialNumber=\"123456\" lifetimeMediumEfficiencyPrct=\"100\" "
"mountReadEfficiencyPrct=\"100\" mountWriteEfficiencyPrct=\"100\" "
"mountReadTransients=\"10\" "
"mountServoTemps=\"10\" mountServoTransients=\"5\" mountTemps=\"100\" "
"mountTotalReadRetries=\"25\" mountTotalWriteRetries=\"25\" mountWriteTransients=\"10\""));
for (std::string s : expectedFseqOrderLog) {
ASSERT_NE(std::string::npos, logToCheck.find(s));
}
}
TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
// 0) Prepare the logger for everyone
......
......@@ -27,6 +27,8 @@
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "castor/tape/tapeserver/SCSI/Structures.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "scheduler/RetrieveJob.hpp"
#include <stdint.h>
......@@ -46,8 +48,8 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc) :
m_thread(*this),m_memManager(mm),
m_tapeReader(tapeReader),m_diskWriter(diskWriter),
m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold)
{}
m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold),
m_useRAO(false) {}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
......@@ -60,6 +62,11 @@ RecallTaskInjector::~RecallTaskInjector(){
void RecallTaskInjector::finish(){
cta::threading::MutexLocker ml(m_producerProtection);
m_queue.push(Request());
/* Since this is the ending request, the RecallTaskInjector does not need
* to wait to have access to the drive
*/
if (m_useRAO)
setPromise();
}
//------------------------------------------------------------------------------
//requestInjection
......@@ -82,41 +89,135 @@ void RecallTaskInjector::startThreads() {
m_thread.start();
}
//------------------------------------------------------------------------------
//setDriveInterface
//------------------------------------------------------------------------------
void RecallTaskInjector::setDriveInterface(castor::tape::tapeserver::drive::DriveInterface *di) {
m_drive = di;
}
//------------------------------------------------------------------------------
//initRAO
//------------------------------------------------------------------------------
void RecallTaskInjector::initRAO() {
m_useRAO = true;
accessToDrive = false;
m_raoPromise.reset(new std::promise<void>);
m_raoFuture = m_raoPromise->get_future();
m_raoLimits = m_drive->getLimitUDS();
}
//------------------------------------------------------------------------------
//waitForPromise
//------------------------------------------------------------------------------
void RecallTaskInjector::waitForPromise() {
if (accessToDrive)
return;
m_raoFuture.wait();
accessToDrive = true;
m_raoPromise.reset(new std::promise<void>);
m_raoFuture = m_raoPromise->get_future();
}
//------------------------------------------------------------------------------
//setPromise
//------------------------------------------------------------------------------
void RecallTaskInjector::setPromise() {
try {
m_raoPromise->set_value();
} catch (const std::exception &exc) {}
}
//------------------------------------------------------------------------------
//injectBulkRecalls
//------------------------------------------------------------------------------
void RecallTaskInjector::injectBulkRecalls(std::vector<std::unique_ptr<cta::RetrieveJob>>& jobs) {
for (auto it = jobs.begin(); it != jobs.end(); ++it) {
void RecallTaskInjector::injectBulkRecalls() {
uint32_t block_size = 262144;
uint32_t njobs = m_jobs.size();
std::map<uint64_t, uint32_t> filesMap;
std::vector<uint32_t> raoOrder;
if (m_useRAO) {
std::list<castor::tape::SCSI::Structures::RAO::blockLims> files;
for (uint32_t i = 0; i < njobs; i++) {
cta::RetrieveJob *job = m_jobs.at(i).get();
uint64_t fseq = job->selectedTapeFile().fSeq;
castor::tape::SCSI::Structures::RAO::blockLims lims;
strncpy((char*)lims.fseq, std::to_string(fseq).c_str(), sizeof(fseq));
lims.begin = job->selectedTapeFile().blockId;
lims.end = job->archiveFile.fileSize / block_size;
files.push_back(lims);
filesMap[fseq] = i;
if (files.size() == m_raoLimits.maxSupported ||
((i == njobs - 1) && (files.size() >= 10))) {
/* We do a RAO query if:
* 1. the maximum number of files supported by the drive
* for RAO query has been reached
* 2. the end of the jobs list has been reached and there are at least
* 10 unordered files
*/
/* RecallTaskInjector is waiting to have access to the drive in order
* to perform the RAO query
*/
waitForPromise();
m_drive->queryRAO(files, m_raoLimits.maxSupported);
/* Add the RAO sorted files to the new list*/
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t fseq = atoi((char*)fit->fseq);
raoOrder.push_back(filesMap[fseq]);
}
files.clear();
}
}
(*it)->positioningMethod=cta::PositioningMethod::ByBlock;
/* Copy the rest of the files in the new ordered list */
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t fseq = atoi((char*)fit->fseq);
raoOrder.push_back(filesMap[fseq]);
}
files.clear();
}
std::string queryOrderLog = "Query fseq order:";
for (uint32_t i = 0; i < njobs; i++) {
uint32_t index = m_useRAO ? raoOrder.at(i) : i;
cta::RetrieveJob *job = m_jobs.at(index).release();
queryOrderLog += std::to_string(job->selectedTapeFile().fSeq) + " ";
job->positioningMethod=cta::PositioningMethod::ByBlock;
LogContext::ScopedParam sp[]={
LogContext::ScopedParam(m_lc, Param("fileId", (*it)->retrieveRequest.archiveFileID)),
LogContext::ScopedParam(m_lc, Param("fSeq", (*it)->selectedTapeFile().fSeq)),
LogContext::ScopedParam(m_lc, Param("blockID", (*it)->selectedTapeFile().blockId)),
LogContext::ScopedParam(m_lc, Param("dstURL", (*it)->retrieveRequest.dstURL))
LogContext::ScopedParam(m_lc, Param("fileId", job->retrieveRequest.archiveFileID)),
LogContext::ScopedParam(m_lc, Param("fSeq", job->selectedTapeFile().fSeq)),
LogContext::ScopedParam(m_lc, Param("blockID", job->selectedTapeFile().blockId)),
LogContext::ScopedParam(m_lc, Param("dstURL", job->retrieveRequest.dstURL))
};
tape::utils::suppresUnusedVariable(sp);
m_lc.log(cta::log::INFO, "Recall task created");
cta::RetrieveJob *job = it->get();
DiskWriteTask * dwt = new DiskWriteTask(it->release(), m_memManager);
DiskWriteTask * dwt = new DiskWriteTask(job, m_memManager);
TapeReadTask * trt = new TapeReadTask(job, *dwt, m_memManager);
if (accessToDrive)
accessToDrive = false;
m_diskWriter.push(dwt);
m_tapeReader.push(trt);
m_lc.log(cta::log::INFO, "Created tasks for recalling a file");
}
LogContext::ScopedParam sp03(m_lc, Param("nbFile", jobs.size()));
m_lc.log(cta::log::INFO, queryOrderLog);
m_jobs.clear();
LogContext::ScopedParam sp03(m_lc, Param("nbFile", m_jobs.size()));
m_lc.log(cta::log::INFO, "Finished processing batch of recall tasks from client");
}
//------------------------------------------------------------------------------
//synchronousInjection
//------------------------------------------------------------------------------
bool RecallTaskInjector::synchronousInjection()
bool RecallTaskInjector::synchronousFetch()
{
std::vector<std::unique_ptr<cta::RetrieveJob>> jobs;
try {
uint64_t files=0;
uint64_t bytes=0;
......@@ -125,7 +226,7 @@ bool RecallTaskInjector::synchronousInjection()
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
jobs.emplace_back(job.release());
m_jobs.emplace_back(job.release());
}
} catch (cta::exception::Exception & ex) {
cta::log::ScopedParamContainer scoped(m_lc);
......@@ -139,12 +240,13 @@ bool RecallTaskInjector::synchronousInjection()
cta::log::ScopedParamContainer scoped(m_lc);
scoped.add("byteSizeThreshold",m_maxBytes)
.add("maxFiles", m_maxFiles);
if(jobs.empty()) {
if(m_jobs.empty()) {
m_lc.log(cta::log::ERR, "No files to recall: empty mount");
return false;
}
else {
injectBulkRecalls(jobs);
if (! m_useRAO)
injectBulkRecalls();
return true;
}
}
......@@ -174,18 +276,18 @@ void RecallTaskInjector::WorkerThread::run()
using cta::log::LogContext;
m_parent.m_lc.pushOrReplace(Param("thread", "RecallTaskInjector"));
m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallTaskInjector thread");
if (m_parent.m_useRAO)
m_parent.injectBulkRecalls();
try{
while (1) {
Request req = m_parent.m_queue.pop();
if (req.end) {
m_parent.m_lc.log(cta::log::INFO,"Received a end notification from tape thread: triggering the end of session.");
m_parent.signalEndDataMovement();
break;
}
m_parent.m_lc.log(cta::log::DEBUG,"RecallJobInjector:run: about to call client interface");
std::vector<std::unique_ptr<cta::RetrieveJob>> jobs;
uint64_t files=0;
uint64_t bytes=0;
while(files<=req.filesRequested && bytes<=req.bytesRequested) {
......@@ -193,12 +295,12 @@ void RecallTaskInjector::WorkerThread::run()
if(!job.get()) break;
files++;
bytes+=job->archiveFile.fileSize;
jobs.emplace_back(job.release());
m_parent.m_jobs.emplace_back(job.release());
}
LogContext::ScopedParam sp01(m_parent.m_lc, Param("transactionId", m_parent.m_retrieveMount.getMountTransactionId()));
if (jobs.empty()) {
if (m_parent.m_jobs.empty()) {
if (req.lastCall) {
m_parent.m_lc.log(cta::log::INFO,"No more file to recall: triggering the end of session.");
m_parent.signalEndDataMovement();
......@@ -207,7 +309,7 @@ void RecallTaskInjector::WorkerThread::run()
m_parent.m_lc.log(cta::log::DEBUG,"In RecallJobInjector::WorkerThread::run(): got empty list, but not last call. NoOp.");
}
} else {
m_parent.injectBulkRecalls(jobs);
m_parent.injectBulkRecalls();
}
} // end of while(1)
} //end of try
......@@ -218,7 +320,6 @@ void RecallTaskInjector::WorkerThread::run()
m_parent.m_lc.logBacktrace(cta::log::ERR,ex.backtrace());
m_parent.m_lc.log(cta::log::ERR,"In RecallJobInjector::WorkerThread::run(): "
"could not retrieve a list of file to recall. End of session");
m_parent.signalEndDataMovement();
m_parent.deleteAllTasks();
}
......
......@@ -29,6 +29,9 @@
#include "common/threading/Thread.hpp"
#include "scheduler/RetrieveJob.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include <future>
namespace castor{
namespace tape{
......@@ -99,7 +102,7 @@ public:
* @param byteSizeThreshold total bytes count at least requested
* @return true if there are jobs to be done, false otherwise
*/
bool synchronousInjection();
bool synchronousFetch();
/**
* Wait for the inner thread to finish
......@@ -110,6 +113,22 @@ public: