Commit 6d0d4dee authored by Cedric Caffy's avatar Cedric Caffy
Browse files

[scheduler] The Archive min request age is now taken into account for both...

[scheduler] The Archive min request age is now taken into account for both type ArchiveForUser and ArchiveForRepack mounts
parent 9f6108b8
......@@ -932,12 +932,19 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
}
}
//Distinct mount summary to store, per mount type (ArchiveForUser, ArchiveForRepack and Retrieve)
//the total mounts
//It will be used to decide whether we will use the mountPolicy's ArchiveMinRequest age
//or not (issue https://gitlab.cern.ch/cta/operations/-/issues/150)
ExistingMountSummary existingMountsDistinctType;
// With the existing mount list, we can now populate the potential mount list
// with the per tape pool existing mount statistics.
for (auto & em: mountInfo->existingOrNextMounts) {
// If a mount is still listed for our own drive, it is a leftover that we disregard.
if (em.driveName!=driveName) {
existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))].totalMounts++;
existingMountsDistinctType[TapePoolMountPair(em.tapePool, em.type)].totalMounts++;
if (em.activity)
existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))]
.activityMounts[em.activity.value()].value++;
......@@ -959,6 +966,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
// Get summary data
uint32_t existingMounts = 0;
uint32_t nbExistingMountsDistinctTypes = 0;
uint32_t activityMounts = 0;
bool sleepingMount = false;
try {
......@@ -966,6 +974,10 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type)))
.totalMounts;
} catch (std::out_of_range &) {}
try {
nbExistingMountsDistinctTypes = existingMountsDistinctType.at(TapePoolMountPair(m->tapePool, m->type))
.totalMounts;
} catch (std::out_of_range &) {}
if (m->activityNameAndWeightedMountCount) {
try {
activityMounts = existingMountsSummary
......@@ -974,7 +986,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
} catch (std::out_of_range &) {}
}
uint32_t effectiveExistingMounts = 0;
if (common::dataStructures::getMountBasicType(m->type) == common::dataStructures::MountType::ArchiveAllTypes) effectiveExistingMounts = existingMounts;
if (common::dataStructures::getMountBasicType(m->type) == common::dataStructures::MountType::ArchiveAllTypes) effectiveExistingMounts = nbExistingMountsDistinctTypes;
bool mountPassesACriteria = false;
uint64_t minBytesToWarrantAMount = m_minBytesToWarrantAMount;
uint64_t minFilesToWarrantAMount = m_minFilesToWarrantAMount;
......
......@@ -5320,6 +5320,141 @@ TEST_P(SchedulerTest, getQueuesAndMountSummariesTest)
ASSERT_EQ(tapePool, res->tapePool);
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack,res->mountType);
}
TEST_P(SchedulerTest, getNextMountWithArchiveForUserAndArchiveForRepackShouldReturnBothMounts)
{
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
cta::objectstore::Backend& backend = schedulerDB.getBackend();
setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("dummy", "unitTest");
#else
log::DummyLogger dl("", "");
#endif
log::LogContext lc(dl);
//Create an agent to represent this test process
cta::objectstore::AgentReference agentReference("testAgent", dl);
cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
//Create a logical library in the catalogue
const bool logicalLibraryIsDisabled = false;
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, logicalLibraryIsDisabled, "Create logical library");
const std::string comment = "Tape comment";
bool notDisabled = false;
bool notFull = false;
bool notReadOnly = false;
catalogue::CreateTapeAttributes tape;
tape.vid = s_vid;
tape.mediaType = s_mediaType;
tape.vendor = s_vendor;
tape.logicalLibraryName = s_libraryName;
tape.tapePoolName = s_tapePoolName;
tape.full = notFull;
tape.disabled = notDisabled;
tape.readOnly = notReadOnly;
tape.comment = comment;
catalogue.createTape(s_adminOnAdminHost, tape);
std::string secondTapeVid = s_vid + "2";
catalogue::CreateTapeAttributes tape2;
tape2.vid = secondTapeVid;
tape2.mediaType = s_mediaType;
tape2.vendor = s_vendor;
tape2.logicalLibraryName = s_libraryName;
tape2.tapePoolName = s_tapePoolName;
tape2.full = notFull;
tape2.disabled = notDisabled;
tape2.readOnly = notReadOnly;
tape2.comment = comment;
catalogue.createTape(s_adminOnAdminHost, tape2);
//Create a storage class in the catalogue
common::dataStructures::StorageClass storageClass;
storageClass.name = s_storageClassName;
storageClass.nbCopies = 1;
storageClass.comment = "Create storage class";
const std::string tapeDrive1 = "tape_drive";
const std::string tapeDrive2 = "tape_drive_2";
//Create an ArchiveForUser queue and put one file on it
std::string archiveForUserQueueAddress;
cta::objectstore::RootEntry re(backend);
{
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetch();
archiveForUserQueueAddress = re.addOrGetArchiveQueueAndCommit(s_tapePoolName,agentReference,JobQueueType::JobsToTransferForUser);
}
cta::common::dataStructures::MountPolicy mountPolicy;
cta::objectstore::ArchiveRequest::JobDump jobDump;
jobDump.copyNb = 1;
jobDump.status = cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForUser;
jobDump.tapePool = s_tapePoolName;
mountPolicy.maxDrivesAllowed = 3;
mountPolicy.archiveMinRequestAge = 0;
cta::objectstore::ArchiveQueue::JobToAdd archiveJobToAdd{jobDump,"",1,2,mountPolicy,time(nullptr)};
cta::objectstore::ArchiveQueue aq(archiveForUserQueueAddress,backend);
{
cta::objectstore::ScopedExclusiveLock sel(aq);
aq.fetch();
std::list<cta::objectstore::ArchiveQueue::JobToAdd> jobsToAdd({archiveJobToAdd});
aq.addJobsAndCommit(jobsToAdd,agentReference,lc);
}
::sleep(1);
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,tapeDrive1,lc));
auto mount = scheduler.getNextMount(s_libraryName,tapeDrive1,lc);
ASSERT_NE(nullptr,mount);
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser,mount->getMountType());
//Create an ArchiveForRepack queue
std::string archiveForRepackQueueAddress;
{
cta::objectstore::ScopedExclusiveLock sel(re);
re.fetch();
archiveForRepackQueueAddress = re.addOrGetArchiveQueueAndCommit(s_tapePoolName,agentReference,JobQueueType::JobsToTransferForRepack);
}
jobDump.status = cta::objectstore::serializers::ArchiveJobStatus::AJS_ToTransferForRepack;
cta::objectstore::ArchiveQueue::JobToAdd archiveRepackJobToAdd{jobDump,"",2,2,mountPolicy,time(nullptr)};
cta::objectstore::ArchiveQueue aqRepack(archiveForRepackQueueAddress,backend);
{
cta::objectstore::ScopedExclusiveLock sel(aqRepack);
aqRepack.fetch();
std::list<cta::objectstore::ArchiveQueue::JobToAdd> jobsToAdd({archiveRepackJobToAdd});
aqRepack.addJobsAndCommit(jobsToAdd,agentReference,lc);
}
::sleep(1);
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,tapeDrive2,lc));
//Get the next mount
auto archiveForRepackMount = scheduler.getNextMount(s_libraryName,tapeDrive2,lc);
ASSERT_NE(nullptr,archiveForRepackMount);
ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForRepack,archiveForRepackMount->getMountType());
//Now requeue another job and check that it does not trigger another mount
}
#undef TEST_MOCK_DB
#ifdef TEST_MOCK_DB
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment