Commit 12529714 authored by Cristina Moraru's avatar Cristina Moraru
Browse files

Modify RAO synchronization



Change RAO synchronization:
  * move all RAO operations right after mounting,
before the exection of any tape read task
  * while waiting for the tape to be mounted, the
ReacallTaskInjector fetches more tape read jobs
  * once the tape is mounted and the RecallTaskInjector
gets access to the drive performs RAO query for all queued
files
  * after performing RAO query for this files, RAO is deactivated
Signed-off-by: default avatarCristina Moraru <cristina-gabriela.moraru@cern.ch>
parent 97f6f502
......@@ -706,8 +706,8 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
catalogue.createTape(s_adminOnAdminHost, s_vid, s_libraryName, s_tapePoolName, cta::nullopt, capacityInBytes,
notDisabled, notFull, tapeComment);
int MAX_RECALLS = 62;
int MAX_BULK_RECALLS = 27;
int MAX_RECALLS = 50;
int MAX_BULK_RECALLS = 31;
std::vector<int> expectedOrder;
std::vector<std::string> expectedFseqOrderLog;
......@@ -729,6 +729,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
size_t archiveFileSize=sizeof(data);
castor::tape::SCSI::Structures::zeroStruct(&data);
int fseq;
bool isFirst = true;
for (fseq=1; fseq <= MAX_RECALLS ; fseq ++) {
// Create a path to a remote destination file
std::ostringstream remoteFilePath;
......@@ -783,7 +784,7 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
bool apply_rao = false;
bool add_expected = false;
if (MAX_BULK_RECALLS < 10) {
if (MAX_BULK_RECALLS < 2) {
if (expectedOrder.size() % MAX_BULK_RECALLS == 0 ||
fseq % MAX_RECALLS == 0) {
add_expected = true;
......@@ -791,23 +792,18 @@ TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
}
else if (MAX_BULK_RECALLS >= 30) {
if ((expectedOrder.size() % 30 == 0) ||
(((fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) &&
(expectedOrder.size() >= 10))) {
apply_rao = true;
add_expected = true;
}
else if (((fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) &&
(expectedOrder.size() < 10)) {
(fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) {
apply_rao = true & isFirst;
add_expected = true;
}
}
else if ((fseq % MAX_BULK_RECALLS == 0) || (fseq % MAX_RECALLS == 0)) {
if (expectedOrder.size() >= 10)
apply_rao = true;
add_expected = true;
apply_rao = true & isFirst;
add_expected = true;
}
if (apply_rao) {
std::reverse(expectedOrder.begin(), expectedOrder.end());
isFirst = false;
}
if (add_expected) {
std::stringstream expectedLogLine;
......
......@@ -99,28 +99,25 @@ void RecallTaskInjector::setDriveInterface(castor::tape::tapeserver::drive::Driv
//------------------------------------------------------------------------------
void RecallTaskInjector::initRAO() {
m_useRAO = true;
accessToDrive = false;
m_raoPromise.reset(new std::promise<void>);
m_raoFuture = m_raoPromise->get_future();
m_raoFuture = m_raoPromise.get_future();
m_raoLimits = m_drive->getLimitUDS();
}
//------------------------------------------------------------------------------
//waitForPromise
//------------------------------------------------------------------------------
void RecallTaskInjector::waitForPromise() {
if (accessToDrive)
return;
m_raoFuture.wait();
accessToDrive = true;
m_raoPromise.reset(new std::promise<void>);
m_raoFuture = m_raoPromise->get_future();
bool RecallTaskInjector::waitForPromise() {
std::chrono::milliseconds duration (1000);
std::future_status status = m_raoFuture.wait_for(duration);
if (status == std::future_status::ready)
return true;
return false;
}
//------------------------------------------------------------------------------
//setPromise
//------------------------------------------------------------------------------
void RecallTaskInjector::setPromise() {
try {
m_raoPromise->set_value();
m_raoPromise.set_value();
} catch (const std::exception &exc) {}
}
//------------------------------------------------------------------------------
......@@ -130,7 +127,6 @@ void RecallTaskInjector::injectBulkRecalls() {
uint32_t block_size = 262144;
uint32_t njobs = m_jobs.size();
std::map<uint64_t, uint32_t> filesMap;
std::vector<uint32_t> raoOrder;
if (m_useRAO) {
......@@ -138,34 +134,30 @@ void RecallTaskInjector::injectBulkRecalls() {
for (uint32_t i = 0; i < njobs; i++) {
cta::RetrieveJob *job = m_jobs.at(i).get();
uint64_t fseq = job->selectedTapeFile().fSeq;
castor::tape::SCSI::Structures::RAO::blockLims lims;
strncpy((char*)lims.fseq, std::to_string(fseq).c_str(), sizeof(fseq));
strncpy((char*)lims.fseq, std::to_string(i).c_str(), sizeof(i));
lims.begin = job->selectedTapeFile().blockId;
lims.end = job->archiveFile.fileSize / block_size;
lims.end = job->selectedTapeFile().blockId + 8 +
/* ceiling the number of blocks */
((job->archiveFile.fileSize + block_size - 1) / block_size);
files.push_back(lims);
filesMap[fseq] = i;
if (files.size() == m_raoLimits.maxSupported ||
((i == njobs - 1) && (files.size() >= 10))) {
((i == njobs - 1) && (files.size() > 1))) {
/* We do a RAO query if:
* 1. the maximum number of files supported by the drive
* for RAO query has been reached
* 2. the end of the jobs list has been reached and there are at least
* 10 unordered files
* 2 unordered files
*/
/* RecallTaskInjector is waiting to have access to the drive in order
* to perform the RAO query
*/
waitForPromise();
m_drive->queryRAO(files, m_raoLimits.maxSupported);
/* Add the RAO sorted files to the new list*/
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t fseq = atoi((char*)fit->fseq);
raoOrder.push_back(filesMap[fseq]);
uint64_t id = atoi((char*)fit->fseq);
raoOrder.push_back(id);
}
files.clear();
}
......@@ -173,8 +165,8 @@ void RecallTaskInjector::injectBulkRecalls() {
/* Copy the rest of the files in the new ordered list */
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t fseq = atoi((char*)fit->fseq);
raoOrder.push_back(filesMap[fseq]);
uint64_t id = atoi((char*)fit->fseq);
raoOrder.push_back(id);
}
files.clear();
}
......@@ -200,9 +192,6 @@ void RecallTaskInjector::injectBulkRecalls() {
DiskWriteTask * dwt = new DiskWriteTask(job, m_memManager);
TapeReadTask * trt = new TapeReadTask(job, *dwt, m_memManager);
if (accessToDrive)
accessToDrive = false;
m_diskWriter.push(dwt);
m_tapeReader.push(trt);
......@@ -276,8 +265,23 @@ void RecallTaskInjector::WorkerThread::run()
using cta::log::LogContext;
m_parent.m_lc.pushOrReplace(Param("thread", "RecallTaskInjector"));
m_parent.m_lc.log(cta::log::DEBUG, "Starting RecallTaskInjector thread");
if (m_parent.m_useRAO)
if (m_parent.m_useRAO) {
bool moreJobs = true;
/* RecallTaskInjector is waiting to have access to the drive in order
* to perform the RAO query; while waiting, it is fetching more jobs
*/
while (true) {
if (m_parent.waitForPromise()) break;
if (moreJobs) {
/* Fetching while there are still jobs to fetch
* Otherwise, we are just waiting for the promise
*/
moreJobs = m_parent.synchronousFetch();
}
}
m_parent.injectBulkRecalls();
m_parent.m_useRAO = false;
}
try{
while (1) {
Request req = m_parent.m_queue.pop();
......
......@@ -125,7 +125,7 @@ public:
*/
void initRAO();
void waitForPromise();
bool waitForPromise();
void setPromise();
......@@ -229,10 +229,8 @@ private:
* The promise for reordering the read tasks according to RAO by the
* RecallTaskInjector. The tasks to be run are placed in the m_tasks queue
*/
std::unique_ptr<std::promise<void>> m_raoPromise;
std::promise<void> m_raoPromise;
std::future<void> m_raoFuture;
bool accessToDrive = false;
};
} //end namespace daemon
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment