Commit fa44b29f authored by Steven Murray

cta/CTA#777 Minimize mounts for dual copy tape pool recalls

Added the following unit-test for the temporary hack:

  SchedulerTest.archive_report_and_retrieve_new_dual_copy_file
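
For context (not part of this commit's diff): a minimal, hypothetical sketch of the selection idea behind the temporary hack — always pick the lowest copy number, so recalls of dual copy files cluster on the copy 1 tape pool instead of spreading mounts across both pools. The names below are illustrative only, not actual CTA code.

  // Illustrative sketch only: not the CTA scheduler implementation.
  #include <cstdint>
  #include <map>
  #include <stdexcept>
  #include <string>

  struct TapeCopy {
    std::string vid;       // volume identifier of the tape holding this copy
    std::string tapePool;  // tape pool the tape belongs to
  };

  // Given all tape copies of a file keyed by copy number, return the copy to
  // recall. Taking the lowest copy number (copy 1 for a healthy dual copy
  // file) keeps all such recalls on the same tape pool.
  inline const TapeCopy &selectCopyForRecall(const std::map<uint8_t, TapeCopy> &copies) {
    if (copies.empty()) {
      throw std::runtime_error("File has no tape copies");
    }
    return copies.begin()->second;  // std::map iterates in ascending key order
  }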
parent 05de51f7
@@ -621,6 +621,431 @@ TEST_P(SchedulerTest, archive_report_and_retrieve_new_file) {
}
}
TEST_P(SchedulerTest, archive_report_and_retrieve_new_dual_copy_file) {
using namespace cta;
Scheduler &scheduler = getScheduler();
auto &catalogue = getCatalogue();
// Setup catalogue for dual tape copies
const std::string tapePool1Name = "tape_pool_1";
const std::string tapePool2Name = "tape_pool_2";
const std::string dualCopyStorageClassName = "dual_copy";
{
using namespace cta;
const std::string mountPolicyName = s_mountPolicyName;
const uint64_t archivePriority = s_archivePriority;
const uint64_t minArchiveRequestAge = s_minArchiveRequestAge;
const uint64_t retrievePriority = s_retrievePriority;
const uint64_t minRetrieveRequestAge = s_minRetrieveRequestAge;
const std::string mountPolicyComment = "create mount group";
catalogue::CreateMountPolicyAttributes mountPolicy;
mountPolicy.name = mountPolicyName;
mountPolicy.archivePriority = archivePriority;
mountPolicy.minArchiveRequestAge = minArchiveRequestAge;
mountPolicy.retrievePriority = retrievePriority;
mountPolicy.minRetrieveRequestAge = minRetrieveRequestAge;
mountPolicy.comment = mountPolicyComment;
ASSERT_TRUE(catalogue.getMountPolicies().empty());
catalogue.createMountPolicy(
s_adminOnAdminHost,
mountPolicy);
const std::list<common::dataStructures::MountPolicy> groups = catalogue.getMountPolicies();
ASSERT_EQ(1, groups.size());
const common::dataStructures::MountPolicy group = groups.front();
ASSERT_EQ(mountPolicyName, group.name);
ASSERT_EQ(archivePriority, group.archivePriority);
ASSERT_EQ(minArchiveRequestAge, group.archiveMinRequestAge);
ASSERT_EQ(retrievePriority, group.retrievePriority);
ASSERT_EQ(minRetrieveRequestAge, group.retrieveMinRequestAge);
ASSERT_EQ(mountPolicyComment, group.comment);
const std::string ruleComment = "create requester mount-rule";
catalogue.createRequesterMountRule(s_adminOnAdminHost, mountPolicyName, s_diskInstance, s_userName, ruleComment);
const std::list<common::dataStructures::RequesterMountRule> rules = catalogue.getRequesterMountRules();
ASSERT_EQ(1, rules.size());
const common::dataStructures::RequesterMountRule rule = rules.front();
ASSERT_EQ(s_userName, rule.name);
ASSERT_EQ(mountPolicyName, rule.mountPolicy);
ASSERT_EQ(ruleComment, rule.comment);
ASSERT_EQ(s_adminOnAdminHost.username, rule.creationLog.username);
ASSERT_EQ(s_adminOnAdminHost.host, rule.creationLog.host);
ASSERT_EQ(rule.creationLog, rule.lastModificationLog);
cta::common::dataStructures::VirtualOrganization vo;
vo.name = s_vo;
vo.comment = "comment";
vo.writeMaxDrives = 1;
vo.readMaxDrives = 1;
catalogue.createVirtualOrganization(s_adminOnAdminHost,vo);
common::dataStructures::StorageClass storageClass;
storageClass.name = dualCopyStorageClassName;
storageClass.nbCopies = 2;
storageClass.vo.name = vo.name;
storageClass.comment = "create dual copy storage class";
catalogue.createStorageClass(s_adminOnAdminHost, storageClass);
const uint16_t nbPartialTapes = 1;
const std::string tapePool1Comment = "Tape-pool for copy number 1";
const std::string tapePool2Comment = "Tape-pool for copy number 2";
const bool tapePoolEncryption = false;
const cta::optional<std::string> tapePoolSupply("value for the supply pool mechanism");
catalogue.createTapePool(s_adminOnAdminHost, tapePool1Name, vo.name, nbPartialTapes, tapePoolEncryption,
tapePoolSupply, tapePool1Comment);
catalogue.createTapePool(s_adminOnAdminHost, tapePool2Name, vo.name, nbPartialTapes, tapePoolEncryption,
tapePoolSupply, tapePool2Comment);
const std::string archiveRoute1Comment = "Archive-route for copy number 1";
const std::string archiveRoute2Comment = "Archive-route for copy number 2";
const uint32_t archiveRoute1CopyNb = 1;
const uint32_t archiveRoute2CopyNb = 2;
catalogue.createArchiveRoute(s_adminOnAdminHost, dualCopyStorageClassName, archiveRoute1CopyNb, tapePool1Name,
archiveRoute1Comment);
catalogue.createArchiveRoute(s_adminOnAdminHost, dualCopyStorageClassName, archiveRoute2CopyNb, tapePool2Name,
archiveRoute2Comment);
cta::catalogue::MediaType mediaType;
mediaType.name = s_mediaType;
mediaType.capacityInBytes = s_mediaTypeCapacityInBytes;
mediaType.cartridge = "cartridge";
mediaType.comment = "comment";
catalogue.createMediaType(s_adminOnAdminHost,mediaType);
}
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("dummy", "unitTest");
#else
log::DummyLogger dl("", "");
#endif
log::LogContext lc(dl);
uint64_t archiveFileId;
{
// Queue an archive request.
cta::common::dataStructures::EntryLog creationLog;
creationLog.host="host2";
creationLog.time=0;
creationLog.username="admin1";
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.gid=GROUP_2;
diskFileInfo.owner_uid=CMS_USER;
diskFileInfo.path="path/to/file";
cta::common::dataStructures::ArchiveRequest request;
request.checksumBlob.insert(cta::checksum::ADLER32, 0x1234abcd);
request.creationLog=creationLog;
request.diskFileInfo=diskFileInfo;
request.diskFileID="diskFileID";
request.fileSize=100*1000*1000;
cta::common::dataStructures::RequesterIdentity requester;
requester.name = s_userName;
requester.group = "userGroup";
request.requester = requester;
request.srcURL="srcURL";
request.storageClass=dualCopyStorageClassName;
archiveFileId = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc);
scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
}
scheduler.waitSchedulerDbSubthreadsComplete();
// Check that we have the file in the queues
// TODO: for this to work all the time, we need an index of all requests
// (otherwise we miss the selected ones).
// Could also be limited to querying by ID (global index needed)
bool found=false;
for (auto & tp: scheduler.getPendingArchiveJobs(lc)) {
for (auto & req: tp.second) {
if (req.archiveFileID == archiveFileId)
found = true;
}
}
ASSERT_TRUE(found);
// Create the environment for the migration of copy 1 to happen (library +
// tape)
const std::string libraryComment = "Library comment";
const bool libraryIsDisabled = true;
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
libraryIsDisabled, libraryComment);
{
auto libraries = catalogue.getLogicalLibraries();
ASSERT_EQ(1, libraries.size());
ASSERT_EQ(s_libraryName, libraries.front().name);
ASSERT_EQ(libraryComment, libraries.front().comment);
}
const std::string copy1TapeVid = "copy_1_tape";
{
using namespace cta;
catalogue::CreateTapeAttributes tape;
tape.vid = copy1TapeVid;
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = tapePool1Name;
tape.full = false;
tape.state = common::dataStructures::Tape::ACTIVE;
tape.comment = "Comment";
catalogue.createTape(s_adminOnAdminHost, tape);
}
const std::string driveName = "tape_drive";
catalogue.tapeLabelled(copy1TapeVid, driveName);
// Archive copy 1 to tape
{
// Emulate a tape server by asking for a mount and then a file (and succeed the transfer)
std::unique_ptr<cta::TapeMount> mount;
// This first initialization is normally done by the dataSession function.
cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName };
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc);
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc);
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
//Test that no mount is available when a logical library is disabled
ASSERT_EQ(nullptr, mount.get());
catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false);
//continue our test
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE(nullptr, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType());
auto & osdb=getSchedulerDB();
auto mi=osdb.getMountInfo(lc);
ASSERT_EQ(1, mi->existingOrNextMounts.size());
ASSERT_EQ(tapePool1Name, mi->existingOrNextMounts.front().tapePool);
ASSERT_EQ(copy1TapeVid, mi->existingOrNextMounts.front().vid);
std::unique_ptr<cta::ArchiveMount> archiveMount;
archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
ASSERT_NE(nullptr, archiveMount.get());
std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
ASSERT_NE(nullptr, archiveJobBatch.front().get());
std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front());
archiveJob->tapeFile.blockId = 1;
archiveJob->tapeFile.fSeq = 1;
archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, 0x1234abcd);
archiveJob->tapeFile.fileSize = archiveJob->archiveFile.fileSize;
archiveJob->tapeFile.copyNb = 1;
archiveJob->validate();
std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch;
std::queue<cta::catalogue::TapeItemWritten> sTapeItems;
std::queue<std::unique_ptr <cta::SchedulerDatabase::ArchiveJob >> failedToReportArchiveJobs;
sDBarchiveJobBatch.emplace(std::move(archiveJob));
archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc);
archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, archiveJobBatch.size());
archiveMount->complete();
}
{
// Check that there are no jobs to report because only 1 copy of a dual copy
// file has been archived
auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc);
ASSERT_EQ(0, jobsToReport.size());
}
// Create the environment for the migration of copy 2 to happen (library +
// tape)
catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,true);
const std::string copy2TapeVid = "copy_2_tape";
{
using namespace cta;
catalogue::CreateTapeAttributes tape;
tape.vid = copy2TapeVid;
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = tapePool2Name;
tape.full = false;
tape.state = common::dataStructures::Tape::ACTIVE;
tape.comment = "Comment";
catalogue.createTape(s_adminOnAdminHost, tape);
}
catalogue.tapeLabelled(copy2TapeVid, driveName);
// Archive copy 2 to tape
{
// Emulate a tape server by asking for a mount and then a file (and succeed the transfer)
std::unique_ptr<cta::TapeMount> mount;
// This first initialization is normally done by the dataSession function.
cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName };
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc);
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc);
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
//Test that no mount is available when a logical library is disabled
ASSERT_EQ(nullptr, mount.get());
catalogue.setLogicalLibraryDisabled(s_adminOnAdminHost,s_libraryName,false);
//continue our test
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE(nullptr, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType());
auto & osdb=getSchedulerDB();
auto mi=osdb.getMountInfo(lc);
ASSERT_EQ(1, mi->existingOrNextMounts.size());
ASSERT_EQ(tapePool2Name, mi->existingOrNextMounts.front().tapePool);
ASSERT_EQ(copy2TapeVid, mi->existingOrNextMounts.front().vid);
std::unique_ptr<cta::ArchiveMount> archiveMount;
archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release()));
ASSERT_NE(nullptr, archiveMount.get());
std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
ASSERT_NE(nullptr, archiveJobBatch.front().get());
std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front());
archiveJob->tapeFile.blockId = 1;
archiveJob->tapeFile.fSeq = 1;
archiveJob->tapeFile.checksumBlob.insert(cta::checksum::ADLER32, 0x1234abcd);
archiveJob->tapeFile.fileSize = archiveJob->archiveFile.fileSize;
archiveJob->tapeFile.copyNb = 2;
archiveJob->validate();
std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch;
std::queue<cta::catalogue::TapeItemWritten> sTapeItems;
std::queue<std::unique_ptr <cta::SchedulerDatabase::ArchiveJob >> failedToReportArchiveJobs;
sDBarchiveJobBatch.emplace(std::move(archiveJob));
archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems,failedToReportArchiveJobs, lc);
archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, archiveJobBatch.size());
archiveMount->complete();
}
{
// Emulate the reporter process reporting successful transfer to tape to the disk system
auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc);
ASSERT_NE(0, jobsToReport.size());
disk::DiskReporterFactory factory;
log::TimingList timings;
utils::Timer t;
scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc);
ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size());
}
// Check that there are now two tape copies in the catalogue
{
common::dataStructures::RequesterIdentity requester;
requester.name = s_userName;
requester.group = "userGroup";
optional<std::string> activity;
const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
catalogue.prepareToRetrieveFile(s_diskInstance, archiveFileId, requester, activity, lc);
ASSERT_EQ(2, queueCriteria.archiveFile.tapeFiles.size());
std::map<uint8_t, common::dataStructures::TapeFile> copyNbToTape;
for (auto &tapeFile: queueCriteria.archiveFile.tapeFiles) {
if(copyNbToTape.end() != copyNbToTape.find(tapeFile.copyNb)) {
FAIL() << "Duplicate copyNb: vid=" << tapeFile.vid << " copyNb=" << (uint32_t)(tapeFile.copyNb);
}
copyNbToTape[tapeFile.copyNb] = tapeFile;
}
{
const auto tapeItor = copyNbToTape.find(1);
ASSERT_NE(copyNbToTape.end(), tapeItor);
const auto tapeFile = tapeItor->second;
ASSERT_EQ(copy1TapeVid, tapeFile.vid);
ASSERT_EQ(1, tapeFile.copyNb);
}
{
const auto tapeItor = copyNbToTape.find(2);
ASSERT_NE(copyNbToTape.end(), tapeItor);
const auto tapeFile = tapeItor->second;
ASSERT_EQ(copy2TapeVid, tapeFile.vid);
ASSERT_EQ(2, tapeFile.copyNb);
}
}
// Queue the retrieve request
{
cta::common::dataStructures::EntryLog creationLog;
creationLog.host="host2";
creationLog.time=0;
creationLog.username="admin1";
cta::common::dataStructures::DiskFileInfo diskFileInfo;
diskFileInfo.gid=GROUP_2;
diskFileInfo.owner_uid=CMS_USER;
diskFileInfo.path="path/to/file";
cta::common::dataStructures::RetrieveRequest request;
request.archiveFileID = archiveFileId;
request.creationLog = creationLog;
request.diskFileInfo = diskFileInfo;
request.dstURL = "dstURL";
request.requester.name = s_userName;
request.requester.group = "userGroup";
scheduler.queueRetrieve("disk_instance", request, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
}
// Check that the retrieve request is queued
{
auto rqsts = scheduler.getPendingRetrieveJobs(lc);
// We expect 1 tape with queued jobs
ASSERT_EQ(1, rqsts.size());
// We expect the queue to contain 1 job
ASSERT_EQ(1, rqsts.cbegin()->second.size());
// We expect the job to be single copy
auto & job = rqsts.cbegin()->second.back();
ASSERT_EQ(1, job.tapeCopies.size());
// Currently guaranteed to get the copy 1 tape due to the temporary fix for
// the following CTA GitLab issue:
//
// cta/CTA#777 Minimize mounts for dual copy tape pool recalls
ASSERT_TRUE(copy1TapeVid == job.tapeCopies.cbegin()->first);
// Check the remote target
ASSERT_EQ("dstURL", job.request.dstURL);
// Check the archive file ID
ASSERT_EQ(archiveFileId, job.request.archiveFileID);
// Check that we can retrieve jobs by VID
// Get the vid from the above job and submit a separate request for the same vid
auto vid = rqsts.begin()->second.back().tapeCopies.begin()->first;
auto rqsts_vid = scheduler.getPendingRetrieveJobs(vid, lc);
// same tests as above
ASSERT_EQ(1, rqsts_vid.size());
auto &job_vid = rqsts_vid.back();
ASSERT_EQ(1, job_vid.tapeCopies.size());
// Currently guaranteed to get the copy 1 tape due to the temporary fix for
// the following CTA GitLab issue:
//
// cta/CTA#777 Minimize mounts for dual copy tape pool recalls
ASSERT_TRUE(copy1TapeVid == job_vid.tapeCopies.cbegin()->first);
ASSERT_EQ("dstURL", job_vid.request.dstURL);
ASSERT_EQ(archiveFileId, job_vid.request.archiveFileID);
}
{
// Emulate a tape server by asking for a mount and then a file (and succeed the transfer)
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE(nullptr, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
std::unique_ptr<cta::RetrieveMount> retrieveMount;
retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release()));
ASSERT_NE(nullptr, retrieveMount.get());
std::unique_ptr<cta::RetrieveJob> retrieveJob;
auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(1, jobBatch.size());
retrieveJob.reset(jobBatch.front().release());
ASSERT_NE(nullptr, retrieveJob.get());
retrieveJob->asyncSetSuccessful();
std::queue<std::unique_ptr<cta::RetrieveJob> > jobQueue;
jobQueue.push(std::move(retrieveJob));
retrieveMount->flushAsyncSuccessReports(jobQueue, lc);
jobBatch = retrieveMount->getNextJobBatch(1,1,lc);
ASSERT_EQ(0, jobBatch.size());
}
}
TEST_P(SchedulerTest, archive_and_retrieve_failure) {
using namespace cta;