From ac2efcfd23c6fd28fd97daf2fd0996460e2a2315 Mon Sep 17 00:00:00 2001 From: Eric Cano <Eric.Cano@cern.ch> Date: Thu, 6 Jun 2019 17:32:03 +0200 Subject: [PATCH] #471 added unit test. --- catalogue/DropSchemaCmd.cpp | 12 +- catalogue/RdbmsCatalogue.cpp | 86 +++--- common/dataStructures/DesiredDriveState.hpp | 1 + common/dataStructures/DriveState.cpp | 26 -- common/dataStructures/DriveState.hpp | 44 ++-- .../RetrieveFileQueueCriteria.cpp | 1 + objectstore/DriveState.cpp | 31 +-- scheduler/ArchiveMount.hpp | 8 + scheduler/LabelMount.hpp | 8 + scheduler/OStoreDB/OStoreDB.cpp | 77 ++++-- scheduler/OStoreDB/OStoreDB.hpp | 5 +- scheduler/RetrieveMount.cpp | 7 + scheduler/RetrieveMount.hpp | 8 + scheduler/Scheduler.cpp | 82 ++++-- scheduler/Scheduler.hpp | 13 +- scheduler/SchedulerDatabase.hpp | 8 +- scheduler/SchedulerTest.cpp | 246 ++++++++++++++++++ scheduler/TapeMount.hpp | 9 + scheduler/TapeMountDummy.hpp | 3 + 19 files changed, 506 insertions(+), 169 deletions(-) diff --git a/catalogue/DropSchemaCmd.cpp b/catalogue/DropSchemaCmd.cpp index a73dd2f841..38072cc9f3 100644 --- a/catalogue/DropSchemaCmd.cpp +++ b/catalogue/DropSchemaCmd.cpp @@ -146,7 +146,8 @@ void DropSchemaCmd::dropSqliteCatalogueSchema(rdbms::Conn &conn) { "STORAGE_CLASS_ID", "TAPE_POOL", "LOGICAL_LIBRARY", - "MOUNT_POLICY"}; + "MOUNT_POLICY", + "ACTIVITIES_WEIGHTS"}; dropDatabaseTables(conn, tablesToDrop); } catch(exception::Exception &ex) { throw exception::Exception(std::string(__FUNCTION__) + " failed: " + ex.getMessage().str()); @@ -173,7 +174,8 @@ void DropSchemaCmd::dropMysqlCatalogueSchema(rdbms::Conn &conn) { "STORAGE_CLASS_ID", "TAPE_POOL", "LOGICAL_LIBRARY", - "MOUNT_POLICY"}; + "MOUNT_POLICY", + "ACTIVITIES_WEIGHTS"}; dropDatabaseTables(conn, tablesToDrop); std::list<std::string> triggersToDrop = { @@ -237,7 +239,8 @@ void DropSchemaCmd::dropOracleCatalogueSchema(rdbms::Conn &conn) { "STORAGE_CLASS", "TAPE_POOL", "LOGICAL_LIBRARY", - "MOUNT_POLICY" + "MOUNT_POLICY", + "ACTIVITIES_WEIGHTS" }; dropDatabaseTables(conn, tablesToDrop); @@ -268,7 +271,8 @@ void DropSchemaCmd::dropPostgresCatalogueSchema(rdbms::Conn &conn) { "STORAGE_CLASS", "TAPE_POOL", "LOGICAL_LIBRARY", - "MOUNT_POLICY" + "MOUNT_POLICY", + "ACTIVITIES_WEIGHTS" }; dropDatabaseTables(conn, tablesToDrop); diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp index 47f98fdeb3..766de6b418 100644 --- a/catalogue/RdbmsCatalogue.cpp +++ b/catalogue/RdbmsCatalogue.cpp @@ -4915,52 +4915,56 @@ common::dataStructures::RetrieveFileQueueCriteria RdbmsCatalogue::prepareToRetri log::LogContext &lc) { try { cta::utils::Timer t; - auto conn = m_connPool.getConn(); - const auto getConnTime = t.secs(utils::Timer::resetCounter); - auto archiveFile = getArchiveFileToRetrieveByArchiveFileId(conn, archiveFileId); - const auto getArchiveFileTime = t.secs(utils::Timer::resetCounter); - if(nullptr == archiveFile.get()) { - exception::UserError ex; - ex.getMessage() << "No tape files available for archive file with archive file ID " << archiveFileId; - throw ex; - } + common::dataStructures::RetrieveFileQueueCriteria criteria; + { + auto conn = m_connPool.getConn(); + const auto getConnTime = t.secs(utils::Timer::resetCounter); + auto archiveFile = getArchiveFileToRetrieveByArchiveFileId(conn, archiveFileId); + const auto getArchiveFileTime = t.secs(utils::Timer::resetCounter); + if(nullptr == archiveFile.get()) { + exception::UserError ex; + ex.getMessage() << "No tape files available for archive file with archive file ID " << archiveFileId; + throw ex; + } - 
if(diskInstanceName != archiveFile->diskInstance) { - exception::UserError ue; - ue.getMessage() << "Cannot retrieve file because the disk instance of the request does not match that of the" - " archived file: archiveFileId=" << archiveFileId << " path=" << archiveFile->diskFileInfo.path << - " requestDiskInstance=" << diskInstanceName << " archiveFileDiskInstance=" << archiveFile->diskInstance; - throw ue; - } + if(diskInstanceName != archiveFile->diskInstance) { + exception::UserError ue; + ue.getMessage() << "Cannot retrieve file because the disk instance of the request does not match that of the" + " archived file: archiveFileId=" << archiveFileId << " path=" << archiveFile->diskFileInfo.path << + " requestDiskInstance=" << diskInstanceName << " archiveFileDiskInstance=" << archiveFile->diskInstance; + throw ue; + } - t.reset(); - const RequesterAndGroupMountPolicies mountPolicies = getMountPolicies(conn, diskInstanceName, user.name, - user.group); - const auto getMountPoliciesTime = t.secs(utils::Timer::resetCounter); + t.reset(); + const RequesterAndGroupMountPolicies mountPolicies = getMountPolicies(conn, diskInstanceName, user.name, + user.group); + const auto getMountPoliciesTime = t.secs(utils::Timer::resetCounter); + + log::ScopedParamContainer spc(lc); + spc.add("getConnTime", getConnTime) + .add("getArchiveFileTime", getArchiveFileTime) + .add("getMountPoliciesTime", getMountPoliciesTime); + lc.log(log::INFO, "Catalogue::prepareToRetrieve internal timings"); + + // Requester mount policies overrule requester group mount policies + common::dataStructures::MountPolicy mountPolicy; + if(!mountPolicies.requesterMountPolicies.empty()) { + mountPolicy = mountPolicies.requesterMountPolicies.front(); + } else if(!mountPolicies.requesterGroupMountPolicies.empty()) { + mountPolicy = mountPolicies.requesterGroupMountPolicies.front(); + } else { + exception::UserError ue; + ue.getMessage() << "Cannot retrieve file because there are no mount rules for the requester or their group:" << + " archiveFileId=" << archiveFileId << " path=" << archiveFile->diskFileInfo.path << " requester=" << + diskInstanceName << ":" << user.name << ":" << user.group; + throw ue; + } - log::ScopedParamContainer spc(lc); - spc.add("getConnTime", getConnTime) - .add("getArchiveFileTime", getArchiveFileTime) - .add("getMountPoliciesTime", getMountPoliciesTime); - lc.log(log::INFO, "Catalogue::prepareToRetrieve internal timings"); - // Requester mount policies overrule requester group mount policies - common::dataStructures::MountPolicy mountPolicy; - if(!mountPolicies.requesterMountPolicies.empty()) { - mountPolicy = mountPolicies.requesterMountPolicies.front(); - } else if(!mountPolicies.requesterGroupMountPolicies.empty()) { - mountPolicy = mountPolicies.requesterGroupMountPolicies.front(); - } else { - exception::UserError ue; - ue.getMessage() << "Cannot retrieve file because there are no mount rules for the requester or their group:" << - " archiveFileId=" << archiveFileId << " path=" << archiveFile->diskFileInfo.path << " requester=" << - diskInstanceName << ":" << user.name << ":" << user.group; - throw ue; + criteria.archiveFile = *archiveFile; + criteria.mountPolicy = mountPolicy; } - - common::dataStructures::RetrieveFileQueueCriteria criteria; - criteria.archiveFile = *archiveFile; - criteria.mountPolicy = mountPolicy; + criteria.activitiesFairShareWeight = getCachedActivitiesWeights(diskInstanceName); return criteria; } catch(exception::UserError &) { throw; diff --git 
a/common/dataStructures/DesiredDriveState.hpp b/common/dataStructures/DesiredDriveState.hpp index 07ba573959..4f6302fece 100644 --- a/common/dataStructures/DesiredDriveState.hpp +++ b/common/dataStructures/DesiredDriveState.hpp @@ -35,6 +35,7 @@ struct DesiredDriveState { bool operator==(const DesiredDriveState &rhs) const { return up == rhs.up && forceDown == rhs.forceDown; } + DesiredDriveState(): up(false), forceDown(false) {} }; std::ostream &operator<<(std::ostream& os, const DesiredDriveState& obj); diff --git a/common/dataStructures/DriveState.cpp b/common/dataStructures/DriveState.cpp index a536affd4a..7678a739d5 100644 --- a/common/dataStructures/DriveState.cpp +++ b/common/dataStructures/DriveState.cpp @@ -24,32 +24,6 @@ namespace cta { namespace common { namespace dataStructures { -//------------------------------------------------------------------------------ -// constructor -//------------------------------------------------------------------------------ -DriveState::DriveState(): - sessionId(0), - bytesTransferredInSession(0), - filesTransferredInSession(0), - latestBandwidth(0), - sessionStartTime(0), - mountStartTime(0), - transferStartTime(0), - unloadStartTime(0), - unmountStartTime(0), - drainingStartTime(0), - downOrUpStartTime(0), - probeStartTime(0), - cleanupStartTime(0), - lastUpdateTime(0), - startStartTime(0), - shutdownTime(0), - mountType(dataStructures::MountType::NoMount), - driveStatus(dataStructures::DriveStatus::Down), - desiredDriveState({false, false}), - currentPriority(0), - nextMountType(dataStructures::MountType::NoMount) {} - //------------------------------------------------------------------------------ // operator== //------------------------------------------------------------------------------ diff --git a/common/dataStructures/DriveState.hpp b/common/dataStructures/DriveState.hpp index 075d071899..64afdd153d 100644 --- a/common/dataStructures/DriveState.hpp +++ b/common/dataStructures/DriveState.hpp @@ -36,8 +36,6 @@ namespace dataStructures { */ struct DriveState { - DriveState(); - bool operator==(const DriveState &rhs) const; bool operator!=(const DriveState &rhs) const; @@ -45,37 +43,37 @@ struct DriveState { std::string driveName; std::string host; std::string logicalLibrary; - uint64_t sessionId; - uint64_t bytesTransferredInSession; - uint64_t filesTransferredInSession; - double latestBandwidth; /** < Byte per seconds */ - time_t sessionStartTime; - time_t mountStartTime; - time_t transferStartTime; - time_t unloadStartTime; - time_t unmountStartTime; - time_t drainingStartTime; - time_t downOrUpStartTime; - time_t probeStartTime; - time_t cleanupStartTime; - time_t lastUpdateTime; - time_t startStartTime; - time_t shutdownTime; - MountType mountType; - DriveStatus driveStatus; + uint64_t sessionId = 0; + uint64_t bytesTransferredInSession = 0; + uint64_t filesTransferredInSession = 0; + double latestBandwidth = 0.0; /** < Byte per seconds */ + time_t sessionStartTime = 0; + time_t mountStartTime = 0; + time_t transferStartTime = 0; + time_t unloadStartTime = 0; + time_t unmountStartTime = 0; + time_t drainingStartTime = 0; + time_t downOrUpStartTime = 0; + time_t probeStartTime = 0; + time_t cleanupStartTime = 0; + time_t lastUpdateTime = 0; + time_t startStartTime = 0; + time_t shutdownTime = 0; + MountType mountType = MountType::NoMount; + DriveStatus driveStatus = DriveStatus::Down; DesiredDriveState desiredDriveState; std::string currentVid; std::string currentTapePool; - uint64_t currentPriority; + uint64_t currentPriority = 0; 
struct ActivityAndWeight { std::string activity; double weight; }; optional<ActivityAndWeight> currentActivityAndWeight; - MountType nextMountType; + MountType nextMountType = MountType::NoMount; std::string nextVid; std::string nextTapepool; - uint64_t nextPriority; + uint64_t nextPriority = 0; optional<ActivityAndWeight> nextActivityAndWeight; }; // struct DriveState diff --git a/common/dataStructures/RetrieveFileQueueCriteria.cpp b/common/dataStructures/RetrieveFileQueueCriteria.cpp index df8980f6f6..237b7b639e 100644 --- a/common/dataStructures/RetrieveFileQueueCriteria.cpp +++ b/common/dataStructures/RetrieveFileQueueCriteria.cpp @@ -25,6 +25,7 @@ RetrieveFileQueueCriteria& RetrieveFileQueueCriteria::operator=(const RetrieveFi if(this != &other){ this->archiveFile = other.archiveFile; this->mountPolicy = other.mountPolicy; + this->activitiesFairShareWeight = other.activitiesFairShareWeight; } return *this; } diff --git a/objectstore/DriveState.cpp b/objectstore/DriveState.cpp index 9a70559451..fa47f28e77 100644 --- a/objectstore/DriveState.cpp +++ b/objectstore/DriveState.cpp @@ -41,34 +41,13 @@ void DriveState::garbageCollect(const std::string& presumedOwner, AgentReference } void DriveState::initialize(const std::string & driveName) { - // Setup underlying object + // Setup underlying object with defaults from dataStructures::DriveState ObjectOps<serializers::DriveState, serializers::DriveState_t>::initialize(); m_payload.set_drivename(driveName); - m_payload.set_host(""); - m_payload.set_logicallibrary(""); - m_payload.set_sessionid(0); - m_payload.set_bytestransferedinsession(0); - m_payload.set_filestransferedinsession(0); - m_payload.set_latestbandwidth(0); - m_payload.set_sessionstarttime(0); - m_payload.set_mountstarttime(0); - m_payload.set_transferstarttime(0); - m_payload.set_unloadstarttime(0); - m_payload.set_unmountstarttime(0); - m_payload.set_drainingstarttime(0); - // In the absence of info, we sent down now. - m_payload.set_downorupstarttime(::time(nullptr)); - m_payload.set_probestarttime(0); - m_payload.set_cleanupstarttime(0); - m_payload.set_lastupdatetime(0); - m_payload.set_startstarttime(0); - m_payload.set_shutdowntime(0); - m_payload.set_mounttype((uint32_t)common::dataStructures::MountType::NoMount); - m_payload.set_drivestatus((uint32_t)common::dataStructures::DriveStatus::Down); - m_payload.set_desiredup(false); - m_payload.set_desiredforcedown(false); - m_payload.set_currentvid(""); - m_payload.set_currenttapepool(""); + cta::common::dataStructures::DriveState driveState; + driveState.driveName = driveName; + driveState.downOrUpStartTime = ::time(nullptr); + setState(driveState); // This object is good to go (to storage) m_payloadInterpreted = true; } diff --git a/scheduler/ArchiveMount.hpp b/scheduler/ArchiveMount.hpp index 2073545e4d..5ff30a24ba 100644 --- a/scheduler/ArchiveMount.hpp +++ b/scheduler/ArchiveMount.hpp @@ -83,6 +83,14 @@ namespace cta { * @return The mount transaction id. */ std::string getMountTransactionId() const override; + + /** + * Return nullopt as activities are for retrieve mounts. + * + * @return nullopt. + */ + optional<std::string> getActivity() const override { return nullopt; } + /** * Indicates that the mount was completed. diff --git a/scheduler/LabelMount.hpp b/scheduler/LabelMount.hpp index 25778cb741..613ceb3411 100644 --- a/scheduler/LabelMount.hpp +++ b/scheduler/LabelMount.hpp @@ -73,6 +73,14 @@ namespace cta { * @return The mount transaction id.
*/ std::string getMountTransactionId() const override; + + /** + * Return nullopt as activities are for retrieve mounts. + * + * @return nullopt. + */ + optional<std::string> getActivity() const override { return nullopt; } + /** * Indicates that the mount was cancelled. diff --git a/scheduler/OStoreDB/OStoreDB.cpp b/scheduler/OStoreDB/OStoreDB.cpp index e73982e673..15d1a11198 100644 --- a/scheduler/OStoreDB/OStoreDB.cpp +++ b/scheduler/OStoreDB/OStoreDB.cpp @@ -187,7 +187,7 @@ void OStoreDB::ping() { void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, RootEntry& re, log::LogContext & logContext) { utils::Timer t, t2; - // Walk the archive queues for user for statistics + // Walk the archive queues for USER for statistics for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForUser)) { objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore); // debug utility variable @@ -233,7 +233,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro .add("processingTime", processingTime); logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive for user queue."); } - // Walk the archive queues for user for statistics + // Walk the archive queues for REPACK for statistics for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForRepack)) { objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore); // debug utility variable @@ -301,18 +301,56 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro } // If there are files queued, we create an entry for this retrieve queue in the // mount candidates list. - if (rqueue.getJobsSummary().jobs) { - tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount()); - auto & m = tmdi.potentialMounts.back(); - m.vid = rqp.vid; - m.type = cta::common::dataStructures::MountType::Retrieve; - m.bytesQueued = rqueue.getJobsSummary().bytes; - m.filesQueued = rqueue.getJobsSummary().jobs; - m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime; - m.priority = rqueue.getJobsSummary().priority; - m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed; - m.minRequestAge = rqueue.getJobsSummary().minRetrieveRequestAge; - m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller. + auto rqSummary = rqueue.getJobsSummary(); + if (rqSummary.jobs) { + // Check if we have activities and if all the jobs are covered by one or not (possible mixed case). + bool jobsWithoutActivity = true; + if (rqSummary.activityCounts.size()) { + if (rqSummary.activityCounts.size() >= rqSummary.jobs) + jobsWithoutActivity = false; + // In all cases, we create one potential mount per activity + for (auto ac: rqSummary.activityCounts) { + tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount()); + auto & m = tmdi.potentialMounts.back(); + m.vid = rqp.vid; + m.type = cta::common::dataStructures::MountType::Retrieve; + m.bytesQueued = rqueue.getJobsSummary().bytes; + m.filesQueued = rqueue.getJobsSummary().jobs; + m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime; + m.priority = rqueue.getJobsSummary().priority; + m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed; + m.minRequestAge = rqueue.getJobsSummary().minRetrieveRequestAge; + m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller. + m.tapePool = ""; // The tape pool is not known and will be determined by the caller.
+ m.vendor = ""; // The vendor is not known here, and will be determined by the caller. + m.mediaType = ""; // The media type is not known here, and will be determined by the caller. + m.vo = ""; // The vo is not known here, and will be determined by the caller. + m.capacityInBytes = 0; // The capacity is not known here, and will be determined by the caller. + m.activityNameAndWeightedMountCount = PotentialMount::ActivityNameAndWeightedMountCount(); + m.activityNameAndWeightedMountCount.value().activity = ac.activity; + m.activityNameAndWeightedMountCount.value().weight = ac.weight; + m.activityNameAndWeightedMountCount.value().weightedMountCount = 0.0; // This value will be computed later by the caller. + m.activityNameAndWeightedMountCount.value().mountCount = 0; // This value will be computed later by the caller. + } + } + if (jobsWithoutActivity) { + tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount()); + auto & m = tmdi.potentialMounts.back(); + m.vid = rqp.vid; + m.type = cta::common::dataStructures::MountType::Retrieve; + m.bytesQueued = rqueue.getJobsSummary().bytes; + m.filesQueued = rqueue.getJobsSummary().jobs; + m.oldestJobStartTime = rqueue.getJobsSummary().oldestJobStartTime; + m.priority = rqueue.getJobsSummary().priority; + m.maxDrivesAllowed = rqueue.getJobsSummary().maxDrivesAllowed; + m.minRequestAge = rqueue.getJobsSummary().minRetrieveRequestAge; + m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller. + m.tapePool = ""; // The tape pool is not known and will be determined by the caller. + m.vendor = ""; // The vendor is not known here, and will be determined by the caller. + m.mediaType = ""; // The media type is not known here, and will be determined by the caller. + m.vo = ""; // The vo is not known here, and will be determined by the caller. + m.capacityInBytes = 0; // The capacity is not known here, and will be determined by the caller.
+ } } else { tmdi.queueTrimRequired = true; } @@ -519,7 +557,7 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> OStoreDB::TapeMountDecisionInf const std::string& mediaType, const std::string& vendor, const uint64_t capacityInBytes, - time_t startTime) { + time_t startTime, const optional<common::dataStructures::DriveState::ActivityAndWeight> &) { throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfoNoLock::createRetrieveMount(): This function should not be called"); } @@ -3165,7 +3203,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount> inputs.latestBandwidth = 0; inputs.mountSessionId = am.mountInfo.mountId; inputs.reportTime = startTime; - inputs.status = common::dataStructures::DriveStatus::Mounting; + inputs.status = common::dataStructures::DriveStatus::Starting; inputs.vid = tape.vid; inputs.tapepool = tape.tapePool; log::LogContext lc(m_oStoreDB.m_logger); @@ -3191,7 +3229,8 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> OStoreDB::TapeMountDecisionInfo::createRetrieveMount( const std::string& vid, const std::string & tapePool, const std::string driveName, const std::string& logicalLibrary, const std::string& hostName,const std::string& vo, const std::string& mediaType, - const std::string& vendor,const uint64_t capacityInBytes, time_t startTime) { + const std::string& vendor,const uint64_t capacityInBytes, time_t startTime, + const optional<common::dataStructures::DriveState::ActivityAndWeight>& activityAndWeight) { // In order to create the mount, we have to: // Check we actually hold the scheduling lock // Check the tape exists, add it to ownership and set its activity status to @@ -3223,6 +3262,7 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> rm.mountInfo.mediaType = mediaType; rm.mountInfo.vendor = vendor; rm.mountInfo.capacityInBytes = capacityInBytes; + if(activityAndWeight) rm.mountInfo.activity = activityAndWeight.value().activity; // Update the status of the drive in the registry { // Get hold of the drive registry @@ -3237,9 +3277,10 @@ std::unique_ptr<SchedulerDatabase::RetrieveMount> inputs.mountType = common::dataStructures::MountType::Retrieve; inputs.mountSessionId = rm.mountInfo.mountId; inputs.reportTime = startTime; - inputs.status = common::dataStructures::DriveStatus::Mounting; + inputs.status = common::dataStructures::DriveStatus::Starting; inputs.vid = rm.mountInfo.vid; inputs.tapepool = rm.mountInfo.tapePool; + inputs.activityAndWeigh = activityAndWeight; log::LogContext lc(m_oStoreDB.m_logger); m_oStoreDB.updateDriveStatus(driveInfo, inputs, lc); } diff --git a/scheduler/OStoreDB/OStoreDB.hpp b/scheduler/OStoreDB/OStoreDB.hpp index a91002d995..de93c966c4 100644 --- a/scheduler/OStoreDB/OStoreDB.hpp +++ b/scheduler/OStoreDB/OStoreDB.hpp @@ -109,7 +109,7 @@ public: const std::string& vo, const std::string& mediaType, const std::string& vendor, const uint64_t capacityInBytes, - time_t startTime) override; + time_t startTime, const optional<common::dataStructures::DriveState::ActivityAndWeight> &activityAndWeight) override; virtual ~TapeMountDecisionInfo(); private: TapeMountDecisionInfo (OStoreDB & oStoreDB); @@ -136,7 +136,7 @@ public: const std::string& vo, const std::string& mediaType, const std::string& vendor, const uint64_t capacityInBytes, - time_t startTime) override; + time_t startTime, const optional<common::dataStructures::DriveState::ActivityAndWeight> &activityAndWeight) override; virtual ~TapeMountDecisionInfoNoLock(); }; @@ -549,6 +549,7 @@ private: double latestBandwidth; std::string vid; std::string tapepool; + 
optional<common::dataStructures::DriveState::ActivityAndWeight> activityAndWeigh; }; /** Collection of smaller scale parts of reportDriveStats */ struct ReportDriveStatsInputs { diff --git a/scheduler/RetrieveMount.cpp b/scheduler/RetrieveMount.cpp index 43136f9f3c..a629077989 100644 --- a/scheduler/RetrieveMount.cpp +++ b/scheduler/RetrieveMount.cpp @@ -56,6 +56,13 @@ std::string cta::RetrieveMount::getVid() const{ return m_dbMount->mountInfo.vid; } +//------------------------------------------------------------------------------ +// getActivity() +//------------------------------------------------------------------------------ +cta::optional<std::string> cta::RetrieveMount::getActivity() const { + return m_dbMount->mountInfo.activity; +} + //------------------------------------------------------------------------------ // getMountTransactionId() //------------------------------------------------------------------------------ diff --git a/scheduler/RetrieveMount.hpp b/scheduler/RetrieveMount.hpp index 82d3f6e3f2..fff81a1bea 100644 --- a/scheduler/RetrieveMount.hpp +++ b/scheduler/RetrieveMount.hpp @@ -69,6 +69,14 @@ namespace cta { */ virtual std::string getVid() const; + /** + * Returns the (optional) activity for this mount. + * + * @return + */ + optional<std::string> getActivity() const override; + + /** * Returns the mount transaction id. * diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp index 0fc7634220..4a356161ae 100644 --- a/scheduler/Scheduler.cpp +++ b/scheduler/Scheduler.cpp @@ -752,7 +752,7 @@ std::list<common::dataStructures::DriveState> Scheduler::getDriveStates(const co //------------------------------------------------------------------------------ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>& mountInfo, const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer, - std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, + ExistingMountSummary & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc) { // The library information is not know for the tapes involved in retrieves. We // need to query the catalogue now about all those tapes. @@ -793,11 +793,10 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T for (auto & em: mountInfo->existingOrNextMounts) { // If a mount is still listed for our own drive, it is a leftover that we disregard. 
if (em.driveName!=driveName) { - try { - existingMountsSummary.at(tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type)))++; - } catch (std::out_of_range &) { - existingMountsSummary[tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type))] = 1; - } + existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))].totalMounts++; + if (em.activity) + existingMountsSummary[TapePoolMountPair(em.tapePool, common::dataStructures::getMountBasicType(em.type))] + .activityMounts[em.activity.value()].value++; if (em.vid.size()) { tapesInUse.insert(em.vid); log::ScopedParamContainer params(lc); @@ -810,17 +809,25 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T } // We can now filter out the potential mounts for which their mount criteria - // is already met, filter out the potential mounts for which the maximum mount + // is not yet met, filter out the potential mounts for which the maximum mount // quota is already reached, and weight the remaining by how much of their quota // is reached for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) { // Get summary data - uint32_t existingMounts; + uint32_t existingMounts = 0; + uint32_t activityMounts = 0; try { - existingMounts = existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type))); - } catch (std::out_of_range &) { - existingMounts = 0; - } + existingMounts = existingMountsSummary + .at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))) + .totalMounts; + } catch (std::out_of_range &) {} + if (m->activityNameAndWeightedMountCount) { + try { + activityMounts = existingMountsSummary + .at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))) + .activityMounts.at(m->activityNameAndWeightedMountCount.value().activity).value; + } catch (std::out_of_range &) {} + } uint32_t effectiveExistingMounts = 0; if (m->type == common::dataStructures::MountType::ArchiveForUser) effectiveExistingMounts = existingMounts; bool mountPassesACriteria = false; @@ -852,6 +859,16 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T } else { // populate the mount with a weight m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->maxDrivesAllowed; + if (m->activityNameAndWeightedMountCount) { + m->activityNameAndWeightedMountCount.value().mountCount = activityMounts; + // Protect against division by zero + if (m->activityNameAndWeightedMountCount.value().weight) { + m->activityNameAndWeightedMountCount.value().weightedMountCount = + 1.0 * activityMounts / m->activityNameAndWeightedMountCount.value().weight; + } else { + m->activityNameAndWeightedMountCount.value().weightedMountCount = std::numeric_limits<double>::max(); + } + } log::ScopedParamContainer params(lc); params.add("tapePool", m->tapePool); if ( m->type == common::dataStructures::MountType::Retrieve) { @@ -918,7 +935,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo; mountInfo = m_db.getMountInfoNoLock(lc); getMountInfoTime = timer.secs(utils::Timer::resetCounter); - std::map<tpType, uint32_t> existingMountsSummary; + ExistingMountSummary existingMountsSummary; std::set<std::string> tapesInUse; std::list<catalogue::TapeForWriting> tapeList; @@ -942,7 +959,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const 
catalogueTime = getTapeInfoTime + getTapeForWriteTime; uint32_t existingMounts = 0; try { - existingMounts=existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type))); + existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))).totalMounts; } catch (...) {} log::ScopedParamContainer params(lc); params.add("tapePool", m->tapePool) @@ -975,15 +992,21 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const log::ScopedParamContainer params(lc); uint32_t existingMounts = 0; try { - existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); + existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts; } catch (...) {} schedulerDbTime = getMountInfoTime; catalogueTime = getTapeInfoTime + getTapeForWriteTime; params.add("tapePool", m->tapePool) .add("tapeVid", m->vid) .add("mountType", common::dataStructures::toString(m->type)) - .add("existingMounts", existingMounts) - .add("bytesQueued", m->bytesQueued) + .add("existingMounts", existingMounts); + if (m->activityNameAndWeightedMountCount) { + params.add("activity", m->activityNameAndWeightedMountCount.value().activity) + .add("activityMounts", m->activityNameAndWeightedMountCount.value().weightedMountCount) + .add("ActivityMountCount", m->activityNameAndWeightedMountCount.value().mountCount) + .add("ActivityWeight", m->activityNameAndWeightedMountCount.value().weight); + } + params.add("bytesQueued", m->bytesQueued) .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) .add("minFilesToWarrantMount", m_minFilesToWarrantAMount) @@ -1053,7 +1076,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib } __attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo; - std::map<tpType, uint32_t> existingMountsSummary; + ExistingMountSummary existingMountsSummary; std::set<std::string> tapesInUse; std::list<catalogue::TapeForWriting> tapeList; @@ -1089,12 +1112,11 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib time(NULL)).release()); mountCreationTime += timer.secs(utils::Timer::resetCounter); internalRet->m_sessionRunning = true; - internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting); driveStatusSetTime += timer.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); uint32_t existingMounts = 0; try { - existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); + existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, common::dataStructures::getMountBasicType(m->type))).totalMounts; } catch (...) {} schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; catalogueTime = getTapeInfoTime + getTapeForWriteTime; @@ -1140,6 +1162,12 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib try { // create the mount, and populate its DB side. 
decisionTime += timer.secs(utils::Timer::resetCounter); + optional<common::dataStructures::DriveState::ActivityAndWeight> activityAndWeight; + if (m->activityNameAndWeightedMountCount) { + activityAndWeight = common::dataStructures::DriveState::ActivityAndWeight{ + m->activityNameAndWeightedMountCount.value().activity, + m->activityNameAndWeightedMountCount.value().weight }; + } std::unique_ptr<RetrieveMount> internalRet ( new RetrieveMount(mountInfo->createRetrieveMount(m->vid, m->tapePool, @@ -1150,17 +1178,16 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib m->mediaType, m->vendor, m->capacityInBytes, - time(NULL)))); + time(NULL), activityAndWeight))); mountCreationTime += timer.secs(utils::Timer::resetCounter); internalRet->m_sessionRunning = true; internalRet->m_diskRunning = true; internalRet->m_tapeRunning = true; - internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting); driveStatusSetTime += timer.secs(utils::Timer::resetCounter); log::ScopedParamContainer params(lc); uint32_t existingMounts = 0; try { - existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type)); + existingMounts=existingMountsSummary.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts; } catch (...) {} schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime; catalogueTime = getTapeInfoTime + getTapeForWriteTime; @@ -1170,7 +1197,14 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib .add("mediaType",m->mediaType) .add("vendor",m->vendor) .add("mountType", common::dataStructures::toString(m->type)) - .add("existingMounts", existingMounts) + .add("existingMounts", existingMounts); + if (m->activityNameAndWeightedMountCount) { + params.add("activity", m->activityNameAndWeightedMountCount.value().activity) + .add("activityMounts", m->activityNameAndWeightedMountCount.value().weightedMountCount) + .add("ActivityMountCount", m->activityNameAndWeightedMountCount.value().mountCount) + .add("ActivityWeight", m->activityNameAndWeightedMountCount.value().weight); + } + params.add("bytesQueued", m->bytesQueued) .add("minBytesToWarrantMount", m_minBytesToWarrantAMount) .add("filesQueued", m->filesQueued) diff --git a/scheduler/Scheduler.hpp b/scheduler/Scheduler.hpp index 375c1113ee..5cddb3c05e 100644 --- a/scheduler/Scheduler.hpp +++ b/scheduler/Scheduler.hpp @@ -272,14 +272,23 @@ private: */ double m_repackRequestExpansionTimeLimit = 30; - typedef std::pair<std::string, common::dataStructures::MountType> tpType; + typedef std::pair<std::string, common::dataStructures::MountType> TapePoolMountPair; + struct MountCounts { + uint32_t totalMounts = 0; + struct AutoZeroUint32_t { + uint32_t value = 0; + }; + std::map<std::string, AutoZeroUint32_t> activityMounts; + }; + typedef std::map<TapePoolMountPair, MountCounts> ExistingMountSummary; + /** * Common part to getNextMountDryRun() and getNextMount() to populate mount decision info. * The structure should be pre-loaded by the calling function.
*/ void sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> &mountInfo, const std::string & logicalLibraryName, const std::string & driveName, utils::Timer & timer, - std::map<tpType, uint32_t> & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, + ExistingMountSummary & existingMountsSummary, std::set<std::string> & tapesInUse, std::list<catalogue::TapeForWriting> & tapeList, double & getTapeInfoTime, double & candidateSortingTime, double & getTapeForWriteTime, log::LogContext & lc); /** diff --git a/scheduler/SchedulerDatabase.hpp b/scheduler/SchedulerDatabase.hpp index 097990f520..9b95e00dac 100644 --- a/scheduler/SchedulerDatabase.hpp +++ b/scheduler/SchedulerDatabase.hpp @@ -351,6 +351,7 @@ public: std::string host; uint64_t capacityInBytes; uint64_t mountId; + optional<std::string> activity; } mountInfo; virtual const MountInfo & getMountInfo() = 0; virtual std::list<std::unique_ptr<cta::SchedulerDatabase::RetrieveJob>> getNextJobBatch(uint64_t filesRequested, @@ -546,8 +547,9 @@ public: uint32_t mountCount; /**< The number of mounts for this tape pool (which is the current "chargeable" entity for quotas. */ struct ActivityNameAndWeightedMountCount { std::string activity; - double weight; - double weightedMountCount; + double weight = 0.0; + uint32_t mountCount = 0; + double weightedMountCount = 0.0; }; /**< Struct describing the activity if we have one for this mount. */ optional<ActivityNameAndWeightedMountCount> activityNameAndWeightedMountCount; @@ -654,7 +656,7 @@ public: const std::string& vo, const std::string& mediaType, const std::string& vendor, const uint64_t capacityInBytes, - time_t startTime) = 0; + time_t startTime, const optional<common::dataStructures::DriveState::ActivityAndWeight> &) = 0; /** Destructor: releases the global lock if not already done */ virtual ~TapeMountDecisionInfo() {}; }; diff --git a/scheduler/SchedulerTest.cpp b/scheduler/SchedulerTest.cpp index 3067e541ae..ddd44dff77 100644 --- a/scheduler/SchedulerTest.cpp +++ b/scheduler/SchedulerTest.cpp @@ -2622,6 +2622,252 @@ TEST_P(SchedulerTest, expandRepackRequestExpansionTimeLimitReached) { } } +TEST_P(SchedulerTest, archiveReportMultipleAndQueueRetrievesWithActivities) { + using namespace cta; + + Scheduler &scheduler = getScheduler(); + auto &catalogue = getCatalogue(); + + setupDefaultCatalogue(); +#ifdef STDOUT_LOGGING + log::StdoutLogger dl("dummy", "unitTest"); +#else + log::DummyLogger dl("", ""); +#endif + log::LogContext lc(dl); + + // We want to virtually archive files on 10 different tapes that will be asked for by different activities. + // Activity A will have a weight of .4, B 0.3, and this allows partially predicting the mount order for them: + // (A or B) (the other) A B A B A (A or B) (the other) A. + // We hence need to create files on 10 different tapes and recall them with the respective activities. + std::map<size_t, uint64_t> archiveFileIds; + cta::range<size_t> fileRange(10); + for (auto i: fileRange) { + // Queue several archive requests. 
+ cta::common::dataStructures::EntryLog creationLog; + creationLog.host="host2"; + creationLog.time=0; + creationLog.username="admin1"; + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.group="group2"; + diskFileInfo.owner="cms_user"; + diskFileInfo.path="path/to/file"; + diskFileInfo.path += std::to_string(i); + cta::common::dataStructures::ArchiveRequest request; + request.checksumType="ADLER32"; + request.checksumValue="1234abcd"; + request.creationLog=creationLog; + request.diskFileInfo=diskFileInfo; + request.diskFileID="diskFileID"; + request.diskFileID += std::to_string(i); + request.fileSize=100*1000*1000; + cta::common::dataStructures::UserIdentity requester; + requester.name = s_userName; + requester.group = "userGroup"; + request.requester = requester; + request.srcURL="srcURL"; + request.storageClass=s_storageClassName; + archiveFileIds[i] = scheduler.checkAndGetNextArchiveFileId(s_diskInstance, request.storageClass, request.requester, lc); + scheduler.queueArchiveWithGivenId(archiveFileIds[i], s_diskInstance, request, lc); + } + scheduler.waitSchedulerDbSubthreadsComplete(); + + // Check that we have the files in the queues + // TODO: for this to work all the time, we need an index of all requests + // (otherwise we miss the selected ones). + // Could also be limited to querying by ID (global index needed) + std::map<size_t, bool> found; + for (auto & tp: scheduler.getPendingArchiveJobs(lc)) { + for (auto & req: tp.second) { + for (auto i:fileRange) + if (req.archiveFileID == archiveFileIds.at(i)) + found[i] = true; + } + } + for (auto i:fileRange) { + ASSERT_NO_THROW(found.at(i)); + ASSERT_TRUE(found.at(i)); + } + + // Create the environment for the migrations to happen (library + tapes) + const std::string libraryComment = "Library comment"; + const bool libraryIsDisabled = false; + catalogue.createLogicalLibrary(s_adminOnAdminHost, s_libraryName, + libraryIsDisabled, libraryComment); + { + auto libraries = catalogue.getLogicalLibraries(); + ASSERT_EQ(1, libraries.size()); + ASSERT_EQ(s_libraryName, libraries.front().name); + ASSERT_EQ(libraryComment, libraries.front().comment); + } + const uint64_t capacityInBytes = 12345678; + const std::string tapeComment = "Tape comment"; + bool notDisabled = false; + bool notFull = false; + const std::string driveName = "tape_drive"; + for (auto i:fileRange) { + catalogue.createTape(s_adminOnAdminHost, s_vid + std::to_string(i), s_mediaType, s_vendor, s_libraryName, s_tapePoolName, capacityInBytes, + notDisabled, notFull, tapeComment); + catalogue.tapeLabelled(s_vid + std::to_string(i), "tape_drive"); + } + + + { + // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) + std::unique_ptr<cta::TapeMount> mount; + // This first initialization is normally done by the dataSession function. 
+ cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName }; + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down, lc); + scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up, lc); + for (auto i:fileRange) { + i=i; + mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::ArchiveForUser, mount.get()->getMountType()); + auto & osdb=getSchedulerDB(); + auto mi=osdb.getMountInfo(lc); + ASSERT_EQ(1, mi->existingOrNextMounts.size()); + ASSERT_EQ("TestTapePool", mi->existingOrNextMounts.front().tapePool); + std::unique_ptr<cta::ArchiveMount> archiveMount; + archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())); + ASSERT_NE(nullptr, archiveMount.get()); + std::list<std::unique_ptr<cta::ArchiveJob>> archiveJobBatch = archiveMount->getNextJobBatch(1,1,lc); + ASSERT_NE(nullptr, archiveJobBatch.front().get()); + ASSERT_EQ(1, archiveJobBatch.size()); + std::unique_ptr<ArchiveJob> archiveJob = std::move(archiveJobBatch.front()); + archiveJob->tapeFile.blockId = 1; + archiveJob->tapeFile.fSeq = 1; + archiveJob->tapeFile.checksumType = "ADLER32"; + archiveJob->tapeFile.checksumValue = "1234abcd"; + archiveJob->tapeFile.compressedSize = archiveJob->archiveFile.fileSize; + archiveJob->tapeFile.copyNb = 1; + archiveJob->validate(); + std::queue<std::unique_ptr <cta::ArchiveJob >> sDBarchiveJobBatch; + std::queue<cta::catalogue::TapeItemWritten> sTapeItems; + sDBarchiveJobBatch.emplace(std::move(archiveJob)); + archiveMount->reportJobsBatchTransferred(sDBarchiveJobBatch, sTapeItems, lc); + // Mark the tape full so we get one file per tape. + archiveMount->setTapeFull(); + archiveMount->complete(); + } + } + + { + // Emulate the reporter process reporting successful transfer to tape to the disk system + // The jobs get reported by tape, so we need to report 10*1 file (one per tape). + for (auto i:fileRange) { + i=i; + auto jobsToReport = scheduler.getNextArchiveJobsToReportBatch(10, lc); + ASSERT_EQ(1, jobsToReport.size()); + disk::DiskReporterFactory factory; + log::TimingList timings; + utils::Timer t; + scheduler.reportArchiveJobsBatch(jobsToReport, factory, timings, t, lc); + } + ASSERT_EQ(0, scheduler.getNextArchiveJobsToReportBatch(10, lc).size()); + } + + { + // Declare activities in the catalogue.
+ catalogue.createActivitiesFairShareWeight(s_adminOnAdminHost, s_diskInstance, "A", 0.4, "No comment"); + catalogue.createActivitiesFairShareWeight(s_adminOnAdminHost, s_diskInstance, "B", 0.3, "No comment"); + auto activities = catalogue.getActivitiesFairShareWeights(); + ASSERT_EQ(1, activities.size()); + auto ac=activities.front(); + ASSERT_EQ(s_diskInstance, ac.diskInstance); + ASSERT_EQ(2, ac.activitiesWeights.size()); + ASSERT_NO_THROW(ac.activitiesWeights.at("A")); + ASSERT_EQ(0.4, ac.activitiesWeights.at("A")); + ASSERT_NO_THROW(ac.activitiesWeights.at("B")); + ASSERT_EQ(0.3, ac.activitiesWeights.at("B")); + } + + { + cta::common::dataStructures::EntryLog creationLog; + creationLog.host="host2"; + creationLog.time=0; + creationLog.username="admin1"; + cta::common::dataStructures::DiskFileInfo diskFileInfo; + diskFileInfo.group="group2"; + diskFileInfo.owner="cms_user"; + diskFileInfo.path="path/to/file"; + for (auto i:fileRange) { + cta::common::dataStructures::RetrieveRequest request; + request.archiveFileID = archiveFileIds.at(i); + request.creationLog = creationLog; + request.diskFileInfo = diskFileInfo; + request.dstURL = "dstURL"; + request.requester.name = s_userName; + request.requester.group = "userGroup"; + if (i < 6) + request.activity = "A"; + else + request.activity = "B"; + scheduler.queueRetrieve(s_diskInstance, request, lc); + } + scheduler.waitSchedulerDbSubthreadsComplete(); + } + + // Check that the retrieve requests are queued + { + auto rqsts = scheduler.getPendingRetrieveJobs(lc); + // We expect 10 tapes with queued jobs + ASSERT_EQ(10, rqsts.size()); + // We expect each queue to contain 1 job + for (auto & q: rqsts) { + ASSERT_EQ(1, q.second.size()); + // We expect the job to be single copy + auto & job = q.second.back(); + ASSERT_EQ(1, job.tapeCopies.size()); + // Check the remote target + ASSERT_EQ("dstURL", job.request.dstURL); + } + // We expect each tape to be seen + for (auto i:fileRange) { + ASSERT_NO_THROW(rqsts.at(s_vid + std::to_string(i))); + } + } + + + enum ExpectedActivity { + Unknown, + A, + B + }; + + std::vector<ExpectedActivity> expectedActivities = { Unknown, Unknown, A, B, A, B, A, Unknown, Unknown, A}; + size_t i=0; + for (auto ea: expectedActivities) { + // Emulate a tape server by asking for a mount and then a file (and succeed the transfer) + std::unique_ptr<cta::TapeMount> mount; + std::string drive="drive"; + drive += std::to_string(++i); + mount.reset(scheduler.getNextMount(s_libraryName, drive, lc).release()); + ASSERT_NE(nullptr, mount.get()); + ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType()); + ASSERT_TRUE((bool)mount.get()->getActivity()); + if (ea != Unknown) { + std::string expectedActivity(ea==A?"A":"B"), activity(mount.get()->getActivity().value()); + ASSERT_EQ(expectedActivity, activity); + } + std::unique_ptr<cta::RetrieveMount> retrieveMount; + retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())); + ASSERT_NE(nullptr, retrieveMount.get()); + std::unique_ptr<cta::RetrieveJob> retrieveJob; + auto jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(1, jobBatch.size()); + retrieveJob.reset(jobBatch.front().release()); + ASSERT_NE(nullptr, retrieveJob.get()); + retrieveJob->asyncSetSuccessful(); + std::queue<std::unique_ptr<cta::RetrieveJob> > jobQueue; + jobQueue.push(std::move(retrieveJob)); + retrieveMount->flushAsyncSuccessReports(jobQueue, lc); + jobBatch = retrieveMount->getNextJobBatch(1,1,lc); + ASSERT_EQ(0, jobBatch.size()); + } +} + + #undef
TEST_MOCK_DB #ifdef TEST_MOCK_DB static cta::MockSchedulerDatabaseFactory mockDbFactory; diff --git a/scheduler/TapeMount.hpp b/scheduler/TapeMount.hpp index f30c670585..01dcae8da0 100644 --- a/scheduler/TapeMount.hpp +++ b/scheduler/TapeMount.hpp @@ -20,6 +20,7 @@ #include "common/dataStructures/MountType.hpp" #include "common/dataStructures/DriveStatus.hpp" +#include "common/optional.hpp" #include "tapeserver/castor/tape/tapeserver/daemon/TapeSessionStats.hpp" #include <string> @@ -52,6 +53,14 @@ namespace cta { * @return The mount transaction id. */ virtual std::string getMountTransactionId() const = 0; + + /** + * Return the activity this mount is running for. + * + * @return optional, populated with the activity name if appropriate. + */ + + virtual optional<std::string> getActivity() const = 0; /** * Returns the mount transaction id. diff --git a/scheduler/TapeMountDummy.hpp b/scheduler/TapeMountDummy.hpp index 82cc05df81..ca98d766ef 100644 --- a/scheduler/TapeMountDummy.hpp +++ b/scheduler/TapeMountDummy.hpp @@ -35,6 +35,9 @@ class TapeMountDummy: public TapeMount { cta::common::dataStructures::MountType getMountType() const override { throw exception::Exception("In DummyTapeMount::getMountType() : not implemented"); } + optional<std::string> getActivity() const override { + throw exception::Exception("In DummyTapeMount::getActivity() : not implemented"); + } uint32_t getNbFiles() const override { throw exception::Exception("In DummyTapeMount::getNbFiles() : not implemented"); } -- GitLab
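
Note on the fair-share logic introduced above: for each potential retrieve mount that carries an activity, Scheduler.cpp computes weightedMountCount = (mounts already running for that activity) / (configured activity weight), mapping a zero weight to the largest double. The comparator that finally orders candidates by this value is not part of this diff, so the selection rule in the sketch below (always mount the activity with the lowest weighted mount count) is an assumption inferred from the mount order asserted in the new unit test; the weights and the ten-mount scenario are the test's own.

#include <iostream>
#include <limits>
#include <map>
#include <string>

int main() {
  // Fair-share weights as declared in the test: A -> 0.4, B -> 0.3.
  const std::map<std::string, double> weights{{"A", 0.4}, {"B", 0.3}};
  // Mounts currently running per activity (all drives start idle).
  std::map<std::string, unsigned> running{{"A", 0}, {"B", 0}};
  // Simulate ten successive mount decisions, one per single-file tape.
  for (int round = 1; round <= 10; round++) {
    std::string next;
    double best = std::numeric_limits<double>::infinity();
    for (const auto & aw: weights) {
      // weightedMountCount = running mounts / weight; a zero weight maps to
      // the largest double, mirroring the guard in Scheduler.cpp above.
      const double score = aw.second != 0.0 ? running[aw.first] / aw.second
                                            : std::numeric_limits<double>::max();
      if (score < best) { best = score; next = aw.first; }
    }
    running[next]++;
    std::cout << "mount " << round << ": " << next << "\n";
  }
  return 0;
}

Run standalone, this prints the sequence A B A B A B A A B A, which satisfies the pattern the test asserts; the ties in rounds 1 and 8 resolve to A here only because std::map iterates alphabetically and the comparison is strict, matching the two "A or B" slots the test leaves unconstrained.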