Commit be53ba7c authored by Eric Cano's avatar Eric Cano
Browse files

Merge branch 'rao_branch' into 'master'

Rao branch

See merge request !2
parents b7cf5b21 fbd5aa91
......@@ -739,6 +739,11 @@ namespace SCSI {
* Addition for the mode page Length for the Control Data Protection Mode Page
*/
const unsigned char controlDataProtectionModePageLengthAddition = 4;
class modeRAO {
public:
static const uint16_t DEFAULT_RRAO_ALLOCATION = 64000;
};
} // namespace SCSI
} // namespace tape
} // namespace castor
......@@ -172,6 +172,16 @@ namespace SCSI {
*((uint16_t *) t) = htons(val);
}
/**
* Helper function setting in place a 64 bits SCSI number from a value
* expressed in the local endianness.
* @param t pointer to the char array at the 64 bits value position.
* @param val the value.
*/
inline void setU64(unsigned char(& t)[8], uint64_t val) {
*((uint64_t *) t) = htobe64(val);
}
/**
* Inquiry CDB as described in SPC-4.
*/
......@@ -1170,6 +1180,140 @@ namespace SCSI {
unsigned char keyData[SCSI::encryption::ENC_KEY_LENGTH];
};
}
namespace RAO {
/**
* Receive RAO Command Descriptor Block (CDB)
*/
class recieveRAO_t {
public:
recieveRAO_t() {
zeroStruct(this);
opcode = SCSI::Commands::MAINTENANCE_IN;
}
unsigned char opcode;
unsigned char serviceAction :5;
unsigned char :2;
unsigned char udsLimits :1;
unsigned char raoListOffset[4];
unsigned char allocationLength[4];
unsigned char udsType :3;
unsigned char :5;
unsigned char control;
};
/**
* UDS (User Data Segments) limits page
*/
class udsLimitsPage_t {
public:
udsLimitsPage_t() {
zeroStruct(this);
}
unsigned int maxSupported;
unsigned int maxSize;
};
/**
* Generate RAO CDB
*/
class generateRAO_t {
public:
generateRAO_t() {
zeroStruct(this);
opcode = SCSI::Commands::MAINTENANCE_OUT;
raoProcess = 2;
}
unsigned char opcode;
unsigned char serviceAction :5;
unsigned char :3;
unsigned char raoProcess :3;
unsigned char :5;
unsigned char udsType :3;
unsigned char :5;
unsigned char reserved[2];
unsigned char paramsListLength[4];
unsigned char reserved2;
unsigned char control;
};
class udsDescriptor {
public:
udsDescriptor() {
zeroStruct(this);
setU16(descriptorLength, 0x1e);
}
unsigned char descriptorLength[2];
unsigned char reserved[3];
unsigned char udsName[10];
unsigned char partitionNumber;
unsigned char beginLogicalObjID[8];
unsigned char endLogicalObjID[8];
};
/**
* RAO list struct
*/
class raoList {
public:
raoList() {
zeroStruct(this);
}
unsigned char raoProcess :3;
unsigned char :5;
unsigned char status :3;
unsigned char :5;
unsigned char res[2];
unsigned char raoDescriptorListLength[4];
udsDescriptor udsDescriptors[2000];
};
/**
* Generate RAO parameters
*/
class generateRAOParams_t {
public:
generateRAOParams_t() {
zeroStruct(this);
}
unsigned char res[4];
unsigned char additionalData[4];
udsDescriptor userDataSegmentDescriptors[2000];
};
/**
* Block Limits
*/
class blockLims {
public:
blockLims() {
zeroStruct(this);
}
unsigned char fseq[10];
uint64_t begin;
uint64_t end;
};
}
template <size_t n>
/**
......
......@@ -41,6 +41,7 @@ castor::tape::tapeserver::daemon::DataTransferConfig::DataTransferConfig()
maxFilesBeforeFlush(0),
nbDiskThreads(0),
useLbp(false),
useRAO(false),
externalEncryptionKeyScript("") {}
//------------------------------------------------------------------------------
......@@ -85,6 +86,8 @@ castor::tape::tapeserver::daemon::DataTransferConfig
"TapeServer", "XrootTimeout", 0, log);
const std::string useLBP = castorConf.getConfEntString(
"TapeServer", "UseLogicalBlockProtection", "no", log);
const std::string useRAO = castorConf.getConfEntString(
"TapeServer", "UseRecommendedAccessOrder", "no", log);
if (!strcasecmp(useLBP.c_str(), "yes") || !strcmp(useLBP.c_str(), "1")) {
config.useLbp = true;
......@@ -92,6 +95,12 @@ castor::tape::tapeserver::daemon::DataTransferConfig
config.useLbp = false;
}
if (!strcasecmp(useRAO.c_str(), "yes") || !strcmp(useRAO.c_str(), "1")) {
config.useRAO = true;
} else {
config.useRAO = false;
}
config.externalEncryptionKeyScript = castorConf.getConfEntString("TapeServer",
"ExternalEncryptionKeyScript", "");
......
......@@ -112,6 +112,12 @@ struct DataTransferConfig {
*/
bool useLbp;
/**
* The boolean variable describing to use on not to use Recommended
* Access Order
*/
bool useRAO;
/**
* The path to the operator provided encyption control script (or empty string)
*/
......
......@@ -203,7 +203,7 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
TapeReadSingleThread trst(*drive, m_mc, tsr, m_volInfo,
m_castorConf.bulkRequestRecallMaxFiles,m_capUtils,rwd,lc,rrp,
m_castorConf.useLbp, m_castorConf.externalEncryptionKeyScript);
m_castorConf.useLbp, m_castorConf.useRAO, m_castorConf.externalEncryptionKeyScript);
DiskWriteThreadPool dwtp(m_castorConf.nbDiskThreads,
rrp,
rwd,
......@@ -219,10 +219,18 @@ castor::tape::tapeserver::daemon::Session::EndOfSessionAction
trst.setTaskInjector(&rti);
rrp.setWatchdog(rwd);
rti.setDriveInterface(trst.getDriveReference());
// We are now ready to put everything in motion. First step is to check
// we get any concrete job to be done from the client (via the task injector)
cta::utils::Timer timer;
if (rti.synchronousInjection()) { //adapt the recall task injector (starting from synchronousInjection)
// The RecallTaskInjector and the TapeReadSingleThread share the promise
if (m_castorConf.useRAO) {
rti.initRAO();
}
if (rti.synchronousFetch()) { //adapt the recall task injector (starting from synchronousFetch)
// We got something to recall. Time to start the machinery
trst.setWaitForInstructionsTime(timer.secs());
rwd.startThread();
......
......@@ -660,6 +660,218 @@ TEST_P(DataTransferSessionTest, DataTransferSessionWrongRecall) {
"mountTotalReadRetries=\"25\" mountTotalWriteRetries=\"25\" mountWriteTransients=\"10\""));
}
TEST_P(DataTransferSessionTest, DataTransferSessionRAORecall) {
// 0) Prepare the logger for everyone
cta::log::StringLogger logger("tapeServerUnitTest",cta::log::DEBUG);
cta::log::LogContext logContext(logger);
setupDefaultCatalogue();
// 1) prepare the fake scheduler
std::string vid = s_vid;
// cta::MountType::Enum mountType = cta::MountType::RETRIEVE;
// 3) Prepare the necessary environment (logger, plus system wrapper),
castor::tape::System::mockWrapper mockSys;
mockSys.delegateToFake();
mockSys.disableGMockCallsCounting();
mockSys.fake.setupForVirtualDriveSLC6();
//delete is unnecessary
//pointer with ownership will be passed to the application,
//which will do the delete
mockSys.fake.m_pathToDrive["/dev/nst0"] = new castor::tape::tapeserver::drive::FakeDrive;
// 4) Create the scheduler
auto & catalogue = getCatalogue();
auto & scheduler = getScheduler();
// Always use the same requester
const cta::common::dataStructures::SecurityIdentity requester;
// List to remember the path of each remote file so that the existance of the
// files can be tested for at the end of the test
std::list<std::string> remoteFilePaths;
// 5) Create the environment for the migration to happen (library + tape)
const std::string libraryComment = "Library comment";
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
libraryComment);
{
auto libraries = catalogue.getLogicalLibraries();
ASSERT_EQ(1, libraries.size());
ASSERT_EQ(s_libraryName, libraries.front().name);
ASSERT_EQ(libraryComment, libraries.front().comment);
}
const uint64_t capacityInBytes = 12345678;
const std::string tapeComment = "Tape comment";
bool notDisabled = false;
bool notFull = false;
catalogue.createTape(s_adminOnAdminHost, s_vid, s_libraryName, s_tapePoolName, capacityInBytes,
notDisabled, notFull, tapeComment);
int MAX_RECALLS = 50;
int MAX_BULK_RECALLS = 31;
std::vector<int> expectedOrder;
std::vector<std::string> expectedFseqOrderLog;
// 6) Prepare files for reading by writing them to the mock system
{
// Label the tape
castor::tape::tapeFile::LabelSession ls(*mockSys.fake.m_pathToDrive["/dev/nst0"],
s_vid, false);
mockSys.fake.m_pathToDrive["/dev/nst0"]->rewind();
// And write to it
castor::tape::tapeserver::daemon::VolumeInfo volInfo;
volInfo.vid=s_vid;
castor::tape::tapeFile::WriteSession ws(*mockSys.fake.m_pathToDrive["/dev/nst0"],
volInfo , 0, true, false);
// Write a few files on the virtual tape and modify the archive name space
// so that it is in sync
uint8_t data[1000];
size_t archiveFileSize=sizeof(data);
castor::tape::SCSI::Structures::zeroStruct(&data);
int fseq;
bool isFirst = true;
for (fseq=1; fseq <= MAX_RECALLS ; fseq ++) {
// Create a path to a remote destination file
std::ostringstream remoteFilePath;
remoteFilePath << "file://" << m_tmpDir << "/test" << fseq;
remoteFilePaths.push_back(remoteFilePath.str());
// Create an archive file entry in the archive namespace
cta::catalogue::TapeFileWritten tapeFileWritten;
// Write the file to tape
cta::MockArchiveMount mam(catalogue);
std::unique_ptr<cta::ArchiveJob> aj(new cta::MockArchiveJob(mam, catalogue));
aj->tapeFile.fSeq = fseq;
aj->archiveFile.archiveFileID = fseq;
castor::tape::tapeFile::WriteFile wf(&ws, *aj, archiveFileSize);
tapeFileWritten.blockId = wf.getBlockId();
// Write the data (one block)
wf.write(data, archiveFileSize);
// Close the file
wf.close();
// Create file entry in the archive namespace
tapeFileWritten.archiveFileId=fseq;
tapeFileWritten.checksumType="ADLER32";
tapeFileWritten.checksumValue=cta::utils::getAdler32String(data, archiveFileSize);
tapeFileWritten.vid=volInfo.vid;
tapeFileWritten.size=archiveFileSize;
tapeFileWritten.fSeq=fseq;
tapeFileWritten.copyNb=1;
tapeFileWritten.compressedSize=archiveFileSize; // No compression
tapeFileWritten.diskInstance = s_diskInstance;
tapeFileWritten.diskFileId = fseq;
tapeFileWritten.diskFilePath = remoteFilePath.str();
tapeFileWritten.diskFileUser = s_userName;
tapeFileWritten.diskFileGroup = "someGroup";
tapeFileWritten.diskFileRecoveryBlob = "B106";
tapeFileWritten.storageClassName = s_storageClassName;
tapeFileWritten.tapeDrive = "drive0";
catalogue.filesWrittenToTape(std::set<cta::catalogue::TapeFileWritten>{tapeFileWritten});
// Schedule the retrieval of the file
std::string diskInstance="disk_instance";
cta::common::dataStructures::RetrieveRequest rReq;
rReq.archiveFileID=fseq;
rReq.requester.name = s_userName;
rReq.requester.group = "someGroup";
rReq.dstURL = remoteFilePaths.back();
std::list<std::string> archiveFilePaths;
scheduler.queueRetrieve(diskInstance, rReq, logContext);
expectedOrder.push_back(fseq);
bool apply_rao = false;
bool add_expected = false;
if (MAX_BULK_RECALLS < 2) {
if (expectedOrder.size() % MAX_BULK_RECALLS == 0 ||
fseq % MAX_RECALLS == 0) {
add_expected = true;
}
}
else if (MAX_BULK_RECALLS >= 30) {
if ((expectedOrder.size() % 30 == 0) ||
(fseq % MAX_RECALLS == 0) || (fseq % MAX_BULK_RECALLS == 0)) {
apply_rao = true & isFirst;
add_expected = true;
}
}
else if ((fseq % MAX_BULK_RECALLS == 0) || (fseq % MAX_RECALLS == 0)) {
apply_rao = true & isFirst;
add_expected = true;
}
if (apply_rao) {
std::reverse(expectedOrder.begin(), expectedOrder.end());
isFirst = false;
}
if (add_expected) {
std::stringstream expectedLogLine;
std::copy(expectedOrder.begin(), expectedOrder.end(),
std::ostream_iterator<int>(expectedLogLine, " "));
expectedFseqOrderLog.push_back(expectedLogLine.str());
expectedOrder.clear();
}
}
}
// 6) Report the drive's existence and put it up in the drive register.
cta::tape::daemon::TpconfigLine driveConfig("T10D6116", "TestLogicalLibrary", "/dev/tape_T10D6116", "manual");
cta::common::dataStructures::DriveInfo driveInfo;
driveInfo.driveName=driveConfig.unitName;
driveInfo.logicalLibrary=driveConfig.rawLibrarySlot;
// We need to create the drive in the registry before being able to put it up.
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down);
scheduler.setDesiredDriveState(s_adminOnAdminHost, driveConfig.unitName, true, false);
// 7) Create the data transfer session
DataTransferConfig castorConf;
castorConf.bufsz = 1024*1024; // 1 MB memory buffers
castorConf.nbBufs = 10;
castorConf.bulkRequestRecallMaxBytes = UINT64_C(100)*1000*1000*1000;
castorConf.bulkRequestRecallMaxFiles = MAX_BULK_RECALLS - 1;
castorConf.nbDiskThreads = 1;
castorConf.useRAO = true;
cta::log::DummyLogger dummyLog("dummy");
cta::mediachanger::MediaChangerFacade mc(dummyLog);
cta::server::ProcessCap capUtils;
castor::messages::TapeserverProxyDummy initialProcess;
castor::tape::tapeserver::daemon::DataTransferSession sess("tapeHost", logger, mockSys,
driveConfig, mc, initialProcess, capUtils, castorConf, scheduler);
// 8) Run the data transfer session
sess.execute();
// 9) Check the session git the correct VID
ASSERT_EQ(s_vid, sess.getVid());
// 10) Check the remote files exist and have the correct size
for(auto & path: remoteFilePaths) {
struct stat statBuf;
bzero(&statBuf, sizeof(statBuf));
const int statRc = stat(path.substr(7).c_str(), &statBuf); //remove the "file://" for stat-ing
ASSERT_EQ(0, statRc);
ASSERT_EQ(1000, statBuf.st_size); //same size of data
}
// 10) Check logs
std::string logToCheck = logger.getLog();
logToCheck += "";
ASSERT_NE(std::string::npos, logToCheck.find("firmwareVersion=\"123A\" serialNumber=\"123456\" "
"mountTotalCorrectedReadErrors=\"5\" mountTotalReadBytesProcessed=\"4096\" "
"mountTotalUncorrectedReadErrors=\"1\" mountTotalNonMediumErrorCounts=\"2\""));
ASSERT_NE(std::string::npos, logToCheck.find("firmwareVersion=\"123A\" serialNumber=\"123456\" lifetimeMediumEfficiencyPrct=\"100\" "
"mountReadEfficiencyPrct=\"100\" mountWriteEfficiencyPrct=\"100\" "
"mountReadTransients=\"10\" "
"mountServoTemps=\"10\" mountServoTransients=\"5\" mountTemps=\"100\" "
"mountTotalReadRetries=\"25\" mountTotalWriteRetries=\"25\" mountWriteTransients=\"10\""));
for (std::string s : expectedFseqOrderLog) {
ASSERT_NE(std::string::npos, logToCheck.find(s));
}
}
TEST_P(DataTransferSessionTest, DataTransferSessionNoSuchDrive) {
// 0) Prepare the logger for everyone
......
......@@ -27,6 +27,8 @@
#include "castor/tape/tapeserver/daemon/TapeReadTask.hpp"
#include "castor/tape/tapeserver/daemon/TapeReadSingleThread.hpp"
#include "castor/tape/tapeserver/daemon/VolumeInfo.hpp"
#include "castor/tape/tapeserver/SCSI/Structures.hpp"
#include "castor/tape/tapeserver/drive/DriveInterface.hpp"
#include "scheduler/RetrieveJob.hpp"
#include <stdint.h>
......@@ -46,8 +48,8 @@ RecallTaskInjector::RecallTaskInjector(RecallMemoryManager & mm,
uint64_t maxFiles, uint64_t byteSizeThreshold,cta::log::LogContext lc) :
m_thread(*this),m_memManager(mm),
m_tapeReader(tapeReader),m_diskWriter(diskWriter),
m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold)
{}
m_retrieveMount(retrieveMount),m_lc(lc),m_maxFiles(maxFiles),m_maxBytes(byteSizeThreshold),
m_useRAO(false) {}
//------------------------------------------------------------------------------
//destructor
//------------------------------------------------------------------------------
......@@ -82,41 +84,126 @@ void RecallTaskInjector::startThreads() {
m_thread.start();
}
//------------------------------------------------------------------------------
//setDriveInterface
//------------------------------------------------------------------------------
void RecallTaskInjector::setDriveInterface(castor::tape::tapeserver::drive::DriveInterface *di) {
m_drive = di;
}
//------------------------------------------------------------------------------
//initRAO
//------------------------------------------------------------------------------
void RecallTaskInjector::initRAO() {
m_useRAO = true;
m_raoFuture = m_raoPromise.get_future();
m_raoLimits = m_drive->getLimitUDS();
}
//------------------------------------------------------------------------------
//waitForPromise
//------------------------------------------------------------------------------
bool RecallTaskInjector::waitForPromise() {
std::chrono::milliseconds duration (1000);
std::future_status status = m_raoFuture.wait_for(duration);
if (status == std::future_status::ready)
return true;
return false;
}
//------------------------------------------------------------------------------
//setPromise
//------------------------------------------------------------------------------
void RecallTaskInjector::setPromise() {
try {
m_raoPromise.set_value();
} catch (const std::exception &exc) {
throw cta::exception::Exception(std::string("In RecallTaskInjector::setPromise() got std::exception: ") + exc.what());
}
}
//------------------------------------------------------------------------------
//injectBulkRecalls
//------------------------------------------------------------------------------
void RecallTaskInjector::injectBulkRecalls(std::vector<std::unique_ptr<cta::RetrieveJob>>& jobs) {
for (auto it = jobs.begin(); it != jobs.end(); ++it) {
void RecallTaskInjector::injectBulkRecalls() {
uint32_t block_size = 262144;
uint32_t njobs = m_jobs.size();
std::vector<uint32_t> raoOrder;
if (m_useRAO) {
std::list<castor::tape::SCSI::Structures::RAO::blockLims> files;
for (uint32_t i = 0; i < njobs; i++) {
cta::RetrieveJob *job = m_jobs.at(i).get();
castor::tape::SCSI::Structures::RAO::blockLims lims;
strncpy((char*)lims.fseq, std::to_string(i).c_str(), sizeof(i));
lims.begin = job->selectedTapeFile().blockId;
lims.end = job->selectedTapeFile().blockId + 8 +
/* ceiling the number of blocks */
((job->archiveFile.fileSize + block_size - 1) / block_size);
files.push_back(lims);
if (files.size() == m_raoLimits.maxSupported ||
((i == njobs - 1) && (files.size() > 1))) {
/* We do a RAO query if:
* 1. the maximum number of files supported by the drive
* for RAO query has been reached
* 2. the end of the jobs list has been reached and there are at least
* 2 unordered files
*/
m_drive->queryRAO(files, m_raoLimits.maxSupported);
/* Add the RAO sorted files to the new list*/
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t id = atoi((char*)fit->fseq);
raoOrder.push_back(id);
}
files.clear();
}
}
(*it)->positioningMethod=cta::PositioningMethod::ByBlock;
/* Copy the rest of the files in the new ordered list */
for (auto fit = files.begin(); fit != files.end(); fit++) {
uint64_t id = atoi((char*)fit->fseq);
raoOrder.push_back(id);
}
files.clear();
}
std::string queryOrderLog = "Query fseq order:";
for (uint32_t i = 0; i < njobs; i++) {