-
Joao Afonso authoredJoao Afonso authored
Scheduler.cpp 100.78 KiB
/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright © 2021-2022 CERN
* @license This program is free software, distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can
* redistribute it and/or modify it under the terms of the GPL Version 3, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* In applying this licence, CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization or
* submit itself to any jurisdiction.
*/
#include "ArchiveMount.hpp"
#include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp"
#include "common/exception/NonRetryableError.hpp"
#include "common/exception/NoSuchObject.hpp"
#include "common/exception/UserError.hpp"
#include "common/Timer.hpp"
#include "common/utils/utils.hpp"
#include "disk/DiskFileImplementations.hpp"
#include "disk/RadosStriperPool.hpp"
#include "DiskReportRunner.hpp"
#include "RetrieveMount.hpp"
#include "RetrieveRequestDump.hpp"
#include "Scheduler.hpp"
#include "catalogue/TapeDrivesCatalogueState.hpp"
#include "catalogue/DriveConfig.hpp"
#include <iostream>
#include <sstream>
#include <iomanip>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <random>
#include <chrono>
#include <cstdlib>
#include <time.h>
namespace cta {
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
Scheduler::Scheduler(
catalogue::Catalogue &catalogue,
SchedulerDatabase &db, const uint64_t minFilesToWarrantAMount, const uint64_t minBytesToWarrantAMount):
m_catalogue(catalogue), m_db(db), m_minFilesToWarrantAMount(minFilesToWarrantAMount),
m_minBytesToWarrantAMount(minBytesToWarrantAMount) {
m_tapeDrivesState = std::make_unique<TapeDrivesCatalogueState>(m_catalogue);
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
Scheduler::~Scheduler() throw() { }
//------------------------------------------------------------------------------
// ping
//------------------------------------------------------------------------------
void Scheduler::ping(log::LogContext & lc) {
cta::utils::Timer t;
m_catalogue.ping();
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
m_db.ping();
auto schedulerDbTime = t.secs(cta::utils::Timer::resetCounter);
checkNeededEnvironmentVariables();
auto checkEnvironmentVariablesTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime)
.add("checkEnvironmentVariablesTime",checkEnvironmentVariablesTime);
lc.log(log::INFO, "In Scheduler::ping(): success.");
}
//------------------------------------------------------------------------------
// waitSchedulerDbThreads
//------------------------------------------------------------------------------
void Scheduler::waitSchedulerDbSubthreadsComplete() {
m_db.waitSubthreadsComplete();
}
//------------------------------------------------------------------------------
// authorizeAdmin
//------------------------------------------------------------------------------
void Scheduler::authorizeAdmin(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc){
cta::utils::Timer t;
if(!(m_catalogue.isAdmin(cliIdentity))) {
std::stringstream msg;
msg << "User: " << cliIdentity.username << " on host: " << cliIdentity.host << " is not authorized to execute CTA admin commands";
throw exception::UserError(msg.str());
}
auto catalogueTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("catalogueTime", catalogueTime);
lc.log(log::INFO, "In Scheduler::authorizeAdmin(): success.");
}
//------------------------------------------------------------------------------
// checkAndGetNextArchiveFileId
//------------------------------------------------------------------------------
uint64_t Scheduler::checkAndGetNextArchiveFileId(const std::string &instanceName,
const std::string &storageClassName, const common::dataStructures::RequesterIdentity &user, log::LogContext &lc) {
cta::utils::Timer t;
const uint64_t archiveFileId = m_catalogue.checkAndGetNextArchiveFileId(instanceName, storageClassName, user);
const auto catalogueTime = t.secs();
const auto schedulerDbTime = catalogueTime;
log::ScopedParamContainer spc(lc);
spc.add("instanceName", instanceName)
.add("username", user.name)
.add("usergroup", user.group)
.add("storageClass", storageClassName)
.add("fileId", archiveFileId)
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "Checked request and got next archive file ID");
return archiveFileId;
}
//------------------------------------------------------------------------------
// queueArchiveWithGivenId
//------------------------------------------------------------------------------
std::string Scheduler::queueArchiveWithGivenId(const uint64_t archiveFileId, const std::string &instanceName,
const cta::common::dataStructures::ArchiveRequest &request, log::LogContext &lc) {
cta::utils::Timer t;
using utils::postEllipsis;
using utils::midEllipsis;
if (!request.fileSize)
throw cta::exception::UserError(std::string("Rejecting archive request for zero-length file: ")+request.diskFileInfo.path);
const auto queueCriteria = m_catalogue.getArchiveFileQueueCriteria(instanceName, request.storageClass,
request.requester);
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
const common::dataStructures::ArchiveFileQueueCriteriaAndFileId catalogueInfo(archiveFileId,
queueCriteria.copyToPoolMap, queueCriteria.mountPolicy);
std::string archiveReqAddr = m_db.queueArchive(instanceName, request, catalogueInfo, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("instanceName", instanceName)
.add("storageClass", request.storageClass)
.add("diskFileID", request.diskFileID)
.add("fileSize", request.fileSize)
.add("fileId", catalogueInfo.fileId);
for (auto & ctp: catalogueInfo.copyToPoolMap) {
std::stringstream tp;
tp << "tapePool" << ctp.first;
spc.add(tp.str(), ctp.second);
}
spc.add("policyName", catalogueInfo.mountPolicy.name)
.add("policyArchiveMinAge", catalogueInfo.mountPolicy.archiveMinRequestAge)
.add("policyArchivePriority", catalogueInfo.mountPolicy.archivePriority)
.add("diskFilePath", request.diskFileInfo.path)
.add("diskFileOwnerUid", request.diskFileInfo.owner_uid)
.add("diskFileGid", request.diskFileInfo.gid)
.add("archiveReportURL", midEllipsis(request.archiveReportURL, 50, 15))
.add("archiveErrorReportURL", midEllipsis(request.archiveErrorReportURL, 50, 15))
.add("creationHost", request.creationLog.host)
.add("creationTime", request.creationLog.time)
.add("creationUser", request.creationLog.username)
.add("requesterName", request.requester.name)
.add("requesterGroup", request.requester.group)
.add("srcURL", midEllipsis(request.srcURL, 50, 15))
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
request.checksumBlob.addFirstChecksumToLog(spc);
lc.log(log::INFO, "Queued archive request");
return archiveReqAddr;
}
//------------------------------------------------------------------------------
// queueRetrieve
//------------------------------------------------------------------------------
std::string Scheduler::queueRetrieve(
const std::string &instanceName,
common::dataStructures::RetrieveRequest &request,
log::LogContext & lc) {
using utils::postEllipsis;
using utils::midEllipsis;
utils::Timer t;
// Get the queue criteria
common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc, request.mountPolicy);
queueCriteria.archiveFile.diskFileInfo = request.diskFileInfo;
auto diskSystemList = m_catalogue.getAllDiskSystems();
auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
// By default, the scheduler makes its decision based on all available vids. But if a vid is specified in the protobuf,
// ignore all the others.
if(request.vid) {
queueCriteria.archiveFile.tapeFiles.removeAllVidsExcept(*request.vid);
if(queueCriteria.archiveFile.tapeFiles.empty()) {
exception::UserError ex;
ex.getMessage() << "VID " << *request.vid << " does not contain a tape copy of file with archive file ID " << request.archiveFileID;
throw ex;
}
}
// Determine disk system for this request, if any
std::optional<std::string> diskSystemName;
try {
diskSystemName = diskSystemList.getDSName(request.dstURL);
} catch (std::out_of_range&) {}
auto requestInfo = m_db.queueRetrieve(request, queueCriteria, diskSystemName, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("fileId", request.archiveFileID)
.add("instanceName", instanceName)
.add("diskFilePath", request.diskFileInfo.path)
.add("diskFileOwnerUid", request.diskFileInfo.owner_uid)
.add("diskFileGid", request.diskFileInfo.gid)
.add("dstURL", request.dstURL)
.add("errorReportURL", request.errorReportURL)
.add("creationHost", request.creationLog.host)
.add("creationTime", request.creationLog.time)
.add("creationUser", request.creationLog.username)
.add("requesterName", request.requester.name)
.add("requesterGroup", request.requester.group)
.add("criteriaArchiveFileId", queueCriteria.archiveFile.archiveFileID)
.add("criteriaCreationTime", queueCriteria.archiveFile.creationTime)
.add("criteriaDiskFileId", queueCriteria.archiveFile.diskFileId)
.add("criteriaDiskFileOwnerUid", queueCriteria.archiveFile.diskFileInfo.owner_uid)
.add("criteriaDiskInstance", queueCriteria.archiveFile.diskInstance)
.add("criteriaFileSize", queueCriteria.archiveFile.fileSize)
.add("reconciliationTime", queueCriteria.archiveFile.reconciliationTime)
.add("storageClass", queueCriteria.archiveFile.storageClass);
queueCriteria.archiveFile.checksumBlob.addFirstChecksumToLog(spc);
const auto &tapeFile = queueCriteria.archiveFile.tapeFiles.front();
spc.add("fSeq", tapeFile.fSeq)
.add("vid", tapeFile.vid)
.add("blockId", tapeFile.blockId)
.add("fileSize", tapeFile.fileSize)
.add("copyNb", tapeFile.copyNb)
.add("creationTime", tapeFile.creationTime);
spc.add("selectedVid", requestInfo.selectedVid)
.add("verifyOnly", request.isVerifyOnly)
.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime)
.add("policyName", queueCriteria.mountPolicy.name)
.add("policyMinAge", queueCriteria.mountPolicy.retrieveMinRequestAge)
.add("policyPriority", queueCriteria.mountPolicy.retrievePriority)
.add("retrieveRequestId", requestInfo.requestId);
if (request.activity)
spc.add("activity", request.activity.value());
lc.log(log::INFO, "Queued retrieve request");
return requestInfo.requestId;
}
//------------------------------------------------------------------------------
// deleteArchive
//------------------------------------------------------------------------------
void Scheduler::deleteArchive(const std::string &instanceName, const common::dataStructures::DeleteArchiveRequest &request,
log::LogContext & lc) {
// We have different possible scenarios here. The file can be safe in the catalogue,
// fully queued, or partially queued.
// First, make sure the file is not queued anymore.
utils::Timer t;
log::TimingList tl;
if(request.address) {
//Check if address is provided, we can remove the request from the objectstore
m_db.cancelArchive(request,lc);
// no need to do anything else, if file was failed it will not be in the catalogue.
}
tl.insertAndReset("schedulerDbTime",t);
m_catalogue.moveArchiveFileToRecycleLog(request,lc);
tl.insertAndReset("catalogueTime",t);
log::ScopedParamContainer spc(lc);
tl.addToLog(spc);
lc.log(log::INFO, "In Scheduler::deleteArchive(): success.");
}
//------------------------------------------------------------------------------
// abortRetrieve
//------------------------------------------------------------------------------
void Scheduler::abortRetrieve(const std::string &instanceName, const common::dataStructures::CancelRetrieveRequest &request, log::LogContext & lc) {
m_db.cancelRetrieve(instanceName, request, lc);
}
void Scheduler::deleteFailed(const std::string &objectId, log::LogContext & lc) {
m_db.deleteFailed(objectId, lc);
}
void Scheduler::checkTapeCanBeRepacked(const std::string & vid, const SchedulerDatabase::QueueRepackRequest & repackRequest){
try{
auto vidToTapesMap = m_catalogue.getTapesByVid(vid); //throws an exception if the vid is not found on the database
cta::common::dataStructures::Tape tapeToCheck = vidToTapesMap.at(vid);
if(!tapeToCheck.full){
throw exception::UserError("You must set the tape as full before repacking it.");
}
if(tapeToCheck.state == common::dataStructures::Tape::BROKEN){
throw exception::UserError(std::string("You cannot repack a tape that is ") + common::dataStructures::Tape::stateToString(common::dataStructures::Tape::BROKEN) + ".");
}
if(tapeToCheck.isDisabled() && !repackRequest.m_forceDisabledTape){
throw exception::UserError(std::string("You cannot repack a ") + common::dataStructures::Tape::stateToString(common::dataStructures::Tape::DISABLED)+ " tape. You can force it by using the flag --disabledtape.");
}
} catch(const exception::UserError& userEx){
throw userEx;
} catch(const cta::exception::Exception & ex){
throw exception::UserError("The VID provided for repacking does not exist");
}
}
//------------------------------------------------------------------------------
// repack
//------------------------------------------------------------------------------
void Scheduler::queueRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const SchedulerDatabase::QueueRepackRequest & repackRequest, log::LogContext & lc) {
// Check request sanity
SchedulerDatabase::QueueRepackRequest repackRequestToQueue = repackRequest;
repackRequestToQueue.m_creationLog = common::dataStructures::EntryLog(cliIdentity.username,cliIdentity.host,::time(nullptr));
std::string vid = repackRequest.m_vid;
std::string repackBufferURL = repackRequest.m_repackBufferURL;
if (vid.empty()) throw exception::UserError("Empty VID name.");
if (repackBufferURL.empty()) throw exception::UserError("Empty buffer URL.");
utils::Timer t;
checkTapeCanBeRepacked(vid,repackRequestToQueue);
std::string repackRequestAddress = m_db.queueRepack(repackRequestToQueue, lc);
log::TimingList tl;
tl.insertAndReset("schedulerDbTime", t);
log::ScopedParamContainer params(lc);
params.add("tapeVid", vid)
.add("repackType", toString(repackRequest.m_repackType))
.add("forceDisabledTape", repackRequest.m_forceDisabledTape)
.add("mountPolicy", repackRequest.m_mountPolicy.name)
.add("noRecall", repackRequest.m_noRecall)
.add("creationHostName",repackRequestToQueue.m_creationLog.host)
.add("creationUserName",repackRequestToQueue.m_creationLog.username)
.add("creationTime",repackRequestToQueue.m_creationLog.time)
.add("bufferURL", repackRequest.m_repackBufferURL)
.add("repackRequestAddress", repackRequestAddress);
tl.addToLog(params);
lc.log(log::INFO, "In Scheduler::queueRepack(): success.");
}
//------------------------------------------------------------------------------
// cancelRepack
//------------------------------------------------------------------------------
void Scheduler::cancelRepack(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &vid, log::LogContext & lc) {
m_db.cancelRepack(vid, lc);
}
//------------------------------------------------------------------------------
// getRepacks
//------------------------------------------------------------------------------
std::list<common::dataStructures::RepackInfo> Scheduler::getRepacks() {
return m_db.getRepackInfo();
}
//------------------------------------------------------------------------------
// getRepack
//------------------------------------------------------------------------------
common::dataStructures::RepackInfo Scheduler::getRepack(const std::string &vid) {
return m_db.getRepackInfo(vid);
}
//------------------------------------------------------------------------------
// isBeingRepacked
//------------------------------------------------------------------------------
bool Scheduler::isBeingRepacked(const std::string &vid) {
try {
getRepack(vid);
return true;
} catch(cta::exception::UserError&) {
return false;
}
}
//------------------------------------------------------------------------------
// promoteRepackRequestsToToExpand
//------------------------------------------------------------------------------
void Scheduler::promoteRepackRequestsToToExpand(log::LogContext & lc) {
// We target 2 fresh requests available for processing (ToExpand or Starting).
const size_t targetAvailableRequests = 2;
// Dry-run test to check if promotion is needed.
auto repackStatsNL = m_db.getRepackStatisticsNoLock();
// Statistics are supposed to be initialized for each status value. We only try to
// expand if there are requests available in Pending status.
typedef common::dataStructures::RepackInfo::Status Status;
if (repackStatsNL->at(Status::Pending) &&
(targetAvailableRequests > repackStatsNL->at(Status::ToExpand) + repackStatsNL->at(Status::Starting))) {
// Let's try to promote a repack request. Take the lock.
repackStatsNL.reset();
decltype(m_db.getRepackStatistics()) repackStats;
try {
repackStats = m_db.getRepackStatistics();
} catch (SchedulerDatabase::RepackRequestStatistics::NoPendingRequestQueue &) {
// Nothing to promote after all.
return;
}
if (repackStats->at(Status::Pending) &&
(targetAvailableRequests > repackStats->at(Status::ToExpand) + repackStats->at(Status::Starting))) {
auto requestsToPromote = targetAvailableRequests;
requestsToPromote -= repackStats->at(Status::ToExpand);
requestsToPromote -= repackStats->at(Status::Starting);
auto stats = repackStats->promotePendingRequestsForExpansion(requestsToPromote, lc);
log::ScopedParamContainer params(lc);
params.add("promotedRequests", stats.promotedRequests)
.add("pendingBefore", stats.pendingBefore)
.add("toEnpandBefore", stats.toEnpandBefore)
.add("pendingAfter", stats.pendingAfter)
.add("toExpandAfter", stats.toExpandAfter);
lc.log(log::INFO,"In Scheduler::promoteRepackRequestsToToExpand(): Promoted repack request to \"to expand\"");
}
}
}
//------------------------------------------------------------------------------
// getNextRepackRequestToExpand
//------------------------------------------------------------------------------
std::unique_ptr<RepackRequest> Scheduler::getNextRepackRequestToExpand() {
std::unique_ptr<cta::SchedulerDatabase::RepackRequest> repackRequest;
repackRequest = m_db.getNextRepackJobToExpand();
if(repackRequest != nullptr){
std::unique_ptr<RepackRequest> ret(new RepackRequest());
ret->m_dbReq.reset(repackRequest.release());
return ret;
}
return nullptr;
}
//------------------------------------------------------------------------------
// expandRepackRequest
//------------------------------------------------------------------------------
void Scheduler::expandRepackRequest(std::unique_ptr<RepackRequest>& repackRequest, log::TimingList& timingList, utils::Timer& t, log::LogContext& lc) {
auto repackInfo = repackRequest->getRepackInfo();
typedef cta::common::dataStructures::RepackInfo::Type RepackType;
//We need to get the ArchiveRoutes to allow the retrieval of the tapePool in which the
//tape where the file is is located
std::list<common::dataStructures::ArchiveRoute> routes = m_catalogue.getArchiveRoutes();
timingList.insertAndReset("catalogueGetArchiveRoutesTime",t);
//To identify the routes, we need to have both the dist instance name and the storage class name
//thus, the key of the map is a pair of string
cta::common::dataStructures::ArchiveRoute::FullMap archiveRoutesMap;
for(auto route: routes){
//insert the route into the map to allow a quick retrieval
archiveRoutesMap[route.storageClassName][route.copyNb] = route;
}
uint64_t fSeq;
cta::SchedulerDatabase::RepackRequest::TotalStatsFiles totalStatsFile;
repackRequest->m_dbReq->fillLastExpandedFSeqAndTotalStatsFile(fSeq,totalStatsFile);
timingList.insertAndReset("fillTotalStatsFileBeforeExpandTime",t);
cta::catalogue::Catalogue::ArchiveFileItor archiveFilesForCatalogue = m_catalogue.getArchiveFilesForRepackItor(repackInfo.vid, fSeq);
timingList.insertAndReset("catalogueGetArchiveFilesForRepackItorTime",t);
std::stringstream dirBufferURL;
dirBufferURL << repackInfo.repackBufferBaseURL << "/" << repackInfo.vid << "/";
std::set<std::string> filesInDirectory;
std::unique_ptr<cta::disk::Directory> dir;
if(archiveFilesForCatalogue.hasMore()){
//We only create the folder if there are some files to Repack
cta::disk::DirectoryFactory dirFactory;
dir.reset(dirFactory.createDirectory(dirBufferURL.str()));
try {
if(dir->exist()){
//Repack tape repair workflow
filesInDirectory = dir->getFilesName();
} else {
if(repackInfo.noRecall){
//The buffer directory should be created if the --no-recall flag has been passed
//So we throw an exception
throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): the flag --no-recall is set but no buffer directory has been created.");
}
dir->mkdir();
}
} catch (const cta::exception::XrootCl &ex) {
throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): errors while doing some checks on the repack buffer. ExceptionMsg = " + ex.getMessageValue());
}
}
std::list<common::dataStructures::StorageClass> storageClasses = m_catalogue.getStorageClasses();
repackRequest->m_dbReq->setExpandStartedAndChangeStatus();
uint64_t nbRetrieveSubrequestsQueued = 0;
std::list<cta::common::dataStructures::ArchiveFile> archiveFilesFromCatalogue;
while(archiveFilesForCatalogue.hasMore()){
archiveFilesFromCatalogue.push_back(archiveFilesForCatalogue.next());
}
if(repackInfo.noRecall){
archiveFilesFromCatalogue.remove_if([&repackInfo, &filesInDirectory](const common::dataStructures::ArchiveFile & archiveFile){
//We remove all the elements that are not in the repack buffer so that we don't recall them
return std::find_if(filesInDirectory.begin(), filesInDirectory.end(),[&archiveFile, &repackInfo](const std::string & fseq){
//If we find a tape file that has the current fseq and belongs to the VID to repack, then we DON'T remove it from
//the archiveFilesFromCatalogue list
return std::find_if(archiveFile.tapeFiles.begin(), archiveFile.tapeFiles.end(),[&repackInfo, &fseq](const common::dataStructures::TapeFile & tapeFile){
//Can we find, in the archiveFilesFromCatalogue list an archiveFile that contains a tapefile that belongs to the VID to repack and that has the
//fseq of the current file read from the filesInDirectory list ?
return tapeFile.vid == repackInfo.vid && tapeFile.fSeq == cta::utils::toUint64(cta::utils::removePrefix(fseq,'0'));
}) != archiveFile.tapeFiles.end();
}) == filesInDirectory.end();
});
}
std::list<SchedulerDatabase::RepackRequest::Subrequest> retrieveSubrequests;
uint64_t maxAddedFSeq = 0;
while(!archiveFilesFromCatalogue.empty()) {
fSeq++;
retrieveSubrequests.push_back(cta::SchedulerDatabase::RepackRequest::Subrequest());
auto archiveFile = archiveFilesFromCatalogue.front();
archiveFilesFromCatalogue.pop_front();
auto & retrieveSubRequest = retrieveSubrequests.back();
retrieveSubRequest.archiveFile = archiveFile;
retrieveSubRequest.fSeq = std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max();
//Check that all the archive routes have been configured, if one archive route is missing, we fail the repack request.
auto archiveFileRoutes = archiveRoutesMap[archiveFile.storageClass];
auto storageClassOfArchiveFile = std::find_if(storageClasses.begin(),storageClasses.end(),[&archiveFile](const common::dataStructures::StorageClass& sc){
return sc.name == archiveFile.storageClass;
});
if(storageClassOfArchiveFile == storageClasses.end()) {
//No storage class have been found for the current tapefile throw an exception
deleteRepackBuffer(std::move(dir),lc);
std::ostringstream oss;
oss << "In Scheduler::expandRepackRequest(): No storage class have been found for the file to repack. ArchiveFileID=" << archiveFile.archiveFileID << " StorageClass of the file=" << archiveFile.storageClass;
throw ExpandRepackRequestException(oss.str());
}
common::dataStructures::StorageClass sc = *storageClassOfArchiveFile;
// We have to determine which copynbs we want to rearchive, and under which fSeq we record this file.
if (repackInfo.type == RepackType::MoveAndAddCopies || repackInfo.type == RepackType::MoveOnly) {
// determine which fSeq(s) (normally only one) lives on this tape.
for (auto & tc: archiveFile.tapeFiles) if (tc.vid == repackInfo.vid) {
// We make the (reasonable) assumption that the archive file only has one copy on this tape.
// If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
// This will prevent double subrequest creation (we already have such a mechanism in case of crash and
// restart of expansion.
//Here, test that the archive route of the copyNb of the tape file is configured
try {
archiveFileRoutes.at(tc.copyNb);
} catch (const std::out_of_range & ex) {
deleteRepackBuffer(std::move(dir),lc);
std::ostringstream oss;
oss << "In Scheduler::expandRepackRequest(): the file archiveFileID=" << archiveFile.archiveFileID << ", copyNb=" << std::to_string(tc.copyNb) << ", storageClass=" << archiveFile.storageClass << " does not have any archive route for archival.";
throw ExpandRepackRequestException(oss.str());
}
totalStatsFile.totalFilesToArchive += 1;
totalStatsFile.totalBytesToArchive += retrieveSubRequest.archiveFile.fileSize;
retrieveSubRequest.copyNbsToRearchive.insert(tc.copyNb);
retrieveSubRequest.fSeq = tc.fSeq;
}
}
if(repackInfo.type == RepackType::AddCopiesOnly || repackInfo.type == RepackType::MoveAndAddCopies){
//We are in the case where we possibly need to create new copies (if the number of copies the storage class of the current ArchiveFile
//is greater than the number of tape files we have in the current ArchiveFile)
uint64_t nbFilesAlreadyArchived = getNbFilesAlreadyArchived(archiveFile);
uint64_t nbCopiesInStorageClass = sc.nbCopies;
uint64_t filesToArchive = nbCopiesInStorageClass - nbFilesAlreadyArchived;
if(filesToArchive > 0){
totalStatsFile.totalFilesToArchive += filesToArchive;
totalStatsFile.totalBytesToArchive += (filesToArchive * archiveFile.fileSize);
std::set<uint64_t> copyNbsAlreadyInCTA;
for (auto & tc: archiveFile.tapeFiles) {
copyNbsAlreadyInCTA.insert(tc.copyNb);
if (tc.vid == repackInfo.vid) {
// We make the (reasonable) assumption that the archive file only has one copy on this tape.
// If not, we will ensure the subrequest is filed under the lowest fSeq existing on this tape.
// This will prevent double subrequest creation (we already have such a mechanism in case of crash and
// restart of expansion.
//We found the copy of the file we want to retrieve and archive
//retrieveSubRequest.fSeq = tc.fSeq;
if(repackInfo.type == RepackType::AddCopiesOnly)
retrieveSubRequest.fSeq = (retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) ? tc.fSeq : std::max(tc.fSeq, retrieveSubRequest.fSeq);
}
}
for(auto archiveFileRoutesItor = archiveFileRoutes.begin(); archiveFileRoutesItor != archiveFileRoutes.end(); ++archiveFileRoutesItor){
if(copyNbsAlreadyInCTA.find(archiveFileRoutesItor->first) == copyNbsAlreadyInCTA.end()){
//We need to archive the missing copy
retrieveSubRequest.copyNbsToRearchive.insert(archiveFileRoutesItor->first);
}
}
if(retrieveSubRequest.copyNbsToRearchive.size() < filesToArchive){
deleteRepackBuffer(std::move(dir),lc);
throw ExpandRepackRequestException("In Scheduler::expandRepackRequest(): Missing archive routes for the creation of the new copies of the files");
}
} else {
if(repackInfo.type == RepackType::AddCopiesOnly){
//Nothing to Archive so nothing to Retrieve as well
retrieveSubrequests.pop_back();
continue;
}
}
}
std::stringstream fileName;
fileName << std::setw(9) << std::setfill('0') << retrieveSubRequest.fSeq;
bool createArchiveSubrequest = false;
if(filesInDirectory.count(fileName.str())){
cta::disk::RadosStriperPool radosStriperPool;
cta::disk::DiskFileFactory fileFactory("",0,radosStriperPool);
cta::disk::ReadFile *fileReader = fileFactory.createReadFile(dirBufferURL.str() + fileName.str());
if(fileReader->size() == archiveFile.fileSize){
createArchiveSubrequest = true;
}
}
if (!createArchiveSubrequest && retrieveSubRequest.fSeq == std::numeric_limits<decltype(retrieveSubRequest.fSeq)>::max()) {
if(!createArchiveSubrequest){
log::ScopedParamContainer params(lc);
params.add("fileId", retrieveSubRequest.archiveFile.archiveFileID)
.add("repackVid", repackInfo.vid);
lc.log(log::ERR, "In Scheduler::expandRepackRequest(): no fSeq found for this file on this tape.");
totalStatsFile.totalBytesToRetrieve -= retrieveSubRequest.archiveFile.fileSize;
totalStatsFile.totalFilesToRetrieve -= 1;
retrieveSubrequests.pop_back();
}
} else {
if(!createArchiveSubrequest){
totalStatsFile.totalBytesToRetrieve += retrieveSubRequest.archiveFile.fileSize;
totalStatsFile.totalFilesToRetrieve += 1;
} else {
totalStatsFile.userProvidedFiles += 1;
retrieveSubRequest.hasUserProvidedFile = true;
}
// We found some copies to rearchive. We still have to decide which file path we are going to use.
// File path will be base URL + /<VID>/<fSeq>
maxAddedFSeq = std::max(maxAddedFSeq,retrieveSubRequest.fSeq);
retrieveSubRequest.fileBufferURL = dirBufferURL.str() + fileName.str();
}
}
auto diskSystemList = m_catalogue.getAllDiskSystems();
timingList.insertAndReset("getDisksystemsListTime",t);
try{
// Note: the highest fSeq will be recorded internally in the following call.
// We know that the fSeq processed on the tape are >= initial fSeq + filesCount - 1 (or fSeq - 1 as we counted).
// We pass this information to the db for recording in the repack request. This will allow restarting from the right
// value in case of crash.
nbRetrieveSubrequestsQueued = repackRequest->m_dbReq->addSubrequestsAndUpdateStats(retrieveSubrequests, archiveRoutesMap, fSeq, maxAddedFSeq, totalStatsFile, diskSystemList, lc);
} catch(const cta::ExpandRepackRequestException& e){
deleteRepackBuffer(std::move(dir),lc);
throw e;
}
timingList.insertAndReset("addSubrequestsAndUpdateStatsTime",t);
log::ScopedParamContainer params(lc);
params.add("tapeVid",repackInfo.vid);
timingList.addToLog(params);
if(archiveFilesFromCatalogue.empty() && totalStatsFile.totalFilesToArchive == 0 && (totalStatsFile.totalFilesToRetrieve == 0 || nbRetrieveSubrequestsQueued == 0)){
//If no files have been retrieve, the repack buffer will have to be deleted
//TODO : in case of Repack tape repair, we should not try to delete the buffer
deleteRepackBuffer(std::move(dir),lc);
}
repackRequest->m_dbReq->expandDone();
lc.log(log::INFO,"In Scheduler::expandRepackRequest(), repack request expanded");
}
//------------------------------------------------------------------------------
// Scheduler::getNextRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextRepackReportBatch(log::LogContext& lc) {
RepackReportBatch ret;
ret.m_DbBatch = std::move(m_db.getNextRepackReportBatch(lc));
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::getRepackReportBatches
//------------------------------------------------------------------------------
std::list<Scheduler::RepackReportBatch> Scheduler::getRepackReportBatches(log::LogContext &lc){
std::list<Scheduler::RepackReportBatch> ret;
for(auto& reportBatch: m_db.getRepackReportBatches(lc)){
Scheduler::RepackReportBatch report;
report.m_DbBatch.reset(reportBatch.release());
ret.push_back(std::move(report));
}
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::getNextSuccessfulRetrieveRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextSuccessfulRetrieveRepackReportBatch(log::LogContext &lc){
Scheduler::RepackReportBatch ret;
try{
ret.m_DbBatch.reset(m_db.getNextSuccessfulRetrieveRepackReportBatch(lc).release());
} catch (SchedulerDatabase::NoRepackReportBatchFound &){
ret.m_DbBatch = nullptr;
}
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::getNextFailedRetrieveRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextFailedRetrieveRepackReportBatch(log::LogContext &lc){
Scheduler::RepackReportBatch ret;
try{
ret.m_DbBatch.reset(m_db.getNextFailedRetrieveRepackReportBatch(lc).release());
} catch (SchedulerDatabase::NoRepackReportBatchFound &){
ret.m_DbBatch = nullptr;
}
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::getNextSuccessfulArchiveRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextSuccessfulArchiveRepackReportBatch(log::LogContext &lc){
Scheduler::RepackReportBatch ret;
try{
ret.m_DbBatch.reset(m_db.getNextSuccessfulArchiveRepackReportBatch(lc).release());
} catch (SchedulerDatabase::NoRepackReportBatchFound &){
ret.m_DbBatch = nullptr;
}
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::getNextFailedArchiveRepackReportBatch
//------------------------------------------------------------------------------
Scheduler::RepackReportBatch Scheduler::getNextFailedArchiveRepackReportBatch(log::LogContext &lc){
Scheduler::RepackReportBatch ret;
try{
ret.m_DbBatch.reset(m_db.getNextFailedArchiveRepackReportBatch(lc).release());
} catch (SchedulerDatabase::NoRepackReportBatchFound &){
ret.m_DbBatch = nullptr;
}
return ret;
}
//------------------------------------------------------------------------------
// Scheduler::RepackReportBatch::report
//------------------------------------------------------------------------------
void Scheduler::RepackReportBatch::report(log::LogContext& lc) {
if (nullptr == m_DbBatch) {
// lc.log(log::DEBUG, "In Scheduler::RepackReportBatch::report(): empty batch.");
} else {
m_DbBatch->report(lc);
}
}
//------------------------------------------------------------------------------
// getDesiredDriveState
//------------------------------------------------------------------------------
common::dataStructures::DesiredDriveState Scheduler::getDesiredDriveState(const std::string& driveName, log::LogContext & lc) {
utils::Timer t;
auto driveStates = m_tapeDrivesState->getDriveStates(lc);
for (auto & driveState : driveStates) {
if (driveState.driveName == driveName) {
auto schedulerDbTime = t.secs();
if (schedulerDbTime > 1) {
log::ScopedParamContainer spc(lc);
spc.add("drive", driveName)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::DEBUG, "In Scheduler::getDesiredDriveState(): success.");
}
common::dataStructures::DesiredDriveState desiredDriveState;
desiredDriveState.up = driveState.desiredUp;
desiredDriveState.forceDown = driveState.desiredForceDown;
desiredDriveState.reason = driveState.reasonUpDown;
desiredDriveState.comment = driveState.userComment;
return desiredDriveState;
}
}
throw NoSuchDrive ("In Scheduler::getDesiredDriveState(): no such drive");
}
//------------------------------------------------------------------------------
// setDesiredDriveState
//------------------------------------------------------------------------------
void Scheduler::setDesiredDriveState(const common::dataStructures::SecurityIdentity &cliIdentity, const std::string &driveName, const common::dataStructures::DesiredDriveState & desiredState, log::LogContext & lc) {
utils::Timer t;
m_tapeDrivesState->setDesiredDriveState(driveName, desiredState, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("drive", driveName)
.add("up", desiredState.up ? "up" : "down")
.add("force", desiredState.forceDown ? "yes" : "no")
.add("reason",desiredState.reason ? desiredState.reason.value() : "")
.add("comment", desiredState.comment ? desiredState.comment.value() : "")
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::setDesiredDriveState(): success.");
}
bool Scheduler::checkDriveCanBeCreated(const cta::common::dataStructures::DriveInfo & driveInfo, log::LogContext & lc) {
try{
m_tapeDrivesState->checkDriveCanBeCreated(driveInfo);
return true;
} catch (cta::TapeDrivesCatalogueState::DriveAlreadyExistsException &ex) {
log::ScopedParamContainer param(lc);
param.add("tapeDrive",driveInfo.driveName)
.add("logicalLibrary",driveInfo.logicalLibrary)
.add("host",driveInfo.host)
.add("exceptionMsg",ex.getMessageValue());
lc.log(log::CRIT,"In Scheduler::checkDriveCanBeCreated(): drive already exists. Reporting fatal error.");
return false;
}
}
//------------------------------------------------------------------------------
// removeDrive
//------------------------------------------------------------------------------
void Scheduler::removeDrive(const common::dataStructures::SecurityIdentity &cliIdentity,
const std::string &driveName, log::LogContext & lc) {
utils::Timer t;
m_tapeDrivesState->removeDrive(driveName, lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("drive", driveName)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::removeDrive(): success.");
}
//------------------------------------------------------------------------------
// reportDriveConfig
//------------------------------------------------------------------------------
void Scheduler::reportDriveConfig(const cta::tape::daemon::TpconfigLine& tpConfigLine,const cta::tape::daemon::TapedConfiguration& tapedConfig,log::LogContext& lc) {
utils::Timer t;
DriveConfig::setTapedConfiguration(tapedConfig, &m_catalogue, tpConfigLine.unitName);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("drive", tpConfigLine.unitName)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO,"In Scheduler::reportDriveConfig(): success.");
}
//------------------------------------------------------------------------------
// reportDriveStatus
//------------------------------------------------------------------------------
void Scheduler::reportDriveStatus(const common::dataStructures::DriveInfo& driveInfo,
common::dataStructures::MountType type, common::dataStructures::DriveStatus status,
log::LogContext& lc) {
utils::Timer t;
m_tapeDrivesState->reportDriveStatus(driveInfo, type, status, time(nullptr), lc);
auto schedulerDbTime = t.secs();
if (schedulerDbTime > 1) {
log::ScopedParamContainer spc(lc);
spc.add("drive", driveInfo.driveName)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::DEBUG, "In Scheduler::reportDriveStatus(): success.");
}
}
void Scheduler::createTapeDriveStatus(const common::dataStructures::DriveInfo& driveInfo,
const common::dataStructures::DesiredDriveState & desiredState, const common::dataStructures::MountType& type,
const common::dataStructures::DriveStatus& status, const tape::daemon::TpconfigLine& tpConfigLine,
const common::dataStructures::SecurityIdentity& identity, log::LogContext & lc) {
m_tapeDrivesState->createTapeDriveStatus(driveInfo, desiredState, type, status, tpConfigLine, identity, lc);
log::ScopedParamContainer spc(lc);
spc.add("drive", driveInfo.driveName);
lc.log(log::DEBUG, "In Scheduler::createTapeDriveStatus(): success.");
}
//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::ArchiveJob> > Scheduler::getPendingArchiveJobs(log::LogContext & lc) const {
utils::Timer t;
auto ret = m_db.getArchiveJobs();
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(): success.");
return ret;
}
//------------------------------------------------------------------------------
// getPendingArchiveJobs
//------------------------------------------------------------------------------
std::list<common::dataStructures::ArchiveJob> Scheduler::getPendingArchiveJobs(const std::string &tapePoolName, log::LogContext & lc) const {
utils::Timer t;
if(!m_catalogue.tapePoolExists(tapePoolName)) {
throw exception::UserError(std::string("Tape pool ") + tapePoolName + " does not exist");
}
auto catalogueTime = t.secs(utils::Timer::resetCounter);
auto ret = m_db.getArchiveJobs(tapePoolName);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("catalogueTime", catalogueTime)
.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::getPendingArchiveJobs(tapePool): success.");
return ret;
}
//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
std::map<std::string, std::list<common::dataStructures::RetrieveJob> > Scheduler::getPendingRetrieveJobs(log::LogContext & lc) const {
utils::Timer t;
auto ret = m_db.getRetrieveJobs();
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success.");
return ret;
}
//------------------------------------------------------------------------------
// getPendingRetrieveJobs
//------------------------------------------------------------------------------
std::list<common::dataStructures::RetrieveJob> Scheduler::getPendingRetrieveJobs(const std::string& vid, log::LogContext &lc) const {
utils::Timer t;
auto ret = m_db.getRetrieveJobs(vid);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::getPendingRetrieveJobs(): success.");
return ret;
}
//------------------------------------------------------------------------------
// getDriveState
//------------------------------------------------------------------------------
std::optional<cta::common::dataStructures::TapeDrive> Scheduler::getDriveState(const std::string& tapeDriveName,
log::LogContext* lc) const {
utils::Timer t;
auto ret = m_catalogue.getTapeDrive(tapeDriveName);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(*lc);
spc.add("schedulerDbTime", schedulerDbTime);
lc->log(log::INFO, "In Scheduler::getDriveState(): success.");
return ret;
}
//------------------------------------------------------------------------------
// getDriveStates
//------------------------------------------------------------------------------
std::list<common::dataStructures::TapeDrive> Scheduler::getDriveStates(const common::dataStructures::SecurityIdentity &cliIdentity, log::LogContext & lc) const {
utils::Timer t;
auto ret = m_tapeDrivesState->getDriveStates(lc);
auto schedulerDbTime = t.secs();
log::ScopedParamContainer spc(lc);
spc.add("schedulerDbTime", schedulerDbTime);
lc.log(log::INFO, "In Scheduler::getDriveStates(): success.");
return ret;
}
//------------------------------------------------------------------------------
// sortAndGetTapesForMountInfo
//------------------------------------------------------------------------------
void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>& mountInfo,
const std::string& logicalLibraryName, const std::string& driveName, utils::Timer& timer,
ExistingMountSummaryPerTapepool& existingMountsDistinctTypeSummaryPerTapepool,
ExistingMountSummaryPerVo& existingMountsBasicTypeSummaryPerVo, std::set<std::string>& tapesInUse,
std::list<catalogue::TapeForWriting>& tapeList, double& getTapeInfoTime, double& candidateSortingTime,
double& getTapeForWriteTime, log::LogContext& lc) {
// The library information is not known for tapes involved in retrieves. Get the library information from the DB so
// we can filter the potential mounts to the ones that this tape server can serve.
catalogue::TapeSearchCriteria searchCriteria;
searchCriteria.logicalLibrary = logicalLibraryName;
auto eligibleTapesList = m_catalogue.getTapes(searchCriteria);
std::set<std::string> eligibleTapeSet;
for(auto& t : eligibleTapesList) {
eligibleTapeSet.insert(t.vid);
}
// Filter the potential mounts to keep only the ones that match the logical library for retrieves,
// and build the list of tapes that can potentially be mounted by this tape server
std::set<std::string> retrieveTapeSet;
for(auto m_it = mountInfo->potentialMounts.begin(); m_it != mountInfo->potentialMounts.end(); ) {
if(m_it->type == common::dataStructures::MountType::Retrieve) {
if(eligibleTapeSet.count(m_it->vid) == 0) {
m_it = mountInfo->potentialMounts.erase(m_it);
continue;
} else {
retrieveTapeSet.insert(m_it->vid);
}
}
m_it++;
}
common::dataStructures::VidToTapeMap retrieveTapesInfo;
if(!retrieveTapeSet.empty()) {
retrieveTapesInfo = m_catalogue.getTapesByVid(retrieveTapeSet);
getTapeInfoTime = timer.secs(utils::Timer::resetCounter);
for(auto& m : mountInfo->potentialMounts) {
if(m.type == common::dataStructures::MountType::Retrieve) {
m.logicalLibrary = retrieveTapesInfo[m.vid].logicalLibraryName;
m.tapePool = retrieveTapesInfo[m.vid].tapePoolName;
m.vendor = retrieveTapesInfo[m.vid].vendor;
m.mediaType = retrieveTapesInfo[m.vid].mediaType;
m.vo = retrieveTapesInfo[m.vid].vo;
m.capacityInBytes = retrieveTapesInfo[m.vid].capacityInBytes;
m.labelFormat = retrieveTapesInfo[m.vid].labelFormat;
}
}
}
//Get the tapepools of the potential and existing mounts
std::set<std::string> tapepoolsPotentialOrExistingMounts;
for (auto & pm: mountInfo->potentialMounts) {
tapepoolsPotentialOrExistingMounts.insert(pm.tapePool);
}
for (auto & em: mountInfo->existingOrNextMounts) {
tapepoolsPotentialOrExistingMounts.insert(em.tapePool);
}
//Get the potential and existing mounts tapepool virtual organization information
std::map<std::string,common::dataStructures::VirtualOrganization> tapepoolVoMap;
for (auto & tapepool: tapepoolsPotentialOrExistingMounts) {
try {
tapepoolVoMap[tapepool] = m_catalogue.getCachedVirtualOrganizationOfTapepool(tapepool);
} catch (cta::exception::Exception & ex){
//The VO of this tapepool does not exist, abort the scheduling as we need it to know the number of allocated drives
//the VO is allowed to use
ex.getMessage() << " Aborting scheduling." << std::endl;
throw ex;
}
}
// With the existing mount list, we can now populate the potential mount list
// with the per tape pool existing mount statistics.
for (auto & em: mountInfo->existingOrNextMounts) {
// If a mount is still listed for our own drive, it is a leftover that we disregard.
if (em.driveName!=driveName) {
existingMountsDistinctTypeSummaryPerTapepool[TapePoolMountPair(em.tapePool, em.type)].totalMounts++;
existingMountsBasicTypeSummaryPerVo[VirtualOrganizationMountPair(tapepoolVoMap.at(em.tapePool).name,common::dataStructures::getMountBasicType(em.type))].totalMounts++;
if (em.activity)
existingMountsDistinctTypeSummaryPerTapepool[TapePoolMountPair(em.tapePool, em.type)]
.activityMounts[em.activity.value()].value++;
if (em.vid.size()) {
tapesInUse.insert(em.vid);
log::ScopedParamContainer params(lc);
params.add("tapeVid", em.vid)
.add("mountType", common::dataStructures::toString(em.type))
.add("drive", em.driveName);
lc.log(log::DEBUG,"In Scheduler::sortAndGetTapesForMountInfo(): tapeAlreadyInUse found.");
}
}
}
// We can now filter out the potential mounts for which their mount criteria
// is not yet met, filter out the potential mounts for which the maximum mount
// quota is already reached, filter out the retrieve requests put to sleep for lack of disk space,
// and weight the remaining by how much of their quota is reached.
for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
// Get summary data
uint32_t existingMountsDistinctTypesForThisTapepool = 0;
uint32_t existingMountsBasicTypeForThisVo = 0;
common::dataStructures::MountType basicTypeOfThisPotentialMount = common::dataStructures::getMountBasicType(m->type);
common::dataStructures::VirtualOrganization voOfThisPotentialMount = tapepoolVoMap.at(m->tapePool);
bool sleepingMount = false;
try {
existingMountsDistinctTypesForThisTapepool = existingMountsDistinctTypeSummaryPerTapepool
.at(TapePoolMountPair(m->tapePool, m->type))
.totalMounts;
} catch (std::out_of_range &) {}
try {
existingMountsBasicTypeForThisVo = existingMountsBasicTypeSummaryPerVo
.at(VirtualOrganizationMountPair(voOfThisPotentialMount.name, basicTypeOfThisPotentialMount))
.totalMounts;
} catch (std::out_of_range &) {}
uint32_t effectiveExistingMountsForThisTapepool = 0;
//If we have an archive mount, we don't want
if (basicTypeOfThisPotentialMount == common::dataStructures::MountType::ArchiveAllTypes) effectiveExistingMountsForThisTapepool = existingMountsDistinctTypesForThisTapepool;
bool mountPassesACriteria = false;
uint64_t minBytesToWarrantAMount = m_minBytesToWarrantAMount;
uint64_t minFilesToWarrantAMount = m_minFilesToWarrantAMount;
if(m->type == common::dataStructures::MountType::ArchiveForRepack){
minBytesToWarrantAMount *= 2;
minFilesToWarrantAMount *= 2;
}
if (m->bytesQueued / (1 + effectiveExistingMountsForThisTapepool) >= minBytesToWarrantAMount)
mountPassesACriteria = true;
if (m->filesQueued / (1 + effectiveExistingMountsForThisTapepool) >= minFilesToWarrantAMount)
mountPassesACriteria = true;
if (!effectiveExistingMountsForThisTapepool && ((time(nullptr) - m->oldestJobStartTime) > m->minRequestAge))
mountPassesACriteria = true;
if (m->sleepingMount) {
sleepingMount = true;
}
uint64_t maxDrives = 0;
if(basicTypeOfThisPotentialMount == common::dataStructures::MountType::Retrieve) {
maxDrives = voOfThisPotentialMount.readMaxDrives;
} else if (basicTypeOfThisPotentialMount == common::dataStructures::MountType::ArchiveAllTypes) {
maxDrives = voOfThisPotentialMount.writeMaxDrives;
}
if (!mountPassesACriteria || existingMountsBasicTypeForThisVo >= maxDrives || sleepingMount) {
log::ScopedParamContainer params(lc);
params.add("tapePool", m->tapePool);
params.add("vo",voOfThisPotentialMount.name);
if ( m->type == common::dataStructures::MountType::Retrieve) {
params.add("tapeVid", m->vid);
}
params.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypesForThisTapepool", existingMountsDistinctTypesForThisTapepool)
.add("existingMountsBasicTypeForThisVo",existingMountsBasicTypeForThisVo)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("voReadMaxDrives",voOfThisPotentialMount.readMaxDrives)
.add("voWriteMaxDrives",voOfThisPotentialMount.writeMaxDrives)
.add("maxDrives", maxDrives);
if (sleepingMount) params.add("fullDiskSystem", m->diskSystemSleptFor);
lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Removing potential mount not passing criteria");
m = mountInfo->potentialMounts.erase(m);
} else {
// For the implementation of this ticket: https://gitlab.cern.ch/cta/CTA/-/issues/948
// The max drives allowed is not per-tapepool anymore and is not set by the mount policy neither
// Commenting this line so that we have a trace of what existed previously.
// m->ratioOfMountQuotaUsed = 1.0L * existingMountsPerTapepool / m->maxDrivesAllowed;
// Probably this fair-share activities should be per-VO instead of per-tapepool as it is now.
// If it has to be per-tapepool, then the readMaxDrives and writeMaxDrives should also be implemented in the tapepool
// populate the mount with a weight
//m->ratioOfMountQuotaUsed = 1.0L * existingMountsPerTapepool / m->maxDrivesAllowed;
m->ratioOfMountQuotaUsed = 0.0L;
log::ScopedParamContainer params(lc);
params.add("tapePool", m->tapePool);
params.add("vo",voOfThisPotentialMount.name);
if ( m->type == common::dataStructures::MountType::Retrieve) {
params.add("tapeVid", m->vid);
}
params.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypesForThisTapepool", existingMountsDistinctTypesForThisTapepool)
.add("existingMountsBasicTypeForThisVo",existingMountsBasicTypeForThisVo)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("maxDrives", maxDrives)
.add("voReadMaxDrives",voOfThisPotentialMount.readMaxDrives)
.add("voWriteMaxDrives",voOfThisPotentialMount.writeMaxDrives)
.add("ratioOfMountQuotaUsed", m->ratioOfMountQuotaUsed);
lc.log(log::DEBUG, "In Scheduler::sortAndGetTapesForMountInfo(): Will consider potential mount");
m++;
}
}
// We can now sort the potential mounts in decreasing priority order.
// The ordering is defined in operator <.
// We want the result in descending order or priority so we reverse the vector
std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
candidateSortingTime = timer.secs(utils::Timer::resetCounter);
// Find out if we have any potential archive mount in the list. If so, get the
// list of tapes from the catalogue.
if (std::count_if(
mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(),
// https://trac.cppcheck.net/ticket/10739
// cppcheck-suppress internalAstError
[](decltype(*mountInfo->potentialMounts.cbegin())& m){ return common::dataStructures::getMountBasicType(m.type) == common::dataStructures::MountType::ArchiveAllTypes; } )) {
tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
getTapeForWriteTime = timer.secs(utils::Timer::resetCounter);
}
// Remove from the tape list the ones already or soon to be mounted
auto t=tapeList.begin();
while (t!=tapeList.end()) {
if (tapesInUse.count(t->vid)) {
t=tapeList.erase(t);
} else {
t++;
}
}
}
//------------------------------------------------------------------------------
// getLogicalLibrary
//------------------------------------------------------------------------------
std::optional<common::dataStructures::LogicalLibrary> Scheduler::getLogicalLibrary(const std::string& libraryName, double& getLogicalLibraryTime){
utils::Timer timer;
auto logicalLibraries = m_catalogue.getLogicalLibraries();
std::optional<common::dataStructures::LogicalLibrary> ret;
auto logicalLibraryItor = std::find_if(logicalLibraries.begin(),logicalLibraries.end(),[libraryName](const cta::common::dataStructures::LogicalLibrary& ll){
return (ll.name == libraryName);
});
getLogicalLibraryTime += timer.secs(utils::Timer::resetCounter);
if(logicalLibraryItor != logicalLibraries.end()){
ret = *logicalLibraryItor;
}
return ret;
}
void Scheduler::deleteRepackBuffer(std::unique_ptr<cta::disk::Directory> repackBuffer, cta::log::LogContext & lc) {
try{
if(repackBuffer != nullptr && repackBuffer->exist()){
repackBuffer->rmdir();
}
} catch (const cta::exception::XrootCl & ex) {
log::ScopedParamContainer spc(lc);
spc.add("exceptionMsg",ex.getMessageValue());
lc.log(log::ERR,"In Scheduler::deleteRepackBuffer() unable to delete the directory located in " + repackBuffer->getURL());
}
}
uint64_t Scheduler::getNbFilesAlreadyArchived(const common::dataStructures::ArchiveFile& archiveFile) {
return archiveFile.tapeFiles.size();
}
//------------------------------------------------------------------------------
// checkNeededEnvironmentVariables
//------------------------------------------------------------------------------
void Scheduler::checkNeededEnvironmentVariables(){
std::set<std::string> environmentVariablesNotSet;
for(auto & environmentVariable: c_mandatoryEnvironmentVariables){
std::string envVar = cta::utils::getEnv(environmentVariable);
if(envVar.empty()){
environmentVariablesNotSet.insert(environmentVariable);
}
}
if(!environmentVariablesNotSet.empty()){
std::string listVariablesNotSet = "";
bool isFirst = true;
for(auto & environmentVariableNotSet: environmentVariablesNotSet){
if(isFirst){
listVariablesNotSet += "[" + environmentVariableNotSet;
isFirst = false;
} else {
listVariablesNotSet += ", " + environmentVariableNotSet;
}
};
listVariablesNotSet += "]";
std::string errMsg = "In Scheduler::checkNeededEnvironmentVariables(), the following environment variables: "+listVariablesNotSet+" are not set.";
throw cta::exception::Exception(errMsg);
}
}
//------------------------------------------------------------------------------
// getNextMountDryRun
//------------------------------------------------------------------------------
bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const std::string& driveName, log::LogContext& lc) {
// We run the same algorithm as the actual getNextMount without the global lock
// For this reason, we just return true as soon as valid mount has been found.
utils::Timer timer;
double getMountInfoTime = 0;
double getTapeInfoTime = 0;
double candidateSortingTime = 0;
double getTapeForWriteTime = 0;
double decisionTime = 0;
double schedulerDbTime = 0;
double catalogueTime = 0;
double getLogicalLibrariesTime = 0;
auto logicalLibrary = getLogicalLibrary(logicalLibraryName,getLogicalLibrariesTime);
if(logicalLibrary){
if(logicalLibrary.value().isDisabled){
log::ScopedParamContainer params(lc);
params.add("logicalLibrary",logicalLibraryName)
.add("catalogueTime",getLogicalLibrariesTime);
lc.log(log::INFO,"In Scheduler::getNextMountDryRun(): logicalLibrary is disabled");
return false;
}
} else {
log::ScopedParamContainer params(lc);
params.add("logicalLibrary",logicalLibraryName)
.add("catalogueTime",getLogicalLibrariesTime);
lc.log(log::INFO,"In Scheduler::getNextMountDryRun(): logicalLibrary does not exist");
return false;
}
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
mountInfo = m_db.getMountInfoNoLock(SchedulerDatabase::PurposeGetMountInfo::GET_NEXT_MOUNT,lc);
getMountInfoTime = timer.secs(utils::Timer::resetCounter);
ExistingMountSummaryPerTapepool existingMountsDistinctTypeSummaryPerTapepool;
ExistingMountSummaryPerVo existingMountBasicTypeSummaryPerVo;
std::set<std::string> tapesInUse;
std::list<catalogue::TapeForWriting> tapeList;
sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer,
existingMountsDistinctTypeSummaryPerTapepool, existingMountBasicTypeSummaryPerVo, tapesInUse, tapeList,
getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
// We can now simply iterate on the candidates until we manage to find a valid mount
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
// If the mount is an archive, we still have to find a tape.
if (common::dataStructures::getMountBasicType(m->type)==common::dataStructures::MountType::ArchiveAllTypes) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
// The first tape matching will go for a prototype.
// TODO: improve to reuse already partially written tapes and randomization
for (auto & t: tapeList) {
if (t.tapePool == m->tapePool) {
// We have our tape. That's enough.
decisionTime += timer.secs(utils::Timer::resetCounter);
schedulerDbTime = getMountInfoTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
uint32_t existingMountsDistinctTypeForThisTapepool = 0;
uint32_t existingMountsBasicTypeForThisVo = 0;
try {
existingMountsDistinctTypeForThisTapepool=existingMountsDistinctTypeSummaryPerTapepool.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
try {
existingMountsBasicTypeForThisVo=existingMountBasicTypeSummaryPerVo.at(VirtualOrganizationMountPair(m->vo,common::dataStructures::getMountBasicType(m->type))).totalMounts;
} catch(...) {}
log::ScopedParamContainer params(lc);
params.add("tapePool", m->tapePool)
.add("tapeVid", t.vid)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypeForThisTapepool", existingMountsDistinctTypeForThisTapepool)
.add("existingMountsBasicTypeForThisVo", existingMountsBasicTypeForThisVo)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("getMountInfoTime", getMountInfoTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
lc.log(log::INFO, "In Scheduler::getNextMountDryRun(): Found a potential mount (archive)");
return true;
}
}
} else if (m->type==common::dataStructures::MountType::Retrieve) {
// We know the tape we intend to mount. We have to validate the tape is
// actually available to read (not mounted or about to be mounted, and pass
// on it if so).
if (tapesInUse.count(m->vid)) continue;
decisionTime += timer.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
uint32_t existingMountsDistinctTypeForThisTapepool = 0;
uint32_t existingMountsBasicTypeForThisVo = 0;
try {
existingMountsDistinctTypeForThisTapepool=existingMountsDistinctTypeSummaryPerTapepool.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
try {
existingMountsBasicTypeForThisVo=existingMountBasicTypeSummaryPerVo.at(VirtualOrganizationMountPair(m->vo,common::dataStructures::getMountBasicType(m->type))).totalMounts;
} catch(...) {}
schedulerDbTime = getMountInfoTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
params.add("tapePool", m->tapePool)
.add("tapeVid", m->vid)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypeForThisTapepool", existingMountsDistinctTypeForThisTapepool)
.add("existingMountsBasicTypeForThisVo", existingMountsBasicTypeForThisVo);
if (m->activity) {
params.add("activity", m->activity.value());
}
params.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("getMountInfoTime", getMountInfoTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
lc.log(log::INFO, "In Scheduler::getNextMountDryRun(): Found a potential mount (retrieve)");
return true;
}
}
schedulerDbTime = getMountInfoTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime + getLogicalLibrariesTime;
decisionTime += timer.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
params.add("getMountInfoTime", getMountInfoTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
if ((getMountInfoTime > 1) || (getTapeInfoTime > 1) || (candidateSortingTime > 1) || (getTapeForWriteTime > 1) ||
(decisionTime > 1) || (schedulerDbTime > 1) || (catalogueTime > 1))
lc.log(log::DEBUG, "In Scheduler::getNextMountDryRun(): No valid mount found.");
return false;
}
//------------------------------------------------------------------------------
// getNextMount
//------------------------------------------------------------------------------
std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) {
// In order to decide the next mount to do, we have to take a global lock on
// the scheduling, retrieve a list of all running mounts, queues sizes for
// tapes and tape pools, order the candidates by priority
// below threshold, and pick one at a time, we then attempt to get a tape
// from the catalogue (for the archive mounts), and walk the list until we
// mount or find nothing to do.
// We then skip to the next candidate, until we find a suitable one and
// return the mount, or exhaust all of them an
// Many steps for this logic are not specific for the database and are hence
// implemented in the scheduler itself.
// First, get the mount-related info from the DB
utils::Timer timer;
double getMountInfoTime = 0;
double queueTrimingTime = 0;
double getTapeInfoTime = 0;
double candidateSortingTime = 0;
double getTapeForWriteTime = 0;
double decisionTime = 0;
double mountCreationTime = 0;
double driveStatusSetTime = 0;
double schedulerDbTime = 0;
double getLogicalLibrariesTime = 0;
double catalogueTime = 0;
auto logicalLibrary = getLogicalLibrary(logicalLibraryName,getLogicalLibrariesTime);
if(logicalLibrary){
if(logicalLibrary.value().isDisabled){
log::ScopedParamContainer params(lc);
params.add("logicalLibrary",logicalLibraryName)
.add("catalogueTime",getLogicalLibrariesTime);
lc.log(log::INFO,"In Scheduler::getNextMount(): logicalLibrary is disabled");
return std::unique_ptr<TapeMount>();
}
} else {
log::ScopedParamContainer params(lc);
params.add("logicalLibrary",logicalLibraryName)
.add("catalogueTime",getLogicalLibrariesTime);
lc.log(log::CRIT,"In Scheduler::getNextMount(): logicalLibrary does not exist");
return std::unique_ptr<TapeMount>();
}
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
mountInfo = m_db.getMountInfo(lc);
getMountInfoTime = timer.secs(utils::Timer::resetCounter);
if (mountInfo->queueTrimRequired) {
m_db.trimEmptyQueues(lc);
queueTrimingTime = timer.secs(utils::Timer::resetCounter);
}
__attribute__((unused)) SchedulerDatabase::TapeMountDecisionInfo & debugMountInfo = *mountInfo;
ExistingMountSummaryPerTapepool existingMountsDistinctTypeSummaryPerTapepool;
ExistingMountSummaryPerVo existingMountBasicTypeSummaryPerVo;
std::set<std::string> tapesInUse;
std::list<catalogue::TapeForWriting> tapeList;
sortAndGetTapesForMountInfo(mountInfo, logicalLibraryName, driveName, timer,
existingMountsDistinctTypeSummaryPerTapepool, existingMountBasicTypeSummaryPerVo, tapesInUse, tapeList,
getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
// We can now simply iterate on the candidates until we manage to create a
// mount for one of them
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
// If the mount is an archive, we still have to find a tape.
if (common::dataStructures::getMountBasicType(m->type)==common::dataStructures::MountType::ArchiveAllTypes) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
// The first tape matching will go for a prototype.
// TODO: improve to reuse already partially written tapes and randomization
for (auto & t: tapeList) {
if (t.tapePool == m->tapePool) {
// We have our tape. Try to create the session. Prepare a return value
// for it.
std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_catalogue));
// Get the db side of the session
try {
decisionTime += timer.secs(utils::Timer::resetCounter);
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(m->type, t,
driveName,
logicalLibraryName,
utils::getShortHostname(),
t.vo,
t.mediaType,
t.vendor,
t.capacityInBytes,
std::nullopt,
t.labelFormat).release());
mountCreationTime += timer.secs(utils::Timer::resetCounter);
internalRet->m_sessionRunning = true;
driveStatusSetTime += timer.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
uint32_t existingMountsDistinctTypeForThisTapepool = 0;
uint32_t existingMountsBasicTypeForThisVo = 0;
try {
existingMountsDistinctTypeForThisTapepool=existingMountsDistinctTypeSummaryPerTapepool.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
try {
existingMountsBasicTypeForThisVo=existingMountBasicTypeSummaryPerVo.at(VirtualOrganizationMountPair(m->vo,common::dataStructures::getMountBasicType(m->type))).totalMounts;
} catch(...) {}
schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
params.add("tapePool", m->tapePool)
.add("tapeVid", t.vid)
.add("vo",t.vo)
.add("mediaType",t.mediaType)
.add("vendor",t.vendor)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypeForThisTapepool", existingMountsDistinctTypeForThisTapepool)
.add("existingMountsBasicTypeForThisVo",existingMountsBasicTypeForThisVo)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("getMountInfoTime", getMountInfoTime)
.add("queueTrimingTime", queueTrimingTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("mountCreationTime", mountCreationTime)
.add("driveStatusSetTime", driveStatusSetTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
lc.log(log::INFO, "In Scheduler::getNextMount(): Selected next mount (archive)");
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("Message", ex.getMessage().str());
lc.log(log::WARNING, "In Scheduler::getNextMount(): got an exception trying to schedule an archive mount. Trying others.");
continue;
}
}
}
} else if (m->type==common::dataStructures::MountType::Retrieve) {
// We know the tape we intend to mount. We have to validate the tape is
// actually available to read (not mounted or about to be mounted, and pass
// on it if so).
if (tapesInUse.count(m->vid)) continue;
try {
// create the mount, and populate its DB side.
decisionTime += timer.secs(utils::Timer::resetCounter);
std::unique_ptr<RetrieveMount> internalRet(new RetrieveMount(m_catalogue));
internalRet->m_dbMount.reset(mountInfo->createRetrieveMount(m->vid,
m->tapePool,
driveName,
logicalLibraryName,
utils::getShortHostname(),
m->vo,
m->mediaType,
m->vendor,
m->capacityInBytes,
m->activity,
m->labelFormat).release());
mountCreationTime += timer.secs(utils::Timer::resetCounter);
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
driveStatusSetTime += timer.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
uint32_t existingMountsDistinctTypeForThisTapepool = 0;
uint32_t existingMountsBasicTypeForThisVo = 0;
try {
existingMountsDistinctTypeForThisTapepool=existingMountsDistinctTypeSummaryPerTapepool.at(TapePoolMountPair(m->tapePool, m->type)).totalMounts;
} catch (...) {}
try {
existingMountsBasicTypeForThisVo=existingMountBasicTypeSummaryPerVo.at(VirtualOrganizationMountPair(m->vo,common::dataStructures::getMountBasicType(m->type))).totalMounts;
} catch(...) {}
schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
std::ostringstream ossLabelFormat;
ossLabelFormat << std::showbase << std::internal << std::setfill('0') << std::hex << std::setw(4) << static_cast<unsigned int>(m->labelFormat);
params.add("tapePool", m->tapePool)
.add("tapeVid", m->vid)
.add("vo",m->vo)
.add("mediaType",m->mediaType)
.add("labelFormat",ossLabelFormat.str())
.add("vendor",m->vendor)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMountsDistinctTypeForThisTapepool", existingMountsDistinctTypeForThisTapepool)
.add("existingMountsBasicTypeForThisVo",existingMountsBasicTypeForThisVo);
if (m->activity) {
params.add("activity", m->activity.value());
}
params.add("bytesQueued", m->bytesQueued)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(nullptr) - m->oldestJobStartTime)
.add("youngestJobAge", time(nullptr) - m->youngestJobStartTime)
.add("minArchiveRequestAge", m->minRequestAge)
.add("getMountInfoTime", getMountInfoTime)
.add("queueTrimingTime", queueTrimingTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("mountCreationTime", mountCreationTime)
.add("driveStatusSetTime", driveStatusSetTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
lc.log(log::INFO, "In Scheduler::getNextMount(): Selected next mount (retrieve)");
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("Message", ex.getMessage().str());
lc.log(log::WARNING, "In Scheduler::getNextMount(): got an exception trying to schedule a retrieve mount. Trying others.");
continue;
}
} else {
throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
}
}
schedulerDbTime = getMountInfoTime + queueTrimingTime + mountCreationTime + driveStatusSetTime;
catalogueTime = getTapeInfoTime + getTapeForWriteTime + getLogicalLibrariesTime;
decisionTime += timer.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params(lc);
params.add("getMountInfoTime", getMountInfoTime)
.add("queueTrimingTime", queueTrimingTime)
.add("getTapeInfoTime", getTapeInfoTime)
.add("candidateSortingTime", candidateSortingTime)
.add("getTapeForWriteTime", getTapeForWriteTime)
.add("decisionTime", decisionTime)
.add("mountCreationTime", mountCreationTime)
.add("driveStatusSetTime", driveStatusSetTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueTime", catalogueTime);
lc.log(log::DEBUG, "In Scheduler::getNextMount(): No valid mount found.");
return std::unique_ptr<TapeMount>();
}
//------------------------------------------------------------------------------
// getSchedulingInformations
//------------------------------------------------------------------------------
std::list<SchedulingInfos> Scheduler::getSchedulingInformations(log::LogContext& lc) {
std::list<SchedulingInfos> ret;
utils::Timer timer;
double getTapeInfoTime = 0;
double candidateSortingTime = 0;
double getTapeForWriteTime = 0;
ExistingMountSummaryPerTapepool existingMountsDistinctTypeSummaryPerTapepool;
ExistingMountSummaryPerVo existingMountBasicTypeSummaryPerVo;
std::set<std::string> tapesInUse;
std::list<catalogue::TapeForWriting> tapeList;
//get all drive informations and sort them by logical library name
cta::common::dataStructures::SecurityIdentity admin;
auto drives = getDriveStates(admin,lc);
std::map<std::string,std::list<std::string>> logicalLibraryDriveNamesMap;
for(auto & drive : drives){
logicalLibraryDriveNamesMap[drive.logicalLibrary].push_back(drive.driveName);
}
for(auto & kv: logicalLibraryDriveNamesMap){
//get mount informations
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
mountInfo = m_db.getMountInfoNoLock(cta::SchedulerDatabase::PurposeGetMountInfo::GET_NEXT_MOUNT,lc);
std::string logicalLibrary = kv.first;
cta::SchedulingInfos schedulingInfos(logicalLibrary);
for(auto & driveName: kv.second){
sortAndGetTapesForMountInfo(mountInfo, logicalLibrary, driveName, timer,
existingMountsDistinctTypeSummaryPerTapepool, existingMountBasicTypeSummaryPerVo, tapesInUse, tapeList,
getTapeInfoTime, candidateSortingTime, getTapeForWriteTime, lc);
//schedulingInfos.addDrivePotentialMount
std::vector<cta::SchedulerDatabase::PotentialMount> potentialMounts = mountInfo->potentialMounts;
for(auto & potentialMount: potentialMounts){
schedulingInfos.addPotentialMount(potentialMount);
}
}
if(!schedulingInfos.getPotentialMounts().empty()){
ret.push_back(schedulingInfos);
}
}
return ret;
}
//------------------------------------------------------------------------------
// getQueuesAndMountSummaries
//------------------------------------------------------------------------------
std::list<common::dataStructures::QueueAndMountSummary> Scheduler::getQueuesAndMountSummaries(log::LogContext& lc) {
std::list<common::dataStructures::QueueAndMountSummary> ret;
// Extract relevant information from the object store.
utils::Timer schedulerDbTimer;
auto mountDecisionInfo=m_db.getMountInfoNoLock(SchedulerDatabase::PurposeGetMountInfo::SHOW_QUEUES,lc);
const auto schedulerDbTime = schedulerDbTimer.secs();
auto & mdi __attribute__((unused)) = *mountDecisionInfo;
std::set<std::string> tapesWithAQueue;
for (const auto & pm: mountDecisionInfo->potentialMounts) {
if(!pm.vid.empty()) {
tapesWithAQueue.emplace(pm.vid);
}
}
for (const auto & em: mountDecisionInfo->existingOrNextMounts) {
if (!em.vid.empty()) {
tapesWithAQueue.emplace(em.vid);
}
}
// Obtain a map of vids to tape info from the catalogue
utils::Timer catalogueVidToLogicalLibraryTimer;
const auto vid_to_logical_library = m_catalogue.getVidToLogicalLibrary(tapesWithAQueue);
const auto catalogueVidToLogicalLibraryTime = catalogueVidToLogicalLibraryTimer.secs();
for (auto & pm: mountDecisionInfo->potentialMounts) {
// Find or create the relevant entry.
auto &summary = common::dataStructures::QueueAndMountSummary::getOrCreateEntry(ret, pm.type, pm.tapePool, pm.vid, vid_to_logical_library);
switch (pm.type) {
case common::dataStructures::MountType::ArchiveForUser:
case common::dataStructures::MountType::ArchiveForRepack:
summary.mountPolicy.archivePriority = pm.priority;
summary.mountPolicy.archiveMinRequestAge = pm.minRequestAge;
summary.bytesQueued = pm.bytesQueued;
summary.filesQueued = pm.filesQueued;
summary.oldestJobAge = time(nullptr) - pm.oldestJobStartTime ;
summary.youngestJobAge = time(nullptr) - pm.youngestJobStartTime;
if (pm.mountPolicyNames) {
summary.mountPolicies = pm.mountPolicyNames.value();
}
if (pm.highestPriorityMountPolicyName) {
summary.highestPriorityMountPolicy = pm.highestPriorityMountPolicyName.value();
}
if (pm.lowestRequestAgeMountPolicyName) {
summary.lowestRequestAgeMountPolicy = pm.lowestRequestAgeMountPolicyName.value();
}
break;
case common::dataStructures::MountType::Retrieve:
// TODO: we should remove the retrieveMinRequestAge if it's redundant, or rename pm.minArchiveRequestAge.
summary.mountPolicy.retrieveMinRequestAge = pm.minRequestAge;
summary.mountPolicy.retrievePriority = pm.priority;
summary.bytesQueued = pm.bytesQueued;
summary.filesQueued = pm.filesQueued;
summary.oldestJobAge = time(nullptr) - pm.oldestJobStartTime ;
summary.youngestJobAge = time(nullptr) - pm.youngestJobStartTime;
if (pm.mountPolicyNames) {
summary.mountPolicies = pm.mountPolicyNames.value();
}
if (pm.highestPriorityMountPolicyName) {
summary.highestPriorityMountPolicy = pm.highestPriorityMountPolicyName.value();
}
if (pm.lowestRequestAgeMountPolicyName) {
summary.lowestRequestAgeMountPolicy = pm.lowestRequestAgeMountPolicyName.value();
}
if (pm.sleepingMount) {
common::dataStructures::QueueAndMountSummary::SleepForSpaceInfo sfsi;
sfsi.startTime = pm.sleepStartTime;
sfsi.diskSystemName = pm.diskSystemSleptFor;
sfsi.sleepTime = pm.sleepTime;
summary.sleepForSpaceInfo = sfsi;
}
break;
default:
break;
}
}
for (auto & em: mountDecisionInfo->existingOrNextMounts) {
auto &summary = common::dataStructures::QueueAndMountSummary::getOrCreateEntry(ret, em.type, em.tapePool, em.vid, vid_to_logical_library);
switch (em.type) {
case common::dataStructures::MountType::ArchiveForUser:
case common::dataStructures::MountType::ArchiveForRepack:
case common::dataStructures::MountType::Retrieve:
if (em.currentMount)
summary.currentMounts++;
/*else
summary.nextMounts++;*/
summary.currentBytes += em.bytesTransferred;
summary.currentFiles += em.filesTransferred;
summary.averageBandwidth = em.averageBandwidth;
break;
default:
break;
}
}
mountDecisionInfo.reset();
double catalogueGetTapePoolTotalTime = 0.0;
double catalogueGetTapesTotalTime = 0.0;
double catalogueGetVoTotalTime = 0.0;
// Add the tape and VO information where useful (archive queues).
for (auto & mountOrQueue: ret) {
if (common::dataStructures::MountType::ArchiveForUser==mountOrQueue.mountType || common::dataStructures::MountType::ArchiveForRepack==mountOrQueue.mountType) {
utils::Timer catalogueGetTapePoolTimer;
const auto tapePool = m_catalogue.getTapePool(mountOrQueue.tapePool);
catalogueGetTapePoolTotalTime += catalogueGetTapePoolTimer.secs();
if (tapePool) {
utils::Timer catalogueGetVoTimer;
const auto vo = m_catalogue.getCachedVirtualOrganizationOfTapepool(tapePool->name);
catalogueGetVoTotalTime += catalogueGetVoTimer.secs();
mountOrQueue.vo = vo.name;
mountOrQueue.readMaxDrives = vo.readMaxDrives;
mountOrQueue.writeMaxDrives = vo.writeMaxDrives;
mountOrQueue.tapesCapacity = tapePool->capacityBytes;
mountOrQueue.filesOnTapes = tapePool->nbPhysicalFiles;
mountOrQueue.dataOnTapes = tapePool->dataBytes;
mountOrQueue.fullTapes = tapePool->nbFullTapes;
mountOrQueue.writableTapes = tapePool->nbWritableTapes;
}
} else if (common::dataStructures::MountType::Retrieve==mountOrQueue.mountType) {
// Get info for this tape.
cta::catalogue::TapeSearchCriteria tsc;
tsc.vid = mountOrQueue.vid;
utils::Timer catalogueGetTapesTimer;
auto tapes=m_catalogue.getTapes(tsc);
catalogueGetTapesTotalTime += catalogueGetTapesTimer.secs();
if (tapes.size() != 1) {
throw cta::exception::Exception("In Scheduler::getQueuesAndMountSummaries(): got unexpected number of tapes from catalogue for a retrieve.");
}
auto &t=tapes.front();
utils::Timer catalogueGetVoTimer;
const auto vo = m_catalogue.getCachedVirtualOrganizationOfTapepool(t.tapePoolName);
catalogueGetVoTotalTime += catalogueGetVoTimer.secs();
mountOrQueue.vo = vo.name;
mountOrQueue.readMaxDrives = vo.readMaxDrives;
mountOrQueue.writeMaxDrives = vo.writeMaxDrives;
mountOrQueue.tapesCapacity += t.capacityInBytes;
mountOrQueue.filesOnTapes += t.lastFSeq;
mountOrQueue.dataOnTapes += t.dataOnTapeInBytes;
if (t.full) mountOrQueue.fullTapes++;
if (!t.full && !t.isDisabled()) mountOrQueue.writableTapes++;
mountOrQueue.tapePool = t.tapePoolName;
}
}
log::ScopedParamContainer spc(lc);
spc.add("catalogueVidToLogicalLibraryTime", catalogueVidToLogicalLibraryTime)
.add("schedulerDbTime", schedulerDbTime)
.add("catalogueGetTapePoolTotalTime", catalogueGetTapePoolTotalTime)
.add("catalogueGetVoTotalTime",catalogueGetVoTotalTime)
.add("catalogueGetTapesTotalTime", catalogueGetTapesTotalTime);
lc.log(log::INFO, "In Scheduler::getQueuesAndMountSummaries(): success.");
return ret;
}
//------------------------------------------------------------------------------
// getNextArchiveJobsToReportBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<ArchiveJob> > Scheduler::getNextArchiveJobsToReportBatch(
uint64_t filesRequested, log::LogContext& logContext) {
// We need to go through the queues of archive jobs to report
std::list<std::unique_ptr<ArchiveJob> > ret;
// Get the list of jobs to report from the scheduler db
auto dbRet = m_db.getNextArchiveJobsToReportBatch(filesRequested, logContext);
for (auto & j: dbRet) {
ret.emplace_back(new ArchiveJob(nullptr, m_catalogue, j->archiveFile,
j->srcURL, j->tapeFile));
ret.back()->m_dbJob.reset(j.release());
}
return ret;
}
//------------------------------------------------------------------------------
// getArchiveJobsFailedSummary
//------------------------------------------------------------------------------
SchedulerDatabase::JobsFailedSummary Scheduler::getArchiveJobsFailedSummary(log::LogContext &logContext) {
return m_db.getArchiveJobsFailedSummary(logContext);
}
//------------------------------------------------------------------------------
// getNextRetrieveJobsToReportBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<RetrieveJob>> Scheduler::
getNextRetrieveJobsToReportBatch(uint64_t filesRequested, log::LogContext &logContext)
{
// We need to go through the queues of retrieve jobs to report
std::list<std::unique_ptr<RetrieveJob>> ret;
// Get the list of jobs to report from the scheduler db
auto dbRet = m_db.getNextRetrieveJobsToReportBatch(filesRequested, logContext);
for (auto &j : dbRet) {
ret.emplace_back(new RetrieveJob(nullptr, j->retrieveRequest, j->archiveFile, j->selectedCopyNb, PositioningMethod::ByFSeq));
ret.back()->m_dbJob.reset(j.release());
}
return ret;
}
//------------------------------------------------------------------------------
// getNextRetrieveJobsFailedBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<RetrieveJob>> Scheduler::
getNextRetrieveJobsFailedBatch(uint64_t filesRequested, log::LogContext &logContext)
{
// We need to go through the queues of failed retrieve jobs
std::list<std::unique_ptr<RetrieveJob>> ret;
// Get the list of failed jobs from the scheduler db
auto dbRet = m_db.getNextRetrieveJobsFailedBatch(filesRequested, logContext);
for (auto &j : dbRet) {
ret.emplace_back(new RetrieveJob(nullptr, j->retrieveRequest, j->archiveFile, j->selectedCopyNb, PositioningMethod::ByFSeq));
ret.back()->m_dbJob.reset(j.release());
}
return ret;
}
//------------------------------------------------------------------------------
// getRetrieveJobsFailedSummary
//------------------------------------------------------------------------------
SchedulerDatabase::JobsFailedSummary Scheduler::getRetrieveJobsFailedSummary(log::LogContext &logContext) {
return m_db.getRetrieveJobsFailedSummary(logContext);
}
//------------------------------------------------------------------------------
// reportArchiveJobsBatch
//------------------------------------------------------------------------------
void Scheduler::reportArchiveJobsBatch(std::list<std::unique_ptr<ArchiveJob> >& archiveJobsBatch,
disk::DiskReporterFactory & reporterFactory, log::TimingList& timingList, utils::Timer& t,
log::LogContext& lc){
// Create the reporters
struct JobAndReporter {
std::unique_ptr<disk::DiskReporter> reporter;
ArchiveJob * archiveJob;
};
std::list<JobAndReporter> pendingReports;
std::list<ArchiveJob *> reportedJobs;
for (auto &j: archiveJobsBatch) {
pendingReports.push_back(JobAndReporter());
auto & current = pendingReports.back();
// We could fail to create the disk reporter or to get the report URL. This should not impact the other jobs.
try {
current.reporter.reset(reporterFactory.createDiskReporter(j->exceptionThrowingReportURL()));
current.reporter->asyncReport();
current.archiveJob = j.get();
} catch (cta::exception::Exception & ex) {
// Whether creation or launching of reporter failed, the promise will not receive result, so we can safely delete it.
// we will first determine if we need to clean up the reporter as well or not.
pendingReports.pop_back();
// We are ready to carry on for other files without interactions.
// Log the error, update the request.
log::ScopedParamContainer params(lc);
params.add("fileId", j->archiveFile.archiveFileID)
.add("reportType", j->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportArchiveJobsBatch(): failed to launch reporter.");
try {
j->reportFailed(ex.getMessageValue(), lc);
} catch(const cta::exception::NoSuchObject &ex){
params.add("fileId",j->archiveFile.archiveFileID)
.add("reportType",j->reportType())
.add("exceptionMSG",ex.getMessageValue());
lc.log(log::WARNING,"In Scheduler::reportArchiveJobsBatch(): failed to reportFailed the job because it does not exist in the objectstore.");
}
}
}
timingList.insertAndReset("asyncReportLaunchTime", t);
for (auto ¤t: pendingReports) {
try {
current.reporter->waitReport();
reportedJobs.push_back(current.archiveJob);
} catch (cta::exception::Exception & ex) {
// Log the error, update the request.
log::ScopedParamContainer params(lc);
params.add("fileId", current.archiveJob->archiveFile.archiveFileID)
.add("reportType", current.archiveJob->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportArchiveJobsBatch(): failed to report.");
try {
current.archiveJob->reportFailed(ex.getMessageValue(), lc);
} catch(const cta::exception::NoSuchObject &ex){
params.add("fileId",current.archiveJob->archiveFile.archiveFileID)
.add("reportType",current.archiveJob->reportType())
.add("exceptionMSG",ex.getMessageValue());
lc.log(log::WARNING,"In Scheduler::reportArchiveJobsBatch(): failed to reportFailed the current job because it does not exist in the objectstore.");
}
}
}
timingList.insertAndReset("reportCompletionTime", t);
std::list<SchedulerDatabase::ArchiveJob *> reportedDbJobs;
for (auto &j: reportedJobs) reportedDbJobs.push_back(j->m_dbJob.get());
m_db.setArchiveJobBatchReported(reportedDbJobs, timingList, t, lc);
// Log the successful reports.
for (auto & j: reportedJobs) {
log::ScopedParamContainer params(lc);
params.add("fileId", j->archiveFile.archiveFileID)
.add("reportType", j->reportType());
lc.log(log::INFO, "In Scheduler::reportArchiveJobsBatch(): report successful.");
}
timingList.insertAndReset("reportRecordingInSchedDbTime", t);
log::ScopedParamContainer params(lc);
params.add("totalReports", archiveJobsBatch.size())
.add("failedReports", archiveJobsBatch.size() - reportedJobs.size())
.add("successfulReports", reportedJobs.size());
timingList.addToLog(params);
lc.log(log::INFO, "In Scheduler::reportArchiveJobsBatch(): reported a batch of archive jobs.");
}
//------------------------------------------------------------------------------
// reportRetrieveJobsBatch
//------------------------------------------------------------------------------
void Scheduler::
reportRetrieveJobsBatch(std::list<std::unique_ptr<RetrieveJob>> & retrieveJobsBatch,
disk::DiskReporterFactory & reporterFactory, log::TimingList & timingList, utils::Timer & t, log::LogContext & lc)
{
// Create the reporters
struct JobAndReporter {
std::unique_ptr<disk::DiskReporter> reporter;
RetrieveJob * retrieveJob;
};
std::list<JobAndReporter> pendingReports;
std::list<RetrieveJob*> reportedJobs;
for(auto &j: retrieveJobsBatch) {
pendingReports.push_back(JobAndReporter());
auto & current = pendingReports.back();
// We could fail to create the disk reporter or to get the report URL. This should not impact the other jobs.
try {
current.reporter.reset(reporterFactory.createDiskReporter(j->retrieveRequest.errorReportURL));
current.reporter->asyncReport();
current.retrieveJob = j.get();
} catch (cta::exception::Exception & ex) {
// Whether creation or launching of reporter failed, the promise will not receive result, so we can safely delete it.
// we will first determine if we need to clean up the reporter as well or not.
pendingReports.pop_back();
// We are ready to carry on for other files without interactions.
// Log the error, update the request.
log::ScopedParamContainer params(lc);
params.add("fileId", j->archiveFile.archiveFileID)
.add("reportType", j->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportRetrieveJobsBatch(): failed to launch reporter.");
j->reportFailed(ex.getMessageValue(), lc);
}
}
timingList.insertAndReset("asyncReportLaunchTime", t);
for(auto ¤t: pendingReports) {
try {
current.reporter->waitReport();
reportedJobs.push_back(current.retrieveJob);
} catch (cta::exception::Exception & ex) {
// Log the error, update the request.
log::ScopedParamContainer params(lc);
params.add("fileId", current.retrieveJob->archiveFile.archiveFileID)
.add("reportType", current.retrieveJob->reportType())
.add("exceptionMSG", ex.getMessageValue());
lc.log(log::ERR, "In Scheduler::reportRetrieveJobsBatch(): failed to report.");
current.retrieveJob->reportFailed(ex.getMessageValue(), lc);
}
}
timingList.insertAndReset("reportCompletionTime", t);
std::list<SchedulerDatabase::RetrieveJob *> reportedDbJobs;
for(auto &j: reportedJobs) reportedDbJobs.push_back(j->m_dbJob.get());
m_db.setRetrieveJobBatchReportedToUser(reportedDbJobs, timingList, t, lc);
// Log the successful reports.
for(auto & j: reportedJobs) {
log::ScopedParamContainer params(lc);
params.add("fileId", j->archiveFile.archiveFileID)
.add("reportType", j->reportType());
lc.log(log::INFO, "In Scheduler::reportRetrieveJobsBatch(): report successful.");
}
timingList.insertAndReset("reportRecordingInSchedDbTime", t);
log::ScopedParamContainer params(lc);
params.add("totalReports", retrieveJobsBatch.size())
.add("failedReports", retrieveJobsBatch.size() - reportedJobs.size())
.add("successfulReports", reportedJobs.size());
timingList.addToLog(params);
lc.log(log::ERR, "In Scheduler::reportRetrieveJobsBatch(): reported a batch of retrieve jobs.");
}
cta::catalogue::Catalogue & Scheduler::getCatalogue(){
return m_catalogue;
}
} // namespace cta