Commit 4840f8e5 authored by Cedric Caffy's avatar Cedric Caffy
Browse files

[scheduler] The scheduling now takes the VO readMaxDrives and writeMaxDrives...

[scheduler] The scheduling now takes the VO readMaxDrives and writeMaxDrives parameters to trigger a mount or not
parent ff773f1f
......@@ -289,6 +289,20 @@ public:
*/
virtual std::list<common::dataStructures::VirtualOrganization> getVirtualOrganizations() const = 0;
/**
* Get the virtual organization corresponding to the tapepool passed in parameter
* @param tapepoolName the name of the tapepool which we want the virtual organization
* @return the VirtualOrganization associated to the tapepool passed in parameter
*/
virtual common::dataStructures::VirtualOrganization getVirtualOrganizationOfTapepool(const std::string & tapepoolName) const = 0;
/**
* Get, from the cache, the virtual organization corresponding to the tapepool passed in parameter
* @param tapepoolName the name of the tapepool which we want the virtual organization
* @return the VirtualOrganization associated to the tapepool passed in parameter
*/
virtual common::dataStructures::VirtualOrganization getCachedVirtualOrganizationOfTapepool(const std::string & tapepoolName) const = 0;
/**
* Modifies the name of the specified Virtual Organization.
*
......
......@@ -127,6 +127,14 @@ public:
std::list<common::dataStructures::VirtualOrganization> getVirtualOrganizations() const override {
return retryOnLostConnection(m_log, [&]{return m_catalogue->getVirtualOrganizations();}, m_maxTriesToConnect);
}
common::dataStructures::VirtualOrganization getVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override {
return retryOnLostConnection(m_log, [&]{return m_catalogue->getVirtualOrganizationOfTapepool(tapepoolName);}, m_maxTriesToConnect);
}
common::dataStructures::VirtualOrganization getCachedVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override {
return retryOnLostConnection(m_log, [&]{return m_catalogue->getCachedVirtualOrganizationOfTapepool(tapepoolName);}, m_maxTriesToConnect);
}
void modifyVirtualOrganizationName(const common::dataStructures::SecurityIdentity &admin, const std::string &currentVoName, const std::string &newVoName) override {
return retryOnLostConnection(m_log, [&]{return m_catalogue->modifyVirtualOrganizationName(admin,currentVoName,newVoName);}, m_maxTriesToConnect);
......
......@@ -14756,6 +14756,24 @@ TEST_P(cta_catalogue_CatalogueTest, modifyVirtualOrganizationMaxDrivesAllowedFor
ASSERT_THROW(m_catalogue->modifyVirtualOrganizationWriteMaxDrives(m_admin,"DOES not exists",newMaxDrivesAllowedForWrite),cta::exception::UserError);
}
 
TEST_P(cta_catalogue_CatalogueTest, getVirtualOrganizationOfTapepool) {
using namespace cta;
const uint64_t nbPartialTapes = 2;
const bool isEncrypted = true;
const cta::optional<std::string> supply("value for the supply pool mechanism");
common::dataStructures::VirtualOrganization vo = getVo();
m_catalogue->createVirtualOrganization(m_admin,vo);
m_catalogue->createTapePool(m_admin, m_tape1.tapePoolName, m_vo.name, nbPartialTapes, isEncrypted, supply, "Create tape pool");
cta::common::dataStructures::VirtualOrganization voFromTapepool = m_catalogue->getVirtualOrganizationOfTapepool(m_tape1.tapePoolName);
ASSERT_EQ(vo,voFromTapepool);
ASSERT_THROW(m_catalogue->getVirtualOrganizationOfTapepool("DOES_NOT_EXIST"),cta::exception::Exception);
}
TEST_P(cta_catalogue_CatalogueTest, updateDiskFileId) {
using namespace cta;
 
......
......@@ -106,6 +106,8 @@ public:
void createVirtualOrganization(const common::dataStructures::SecurityIdentity &admin, const common::dataStructures::VirtualOrganization &vo) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void deleteVirtualOrganization(const std::string &voName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
std::list<common::dataStructures::VirtualOrganization> getVirtualOrganizations() const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::VirtualOrganization getVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
common::dataStructures::VirtualOrganization getCachedVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyVirtualOrganizationName(const common::dataStructures::SecurityIdentity &admin, const std::string &currentVoName, const std::string &newVoName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyVirtualOrganizationReadMaxDrives(const common::dataStructures::SecurityIdentity &admin, const std::string &voName, const uint64_t maxDrivesAllowedForRead) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
void modifyVirtualOrganizationWriteMaxDrives(const common::dataStructures::SecurityIdentity &admin, const std::string &voName, const uint64_t maxDrivesAllowedForWrite) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
......
......@@ -60,6 +60,7 @@ RdbmsCatalogue::RdbmsCatalogue(
m_groupMountPolicyCache(10),
m_userMountPolicyCache(10),
m_allMountPoliciesCache(60),
m_tapepoolVirtualOrganizationCache(120),
m_expectedNbArchiveRoutesCache(10),
m_isAdminCache(10),
m_activitiesFairShareWeights(10) {}
......@@ -373,6 +374,8 @@ void RdbmsCatalogue::createVirtualOrganization(const common::dataStructures::Sec
stmt.executeNonQuery();
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -412,6 +415,7 @@ void RdbmsCatalogue::deleteVirtualOrganization(const std::string &voName){
throw exception::UserError(std::string("Cannot delete Virtual Organization : ") +
voName + " because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -476,6 +480,97 @@ std::list<common::dataStructures::VirtualOrganization> RdbmsCatalogue::getVirtua
}
}
//------------------------------------------------------------------------------
// getVirtualOrganizationOfTapepool
//------------------------------------------------------------------------------
common::dataStructures::VirtualOrganization RdbmsCatalogue::getVirtualOrganizationOfTapepool(const std::string & tapepoolName) const {
try {
auto conn = m_connPool.getConn();
return getVirtualOrganizationOfTapepool(conn,tapepoolName);
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
throw;
}
}
//------------------------------------------------------------------------------
// getVirtualOrganizationOfTapepool
//------------------------------------------------------------------------------
common::dataStructures::VirtualOrganization RdbmsCatalogue::getVirtualOrganizationOfTapepool(rdbms::Conn & conn, const std::string & tapepoolName) const {
try {
const char *const sql =
"SELECT "
"VIRTUAL_ORGANIZATION.VIRTUAL_ORGANIZATION_NAME AS VIRTUAL_ORGANIZATION_NAME,"
"VIRTUAL_ORGANIZATION.READ_MAX_DRIVES AS READ_MAX_DRIVES,"
"VIRTUAL_ORGANIZATION.WRITE_MAX_DRIVES AS WRITE_MAX_DRIVES,"
"VIRTUAL_ORGANIZATION.USER_COMMENT AS USER_COMMENT,"
"VIRTUAL_ORGANIZATION.CREATION_LOG_USER_NAME AS CREATION_LOG_USER_NAME,"
"VIRTUAL_ORGANIZATION.CREATION_LOG_HOST_NAME AS CREATION_LOG_HOST_NAME,"
"VIRTUAL_ORGANIZATION.CREATION_LOG_TIME AS CREATION_LOG_TIME,"
"VIRTUAL_ORGANIZATION.LAST_UPDATE_USER_NAME AS LAST_UPDATE_USER_NAME,"
"VIRTUAL_ORGANIZATION.LAST_UPDATE_HOST_NAME AS LAST_UPDATE_HOST_NAME,"
"VIRTUAL_ORGANIZATION.LAST_UPDATE_TIME AS LAST_UPDATE_TIME "
"FROM "
"TAPE_POOL "
"INNER JOIN "
"VIRTUAL_ORGANIZATION "
"ON "
"TAPE_POOL.VIRTUAL_ORGANIZATION_ID = VIRTUAL_ORGANIZATION.VIRTUAL_ORGANIZATION_ID "
"WHERE "
"TAPE_POOL.TAPE_POOL_NAME = :TAPE_POOL_NAME";
auto stmt = conn.createStmt(sql);
stmt.bindString(":TAPE_POOL_NAME",tapepoolName);
auto rset = stmt.executeQuery();
if(!rset.next()){
throw exception::UserError(std::string("In RdbmsCatalogue::getVirtualOrganizationsOfTapepool() unable to find the Virtual Organization of the tapepool ") + tapepoolName + ".");
}
common::dataStructures::VirtualOrganization virtualOrganization;
virtualOrganization.name = rset.columnString("VIRTUAL_ORGANIZATION_NAME");
virtualOrganization.readMaxDrives = rset.columnUint64("READ_MAX_DRIVES");
virtualOrganization.writeMaxDrives = rset.columnUint64("WRITE_MAX_DRIVES");
virtualOrganization.comment = rset.columnString("USER_COMMENT");
virtualOrganization.creationLog.username = rset.columnString("CREATION_LOG_USER_NAME");
virtualOrganization.creationLog.host = rset.columnString("CREATION_LOG_HOST_NAME");
virtualOrganization.creationLog.time = rset.columnUint64("CREATION_LOG_TIME");
virtualOrganization.lastModificationLog.username = rset.columnString("LAST_UPDATE_USER_NAME");
virtualOrganization.lastModificationLog.host = rset.columnString("LAST_UPDATE_HOST_NAME");
virtualOrganization.lastModificationLog.time = rset.columnUint64("LAST_UPDATE_TIME");
return virtualOrganization;
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
throw;
}
}
//------------------------------------------------------------------------------
// getCachedVirtualOrganizationOfTapepool
//------------------------------------------------------------------------------
common::dataStructures::VirtualOrganization RdbmsCatalogue::getCachedVirtualOrganizationOfTapepool(const std::string & tapepoolName) const {
try {
auto getNonCachedValue = [&] {
auto conn = m_connPool.getConn();
return getVirtualOrganizationOfTapepool(conn,tapepoolName);
};
return m_tapepoolVirtualOrganizationCache.getCachedValue(tapepoolName,getNonCachedValue);
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
throw;
}
}
//------------------------------------------------------------------------------
// modifyVirtualOrganizationName
//------------------------------------------------------------------------------
......@@ -508,6 +603,9 @@ void RdbmsCatalogue::modifyVirtualOrganizationName(const common::dataStructures:
throw exception::UserError(std::string("Cannot modify virtual organization : ") + currentVoName +
" because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -541,6 +639,9 @@ void RdbmsCatalogue::modifyVirtualOrganizationReadMaxDrives(const common::dataSt
throw exception::UserError(std::string("Cannot modify virtual organization : ") + voName +
" because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -574,6 +675,9 @@ void RdbmsCatalogue::modifyVirtualOrganizationWriteMaxDrives(const common::dataS
throw exception::UserError(std::string("Cannot modify virtual organization : ") + voName +
" because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -2077,6 +2181,9 @@ void RdbmsCatalogue::deleteTapePool(const std::string &name) {
if(0 == stmt.getNbAffectedRows()) {
throw exception::UserError(std::string("Cannot delete tape-pool ") + name + " because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} else {
throw UserSpecifiedAnEmptyTapePool(std::string("Cannot delete tape-pool ") + name + " because it is not empty");
}
......@@ -2326,6 +2433,8 @@ void RdbmsCatalogue::modifyTapePoolVo(const common::dataStructures::SecurityIden
if(0 == stmt.getNbAffectedRows()) {
throw exception::UserError(std::string("Cannot modify tape pool ") + name + " because it does not exist");
}
//The VO of this tapepool has changed, invalidate the tapepool-VO cache
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......@@ -2534,6 +2643,9 @@ void RdbmsCatalogue::modifyTapePoolName(const common::dataStructures::SecurityId
if(0 == stmt.getNbAffectedRows()) {
throw exception::UserError(std::string("Cannot modify tape pool ") + currentName + " because it does not exist");
}
m_tapepoolVirtualOrganizationCache.invalidate();
} catch(exception::UserError &) {
throw;
} catch(exception::Exception &ex) {
......
......@@ -237,6 +237,28 @@ public:
*/
std::list<common::dataStructures::VirtualOrganization> getVirtualOrganizations() const override;
/**
* Get the virtual organization corresponding to the tapepool passed in parameter
* @param tapepoolName the name of the tapepool which we want the virtual organization
* @return the VirtualOrganization associated to the tapepool passed in parameter
*/
common::dataStructures::VirtualOrganization getVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override;
/**
* Get the virtual organization corresponding to the tapepool passed in parameter
* @param conn the database connection
* @param tapepoolName the name of the tapepool which we want the virtual organization
* @return the VirtualOrganization associated to the tapepool passed in parameter
*/
common::dataStructures::VirtualOrganization getVirtualOrganizationOfTapepool(rdbms::Conn & conn, const std::string & tapepoolName) const;
/**
* Get, from the cache, the virtual organization corresponding to the tapepool passed in parameter
* @param tapepoolName the name of the tapepool which we want the virtual organization
* @return the VirtualOrganization associated to the tapepool passed in parameter
*/
common::dataStructures::VirtualOrganization getCachedVirtualOrganizationOfTapepool(const std::string & tapepoolName) const override;
/**
* Modifies the name of the specified Virtual Organization.
*
......@@ -2045,6 +2067,11 @@ protected:
* Cached versions of all mount policies
*/
mutable TimeBasedCache<std::string, std::list<common::dataStructures::MountPolicy>> m_allMountPoliciesCache;
/**
* Cached versions of virtual organization for specific tapepools
*/
mutable TimeBasedCache<std::string, common::dataStructures::VirtualOrganization> m_tapepoolVirtualOrganizationCache;
/**
* Cached versions of the expected number of archive routes for specific
......
......@@ -51,6 +51,10 @@ struct VirtualOrganization {
* The last modification log.
*/
EntryLog lastModificationLog;
bool operator==(const VirtualOrganization & other) const{
return (name == other.name && comment == other.comment && readMaxDrives == other.readMaxDrives && writeMaxDrives == other.writeMaxDrives);
}
};
}}}
......
......@@ -223,7 +223,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
m.oldestJobStartTime = aqueueJobsSummary.oldestJobStartTime;
//By default, we get the mountPolicies from the objectstore's queue counters
m.priority = aqueueJobsSummary.priority;
m.maxDrivesAllowed = aqueueJobsSummary.maxDrivesAllowed;
m.minRequestAge = aqueueJobsSummary.minArchiveRequestAge;
//If there are mount policies in the Catalogue
if(mountPolicies.size()) {
......@@ -233,7 +232,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
if(mountPoliciesInQueueList.size()){
auto mountPolicyToUse = createBestArchiveMountPolicy(mountPoliciesInQueueList);
m.priority = mountPolicyToUse.archivePriority;
m.maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
m.minRequestAge = mountPolicyToUse.archiveMinRequestAge;
}
}
......@@ -284,7 +282,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
m.filesQueued = aqueueRepackJobsSummary.jobs;
m.oldestJobStartTime = aqueueRepackJobsSummary.oldestJobStartTime;
m.priority = aqueueRepackJobsSummary.priority;
m.maxDrivesAllowed = aqueueRepackJobsSummary.maxDrivesAllowed;
m.minRequestAge = aqueueRepackJobsSummary.minArchiveRequestAge;
//If there are mount policies in the Catalogue
if(mountPolicies.size()) {
......@@ -294,7 +291,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
if(mountPoliciesInQueueList.size()){
auto mountPolicyToUse = createBestArchiveMountPolicy(mountPoliciesInQueueList);
m.priority = mountPolicyToUse.archivePriority;
m.maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
m.minRequestAge = mountPolicyToUse.archiveMinRequestAge;
}
}
......@@ -376,7 +372,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
}
if (rqSummary.jobs && (isPotentialMount || purpose == SchedulerDatabase::PurposeGetMountInfo::SHOW_QUEUES)) {
//Getting the default mountPolicies parameters from the queue summary
uint64_t maxDrivesAllowed = rqSummary.maxDrivesAllowed;
uint64_t minRetrieveRequestAge = rqSummary.minRetrieveRequestAge;
uint64_t priority = rqSummary.priority;
//Try to get the last values of the mountPolicies from the ones in the Catalogue
......@@ -387,7 +382,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
//As the Init element of the reduce function is the first element of the list, we start the reduce with the second element (++mountPolicyInQueueList.begin())
common::dataStructures::MountPolicy mountPolicyToUse = createBestRetrieveMountPolicy(mountPoliciesInQueueList);
priority = mountPolicyToUse.retrievePriority;
maxDrivesAllowed = mountPolicyToUse.maxDrivesAllowed;
minRetrieveRequestAge = mountPolicyToUse.retrieveMinRequestAge;
}
}
......@@ -406,7 +400,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
m.filesQueued = rqSummary.jobs;
m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime;
m.priority = priority;
m.maxDrivesAllowed = maxDrivesAllowed;
m.minRequestAge = minRetrieveRequestAge;
m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
m.tapePool = ""; // The tape pool is not know and will be determined by the caller.
......@@ -439,7 +432,6 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
m.filesQueued = rqSummary.jobs;
m.oldestJobStartTime = rqSummary.oldestJobStartTime;
m.priority = priority;
m.maxDrivesAllowed = maxDrivesAllowed;
m.minRequestAge = minRetrieveRequestAge;
m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
m.tapePool = ""; // The tape pool is not know and will be determined by the caller.
......
This diff is collapsed.
......@@ -296,7 +296,7 @@ private:
*/
void sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> &mountInfo,
const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer,
ExistingMountSummaryPerTapepool & existingMountsSummaryPerTapepool, ExistingMountSummaryPerVo & existingMountSummaryPerVo, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
ExistingMountSummaryPerTapepool & existingMountsDistinctTypeSummaryPerTapepool, ExistingMountSummaryPerVo & existingMountBasicTypeSummaryPerVo, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList,
double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc);
/**
......
......@@ -605,9 +605,6 @@ public:
uint64_t priority; /**< The priority for the mount, defined as the highest
* priority of all queued jobs */
uint64_t maxDrivesAllowed; /**< The maximum number of drives allowed for this
* tape pool, defined as the highest value amongst
* jobs */
time_t minRequestAge; /**< The maximum amount of time to wait before
* forcing a mount in the absence of enough data.
* Defined as the smallest value amongst jobs.*/
......
......@@ -2961,7 +2961,7 @@ TEST_P(SchedulerTest, emptyMountIsTriggeredWhenCancelledRetrieveRequest) {
}
TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) {
TEST_P(SchedulerTest, DISABLED_archiveReportMultipleAndQueueRetrievesWithActivities) {
using namespace cta;
Scheduler &scheduler = getScheduler();
......@@ -4431,7 +4431,6 @@ TEST_P(SchedulerTest, getSchedulingInformations) {
ASSERT_EQ(0,potentialMount.capacityInBytes);
ASSERT_EQ("",potentialMount.diskSystemSleptFor);
ASSERT_EQ(1,potentialMount.filesQueued);
ASSERT_EQ(s_maxDrivesAllowed,potentialMount.maxDrivesAllowed);
ASSERT_EQ(0,potentialMount.mountCount);
ASSERT_EQ(s_minArchiveRequestAge,potentialMount.minRequestAge);
ASSERT_EQ(s_archivePriority,potentialMount.priority);
......@@ -4506,7 +4505,6 @@ TEST_P(SchedulerTest, getSchedulingInformations) {
ASSERT_EQ(s_mediaTypeCapacityInBytes,potentialMount.capacityInBytes);
ASSERT_EQ("",potentialMount.diskSystemSleptFor);
ASSERT_EQ(1,potentialMount.filesQueued);
ASSERT_EQ(s_maxDrivesAllowed,potentialMount.maxDrivesAllowed);
ASSERT_EQ(0,potentialMount.mountCount);
ASSERT_EQ(s_minRetrieveRequestAge,potentialMount.minRequestAge);
ASSERT_EQ(s_retrievePriority,potentialMount.priority);
......@@ -4787,7 +4785,7 @@ TEST_P(SchedulerTest, expandRepackRequestShouldNotThrowIfTapeDisabledButNoRecall
ASSERT_NO_THROW(scheduler.expandRepackRequest(repackRequestToExpand,tl,t,lc));
}
TEST_P(SchedulerTest, archiveMountPolicyInFlightChangeScheduleMount){
TEST_P(SchedulerTest, archiveMaxDrivesVoInFlightChangeScheduleMount){
using namespace cta;
setupDefaultCatalogue();
......@@ -4841,7 +4839,7 @@ TEST_P(SchedulerTest, archiveMountPolicyInFlightChangeScheduleMount){
scheduler.queueArchiveWithGivenId(archiveFileId, s_diskInstance, request, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
catalogue.modifyMountPolicyMaxDrivesAllowed(s_adminOnAdminHost,s_mountPolicyName,0);
catalogue.modifyVirtualOrganizationWriteMaxDrives(s_adminOnAdminHost,s_vo,0);
{
// Emulate a tape server
......@@ -4853,14 +4851,14 @@ TEST_P(SchedulerTest, archiveMountPolicyInFlightChangeScheduleMount){
bool nextMount = scheduler.getNextMountDryRun(s_libraryName, driveName, lc);
//nextMount should be false as the maxDrivesAllowed is 0
ASSERT_FALSE(nextMount);
catalogue.modifyMountPolicyMaxDrivesAllowed(s_adminOnAdminHost,s_mountPolicyName,50);
catalogue.modifyVirtualOrganizationWriteMaxDrives(s_adminOnAdminHost,s_vo,1);
//Reset the maxDrivesAllowed to a positive number should give a new mount
nextMount = scheduler.getNextMountDryRun(s_libraryName,driveName,lc);
ASSERT_TRUE(nextMount);
}
}
TEST_P(SchedulerTest, retrieveMountPolicyInFlightChangeScheduleMount)
TEST_P(SchedulerTest, retrieveMaxDrivesVoInFlightChangeScheduleMount)
{
using namespace cta;
using namespace cta::objectstore;
......@@ -4935,7 +4933,7 @@ TEST_P(SchedulerTest, retrieveMountPolicyInFlightChangeScheduleMount)
catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
tapeFilesWrittenCopy1.clear();
}
//Test the queueing of the Retrieve Request and try to mount after having disabled the tape
//Test the queueing of the Retrieve Request and try to mount after having changed the readMaxDrives of the VO
scheduler.waitSchedulerDbSubthreadsComplete();
{
std::string diskInstance="disk_instance";
......@@ -4950,15 +4948,251 @@ TEST_P(SchedulerTest, retrieveMountPolicyInFlightChangeScheduleMount)
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,"drive",lc));
catalogue.modifyMountPolicyMaxDrivesAllowed(s_adminOnAdminHost,s_mountPolicyName,0);
catalogue.modifyVirtualOrganizationReadMaxDrives(s_adminOnAdminHost,s_vo,0);
ASSERT_FALSE(scheduler.getNextMountDryRun(s_libraryName,"drive",lc));
catalogue.modifyMountPolicyMaxDrivesAllowed(s_adminOnAdminHost,s_mountPolicyName,50);
catalogue.modifyVirtualOrganizationReadMaxDrives(s_adminOnAdminHost,s_vo,1);
ASSERT_TRUE(scheduler.getNextMountDryRun(s_libraryName,"drive",lc));
}
TEST_P(SchedulerTest, retrieveArchiveAllTypesMaxDrivesVoInFlightChangeScheduleMount)
{
//This test will emulate 3 tapeservers that will try to schedule one ArchiveForRepack, one ArchiveForUser and one Retrieve mount at the same time
//The VO readMaxDrives and writeMaxDrives will be changed to test that it works well.
//Also we will create two tapepools within the same VO to ensure that the readMaxDrives and writeMaxDrives are not per-tapepool
using namespace cta;
using namespace cta::objectstore;
unitTests::TempDirectory tempDirectory;
auto &catalogue = getCatalogue();
auto &scheduler = getScheduler();
auto &schedulerDB = getSchedulerDB();
cta::objectstore::Backend& backend = schedulerDB.getBackend();
setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("dummy", "unitTest");
#else
log::DummyLogger dl("", "");
#endif
log::LogContext lc(dl);
//Create an agent to represent this test process
cta::objectstore::AgentReference agentReference("expandRepackRequestTest", dl);
cta::objectstore::Agent agent(agentReference.getAgentAddress(), backend);
agent.initialize();
agent.setTimeout_us(0);
agent.insertAndRegisterSelf(lc);
std::string drive1 = "drive1";
std::string drive2 = "drive2";
std::string drive3 = "drive3";
//Create a logical library in the catalogue
const bool logicalLibraryIsDisabled = false;
catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, logicalLibraryIsDisabled, "Create logical library");
//This tape will contains files for triggering a Retrieve
auto tape1 = getDefaultTape();
catalogue.createTape(s_adminOnAdminHost, tape1);
//Two tapes for ArchiveForUser and ArchiveForRepack mounts
std::string vid2 = "vid_2";
std::string vid3 = "vid_3";
auto tape2 = tape1;
tape2.vid = vid2;
catalogue.createTape(s_adminOnAdminHost, tape2);
//Create a new tapepool on the same VO
std::string newTapepool = "new_tapepool";
catalogue.createTapePool(s_adminOnAdminHost,newTapepool,s_vo,1,false,cta::nullopt,"Test");
//Create the third tape in the new tapepool
auto tape3 = tape1;
tape3.vid = vid3;
tape3.tapePoolName = newTapepool;
catalogue.createTape(s_adminOnAdminHost,tape3);
//Create a storage class in the catalogue
common::dataStructures::StorageClass storageClass;
storageClass.name = s_storageClassName;
storageClass.nbCopies = 2;
storageClass.comment = "Create storage class";
catalogue.modifyStorageClassNbCopies(s_adminOnAdminHost,storageClass.name,storageClass.nbCopies);
//Create the a new archive routes for the second copy
catalogue.createArchiveRoute(s_adminOnAdminHost,storageClass.name,2,newTapepool,"ArchiveRoute2");
const std::string tapeDrive = "tape_drive";
const uint64_t nbArchiveFilesPerTape = 10;
const uint64_t archiveFileSize = 2 * 1000 * 1000 * 1000;
//Simulate the writing of 10 files in the first tape in the catalogue
std::set<catalogue::TapeItemWrittenPointer> tapeFilesWrittenCopy1;
{
uint64_t archiveFileId = 1;
std::string currentVid = s_vid;
for(uint64_t j = 1; j <= nbArchiveFilesPerTape; ++j) {
std::ostringstream diskFileId;
diskFileId << (12345677 + archiveFileId);
std::ostringstream diskFilePath;
diskFilePath << "/public_dir/public_file_"<<1<<"_"<< j;
auto fileWrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
auto & fileWritten = *fileWrittenUP;
fileWritten.archiveFileId = archiveFileId++;
fileWritten.diskInstance = s_diskInstance;
fileWritten.diskFileId = diskFileId.str();
fileWritten.diskFileOwnerUid = PUBLIC_OWNER_UID;
fileWritten.diskFileGid = PUBLIC_GID;
fileWritten.size = archiveFileSize;
fileWritten.checksumBlob.insert(cta::checksum::ADLER32,"1234");
fileWritten.storageClassName = s_storageClassName;
fileWritten.vid = currentVid;
fileWritten.fSeq = j;
fileWritten.blockId = j * 100;
fileWritten.size = archiveFileSize;
fileWritten.copyNb = 1;
fileWritten.tapeDrive = tapeDrive;
tapeFilesWrittenCopy1.emplace(fileWrittenUP.release());
}
//update the DB tape
catalogue.filesWrittenToTape(tapeFilesWrittenCopy1);
tapeFilesWrittenCopy1.clear();
}
//Queue the Retrieve request
scheduler.waitSchedulerDbSubthreadsComplete();
{
std::string diskInstance="disk_instance";
cta::common::dataStructures::RetrieveRequest rReq;
rReq.archiveFileID=1;
rReq.requester.name = s_userName;
rReq.requester.group = "someGroup";
rReq.dstURL = "dst_url";
scheduler.queueRetrieve(diskInstance, rReq, lc);
scheduler.waitSchedulerDbSubthreadsComplete();
}
uint64_t nbArchiveRequestToQueue = 2;
Sorter sorter(agentReference,backend,catalogue);
for(uint64_t i = 0; i < nbArchiveRequestToQueue; ++i) {
std::shared_ptr<cta::objectstore::ArchiveRequest> ar(new cta::objectstore::ArchiveRequest(agentReference.nextId("RepackSubRequest"),backend));
ar->initialize();
cta::common::dataStructures::ArchiveFile aFile;
aFile.archiveFileID = i;
aFile.diskFileId = "eos://diskFile";
aFile.checksumBlob.insert(cta::checksum::NONE, "");
aFile.creationTime = 0;