Commit e9246dcf authored by Cedric Caffy's avatar Cedric Caffy
Browse files

[scheduler] Scheduling now considers ArchiveForRepack and ArchiveForUser as...

[scheduler] Scheduling now considers ArchiveForRepack and ArchiveForUser as two distinct types of mount

(issue https://gitlab.cern.ch/cta/operations/-/issues/150)
parent fe14ed53
......@@ -932,21 +932,14 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
}
}
//Distinct mount summary to store, per mount type (ArchiveForUser, ArchiveForRepack and Retrieve)
//the total mounts
//It will be used to decide whether we will use the mountPolicy's ArchiveMinRequest age
//or not (issue https://gitlab.cern.ch/cta/operations/-/issues/150)
ExistingMountSummary existingMountsDistinctType;
// With the existing mount list, we can now populate the potential mount list
// with the per tape pool existing mount statistics.
for (auto & em: mountInfo->existingOrNextMounts) {
// If a mount is still listed for our own drive, it is a leftover that we disregard.
if (em.driveName!=driveName) {
existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))].totalMounts++;
existingMountsDistinctType[TapePoolMountPair(em.tapePool, em.type)].totalMounts++;
existingMountsSummary[TapePoolMountPair(em.tapePool, em.type)].totalMounts++;
if (em.activity)
existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))]
existingMountsSummary[TapePoolMountPair(em.tapePool, em.type)]
.activityMounts[em.activity.value()].value++;
if (em.vid.size()) {
tapesInUse.insert(em.vid);
......@@ -966,24 +959,17 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
// Get summary data
uint32_t existingMounts = 0;
uint32_t nbExistingMountsDistinctTypes = 0;
uint32_t activityMounts = 0;
bool sleepingMount = false;
try {
existingMounts = existingMountsSummary
.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type)))
.totalMounts;
} catch (std::out_of_range &) {}
try {
//The number of existing mounts with ArchiveAllTypes split into ArchiveForUser and ArchiveForRepack to
//take into consideration the mount policy's Archive min request age (issue https://gitlab.cern.ch/cta/operations/-/issues/150)
nbExistingMountsDistinctTypes = existingMountsDistinctType.at(TapePoolMountPair(m->tapePool, m->type))
.at(TapePoolMountPair(m->tapePool, m->type))
.totalMounts;
} catch (std::out_of_range &) {}
if (m->activityNameAndWeightedMountCount) {
try {
activityMounts = existingMountsSummary
.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type)))
.at(TapePoolMountPair(m->tapePool, m->type))
.activityMounts.at(m->activityNameAndWeightedMountCount.value().activity).value;
} catch (std::out_of_range &) {}
}
......@@ -1000,7 +986,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
mountPassesACriteria = true;
if (m->filesQueued / (1 + effectiveExistingMounts) >= minFilesToWarrantAMount)
mountPassesACriteria = true;
if (!nbExistingMountsDistinctTypes && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge))
if (!effectiveExistingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minRequestAge))
mountPassesACriteria = true;
if (m->sleepingMount) {
sleepingMount = true;
......@@ -1215,7 +1201,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
uint32_t existingMounts = 0;
try {
existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))).totalMounts;
existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
log::ScopedParamContainer params(lc);
params.add("tapePool", m->tapePool)
......@@ -1391,7 +1377,7 @@ auto logicalLibrary = getLogicalLibrary(logicalLibraryName,getLogicalLibrariesTi
log::ScopedParamContainer params(lc);
uint32_t existingMounts = 0;
try {
existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))).totalMounts;
existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
......
......@@ -5321,17 +5321,17 @@ TEST_P(SchedulerTest, getQueuesAndMountSummariesTest)
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack,res->mountType);
}
TEST_P(SchedulerTest, getNextMountWithArchiveForUserAndArchiveForRepackShouldReturnBothMounts)
{
using namespace cta;
//This test covers the use case described in the ticket
//"high priority Archive job not scheduled when Repack is running": https://gitlab.cern.ch/cta/operations/-/issues/150
TEST_P(SchedulerTest, getNextMountWithArchiveForUserAndArchiveForRepackShouldReturnBothMountsArchiveMinRequestAge){
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
cta::objectstore::Backend& backend = schedulerDB.getBackend();
setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("dummy", "unitTest");
#else
......@@ -5339,119 +5339,180 @@ TEST_P(SchedulerTest, getNextMountWithArchiveForUserAndArchiveForRepackShouldRet
#endif
log::LogContext lc(dl);
//Create an agent to represent this test process
cta::objectstore::AgentReference agentReference("testAgent", dl);
cta::objectstore::AgentReference agentReference("agentTest", dl);
cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
//Create a logical library in the catalogue
const bool logicalLibraryIsDisabled = false;
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, logicalLibraryIsDisabled, "Create logical library");
const std::string comment = "Tape comment";
bool notDisabled = false;
bool notFull = false;
bool notReadOnly = false;
catalogue::CreateTapeAttributes tape;
tape.vid = s_vid;
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = s_tapePoolName;
tape.full = notFull;
tape.disabled = notDisabled;
tape.readOnly = notReadOnly;
tape.comment = comment;
catalogue.createTape(s_adminOnAdminHost, tape);
std::string secondTapeVid = s_vid + "2";
catalogue::CreateTapeAttributes tape2;
tape2.vid = secondTapeVid;
tape2.mediaType = s_mediaType;
tape2.vendor = s_vendor;
tape2.logicalLibraryName = s_libraryName;
tape2.tapePoolName = s_tapePoolName;
tape2.full = notFull;
tape2.disabled = notDisabled;
tape2.readOnly = notReadOnly;
tape2.comment = comment;
catalogue.createTape(s_adminOnAdminHost, tape2);
//Create environment for the test
const std::string libraryComment = "Library comment";
const bool libraryIsDisabled = false;
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName,
libraryIsDisabled, libraryComment);
//Create a storage class in the catalogue
common::dataStructures::StorageClass storageClass;
storageClass.name = s_storageClassName;
storageClass.nbCopies = 1;
storageClass.comment = "Create storage class";
const std::string tapeDrive1 = "tape_drive";
const std::string tapeDrive2 = "tape_drive_2";
std::string drive0 = "drive0";
std::string drive1 = "drive1";
std::string drive2 = "drive2";
//Create an ArchiveForUser queue and put one file on it
std::string archiveForUserQueueAddress;
cta::objectstore::RootEntry re(backend);
//Create two tapes (ArchiveForRepack and ArchiveForUser)
{
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetch();
archiveForUserQueueAddress = re.addOrGetArchiveQueueAndCommit(s_tapePoolName,agentReference,JobQueueType::JobsToTransferForUser);
catalogue::CreateTapeAttributes tape;
tape.vid = s_vid;
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = s_tapePoolName;
tape.full = false;
tape.disabled = false;
tape.readOnly = false;
tape.comment = "Comment";
catalogue.createTape(s_adminOnAdminHost, tape);
}
cta::common::dataStructures::MountPolicy mountPolicy;
cta::objectstore::ArchiveRequest::JobDump jobDump;
jobDump.copyNb = 1;
jobDump.status = cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser;
jobDump.tapePool = s_tapePoolName;
mountPolicy.maxDrivesAllowed = 3;
mountPolicy.archiveMinRequestAge = 0;
cta::objectstore::ArchiveQueue::JobToAdd archiveJobToAdd{jobDump,"",1,2,mountPolicy,1};
cta::objectstore::ArchiveQueue aq(archiveForUserQueueAddress,backend);
{
cta::objectstore::ScopedExclusiveLock sel(aq);
aq.fetch();
std::list<cta::objectstore::ArchiveQueue::JobToAdd> jobsToAdd({archiveJobToAdd});
aq.addJobsAndCommit(jobsToAdd,agentReference,lc);
catalogue::CreateTapeAttributes tape;
tape.vid = s_vid+"_1";
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = s_tapePoolName;
tape.full = false;
tape.disabled = false;
tape.readOnly = false;
tape.comment = "Comment";
catalogue.createTape(s_adminOnAdminHost, tape);
}
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,tapeDrive1,lc));
uint64_t fileSize = 667;
auto mount = scheduler.getNextMount(s_libraryName,tapeDrive1,lc);
Sorter sorter(agentReference,backend,catalogue);
for(uint64_t i = 0; i < 2; ++i) {
std::shared_ptr<cta::objectstore::ArchiveRequest> ar(new cta::objectstore::ArchiveRequest(agentReference.nextId("RepackSubRequest"),backend));
ar->initialize();
cta::common::dataStructures::ArchiveFile aFile;
aFile.archiveFileID = i;
aFile.diskFileId = "eos://diskFile";
aFile.checksumBlob.insert(cta::checksum::NONE, "");
aFile.creationTime = 0;
aFile.reconciliationTime = 0;
aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
aFile.diskInstance = "eoseos";
aFile.fileSize = fileSize;
aFile.storageClass = "sc";
ar->setArchiveFile(aFile);
ar->addJob(1, s_tapePoolName, agentReference.getAgentAddress(), 1, 1, 1);
ar->setJobStatus(1, serializers::ArchiveJobStatus::AJS_ToTransferForRepack);
cta::common::dataStructures::MountPolicy mp;
//We want the archiveMinRequestAge to be taken into account and trigger the mount
//hence set it to 0
mp.archiveMinRequestAge = 0;
mp.maxDrivesAllowed = 3;
ar->setMountPolicy(mp);
ar->setArchiveReportURL("");
ar->setArchiveErrorReportURL("");
ar->setRequester(cta::common::dataStructures::RequesterIdentity("user0", "group0"));
ar->setSrcURL("root://eoseos/myFile");
ar->setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
sorter.insertArchiveRequest(ar, agentReference, lc);
ar->insert();
}
ASSERT_NE(nullptr,mount);
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser,mount->getMountType());
sorter.flushAll(lc);
//Create an ArchiveForRepack queue
std::string archiveForRepackQueueAddress;
{
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetch();
archiveForRepackQueueAddress = re.addOrGetArchiveQueueAndCommit(s_tapePoolName,agentReference,JobQueueType::JobsToTransferForRepack);
}
//Wait 1 second so that the archiveMinRequestAge criterion can elapse and trigger a mount
::sleep(1);
jobDump.status = cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForRepack;
cta::objectstore::ArchiveQueue::JobToAdd archiveRepackJobToAdd{jobDump,"",2,2,mountPolicy,1};
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,drive0,lc));
cta::objectstore::ArchiveQueue aqRepack(archiveForRepackQueueAddress,backend);
{
cta::objectstore::ScopedExclusiveLock sel(aqRepack);
aqRepack.fetch();
std::list<cta::objectstore::ArchiveQueue::JobToAdd> jobsToAdd({archiveRepackJobToAdd});
aqRepack.addJobsAndCommit(jobsToAdd,agentReference,lc);
std::unique_ptr<cta::TapeMount> tapeMount = scheduler.getNextMount(s_libraryName,drive0,lc);
ASSERT_NE(nullptr,tapeMount);
std::unique_ptr<cta::ArchiveMount> archiveForRepackMount(static_cast<ArchiveMount *>(tapeMount.release()));
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack,archiveForRepackMount->getMountType());
auto archiveForRepackJobs = archiveForRepackMount->getNextJobBatch(1,fileSize,lc);
//Pop only one file for this mount
ASSERT_EQ(1,archiveForRepackJobs.size());
}
//Now queue ArchiveForUser files
for(uint64_t i = 0; i < 2; ++i) {
std::shared_ptr<cta::objectstore::ArchiveRequest> ar(new cta::objectstore::ArchiveRequest(agentReference.nextId("ArchiveRequest"),backend));
ar->initialize();
cta::common::dataStructures::ArchiveFile aFile;
aFile.archiveFileID = i;
aFile.diskFileId = "eos://diskFile";
aFile.checksumBlob.insert(cta::checksum::NONE, "");
aFile.creationTime = 0;
aFile.reconciliationTime = 0;
aFile.diskFileInfo = cta::common::dataStructures::DiskFileInfo();
aFile.diskInstance = "eoseos";
aFile.fileSize = fileSize;
aFile.storageClass = "sc";
ar->setArchiveFile(aFile);
ar->addJob(1, s_tapePoolName, agentReference.getAgentAddress(), 1, 1, 1);
ar->setJobStatus(1, serializers::ArchiveJobStatus::AJS_ToTransferForUser);
cta::common::dataStructures::MountPolicy mp;
//We want the archiveMinRequestAge to be taken into account and trigger the mount
//hence set it to 0
mp.archiveMinRequestAge = 0;
mp.maxDrivesAllowed = 3;
ar->setMountPolicy(mp);
ar->setArchiveReportURL("");
ar->setArchiveErrorReportURL("");
ar->setRequester(cta::common::dataStructures::RequesterIdentity("user0", "group0"));
ar->setSrcURL("root://eoseos/myFile");
ar->setEntryLog(cta::common::dataStructures::EntryLog("user0", "host0", time(nullptr)));
sorter.insertArchiveRequest(ar, agentReference, lc);
ar->insert();
}
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,tapeDrive2,lc));
sorter.flushAll(lc);
//Get the next mount
auto archiveForRepackMount = scheduler.getNextMount(s_libraryName,tapeDrive2,lc);
//Sleeping one second to trigger a mount
::sleep(1);
ASSERT_NE(nullptr,archiveForRepackMount);
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack,archiveForRepackMount->getMountType());
//The next mount should be an ArchiveForUser mount as there is already a mount ongoing with an ArchiveForRepack
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,drive1,lc));
//Now requeue another job and check that it does not trigger another mount
{
std::unique_ptr<cta::TapeMount> tapeMount = scheduler.getNextMount(s_libraryName,drive1,lc);
ASSERT_NE(nullptr,tapeMount);
std::unique_ptr<cta::ArchiveMount> archiveForUserMount(static_cast<ArchiveMount *>(tapeMount.release()));
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser,archiveForUserMount->getMountType());
auto archiveForUserJobs = archiveForUserMount->getNextJobBatch(1,fileSize,lc);
//Pop only one file for this mount
ASSERT_EQ(1,archiveForUserJobs.size());
}
//Now let's create another tape, and try to schedule another mount with another drive
//No ArchiveMount should be triggered
{
catalogue::CreateTapeAttributes tape;
tape.vid = s_vid+"_2";
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = s_tapePoolName;
tape.full = false;
tape.disabled = false;
tape.readOnly = false;
tape.comment = "Comment";
catalogue.createTape(s_adminOnAdminHost, tape);
}
ASSERT_FALSE(scheduler.getNextMountDryRun(s_libraryName,drive2,lc));
}
#undef TEST_MOCK_DB
#ifdef TEST_MOCK_DB
static cta::MockSchedulerDatabaseFactory mockDbFactory;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment