Commit 8f2d02ae authored by Eric Cano

Implemented getNextMount for ArchiveForRepack mounts.

parent c59e9fa1
......@@ -18,24 +18,42 @@
#include "common/dataStructures/MountType.hpp"
std::string cta::common::dataStructures::toString(cta::common::dataStructures::MountType type) {
namespace cta {
namespace common {
namespace dataStructures {
std::string toString(cta::common::dataStructures::MountType type) {
switch(type) {
case cta::common::dataStructures::MountType::ArchiveForUser:
case MountType::ArchiveForUser:
return "ArchiveForUser";
case cta::common::dataStructures::MountType::ArchiveForRepack:
case MountType::ArchiveForRepack:
return "ArchiveForRepack";
case cta::common::dataStructures::MountType::Retrieve:
case MountType::ArchiveAllTypes:
return "ArchiveAllTypes";
case MountType::Retrieve:
return "Retrieve";
case cta::common::dataStructures::MountType::Label:
case MountType::Label:
return "Label";
case cta::common::dataStructures::MountType::NoMount:
case MountType::NoMount:
return "-";
default:
return "UNKNOWN";
}
}
std::ostream & cta::common::dataStructures::operator<<(std::ostream &os,
MountType getMountBasicType(MountType type) {
switch(type) {
case MountType::ArchiveForUser:
case MountType::ArchiveForRepack:
return MountType::ArchiveAllTypes;
default:
return type;
}
}
std::ostream & operator<<(std::ostream &os,
const cta::common::dataStructures::MountType &obj) {
return os << toString(obj);
}
}}} // namespace cta::common::dataStructures
......@@ -28,12 +28,18 @@ enum class MountType: uint32_t {
ArchiveForRepack = 2,
Retrieve = 3,
Label = 4,
NoMount = 0
NoMount = 0,
/// A summary type used in scheduling.
ArchiveAllTypes = 99
};
/// A function summarizing subtypes (currently only Archive) to simplify scheduling.
MountType getMountBasicType(MountType type);
std::string toString(MountType type);
std::ostream &operator <<(std::ostream& os, const MountType &obj);
} // namespace dataStructures
} // namespace common
} // namespace cta
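The new ArchiveAllTypes value is only a scheduling summary: getMountBasicType() folds both archive flavours into it so that the scheduler (see the Scheduler.cpp hunks further down) can count ArchiveForUser and ArchiveForRepack mounts under the same per-tape-pool key. A minimal standalone sketch of that usage, with a toy map standing in for the scheduler's existingMountsSummary (not the CTA headers themselves):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Standalone mirror of the declarations added in this hunk.
enum class MountType : uint32_t {
  ArchiveForUser = 1,
  ArchiveForRepack = 2,
  Retrieve = 3,
  Label = 4,
  NoMount = 0,
  ArchiveAllTypes = 99
};

MountType getMountBasicType(MountType type) {
  switch (type) {
  case MountType::ArchiveForUser:
  case MountType::ArchiveForRepack:
    return MountType::ArchiveAllTypes;  // both archive flavours share one bucket
  default:
    return type;
  }
}

int main() {
  // Stand-in for the scheduler's existingMountsSummary keyed by (tapePool, type).
  std::map<std::pair<std::string, MountType>, uint32_t> existingMountsSummary;
  for (auto t : {MountType::ArchiveForUser, MountType::ArchiveForRepack, MountType::Retrieve})
    ++existingMountsSummary[{"pool1", getMountBasicType(t)}];
  // Two keys remain: (pool1, ArchiveAllTypes) with count 2, and (pool1, Retrieve) with count 1.
  std::cout << existingMountsSummary.size() << std::endl;  // prints 2
  return 0;
}
```

The Scheduler.cpp changes below apply exactly this folding when tallying existing mounts and when deciding whether a potential mount still needs a tape for writing.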
......@@ -479,6 +479,18 @@ struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PoppedElemen
}
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::PopCriteria:
public ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PopCriteria {
using ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PopCriteria::PopCriteria;
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::PoppedElementsSummary:
public ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PoppedElementsSummary {
using ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::PoppedElementsSummary::PoppedElementsSummary;
};
template<>
struct ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForUser>::QueueType {
objectstore::JobQueueType value = objectstore::JobQueueType::JobsToTransferForUser;
......
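The ForRepack pop criteria and popped-elements summary are defined purely by inheritance from the ForUser specialisations, with using-declarations pulling in the base constructors so callers of both queue flavours see the same interface. A toy, self-contained illustration of that pattern (the names below are placeholders, not the CTA traits):

```cpp
#include <cstdint>
#include <iostream>

// Toy stand-ins for the ContainerTraits specialisations: the "repack" criteria
// reuse the "user" criteria wholesale by deriving and inheriting constructors.
struct UserPopCriteria {
  UserPopCriteria(uint64_t files, uint64_t bytes) : files(files), bytes(bytes) {}
  uint64_t files;
  uint64_t bytes;
};

struct RepackPopCriteria : UserPopCriteria {
  using UserPopCriteria::UserPopCriteria;  // inherit the (files, bytes) constructor
};

int main() {
  RepackPopCriteria criteria(100, 5000000000ULL);  // same interface as the user flavour
  std::cout << criteria.files << " files / " << criteria.bytes << " bytes" << std::endl;
  return 0;
}
```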
......@@ -70,4 +70,12 @@ addToLog(log::ScopedParamContainer &params) {
.add("files", summary.files);
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
}} // namespace cta::objectstore
......@@ -54,19 +54,5 @@ namespace cta { namespace objectstore {
}
return ret;
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::PopCriteria::
operator-=(const PoppedElementsSummary& pes) -> PopCriteria & {
files -= pes.files;
return *this;
}
template<>
auto ContainerTraits<ArchiveQueue,ArchiveQueueToTransferForRepack>::
getContainerSummary(Container& cont) -> ContainerSummary {
ContainerSummary ret;
ret.JobsSummary::operator=(cont.getJobsSummary());
return ret;
}
}}
......@@ -185,7 +185,7 @@ void OStoreDB::ping() {
void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, RootEntry& re,
log::LogContext & logContext) {
utils::Timer t, t2;
// Walk the archive queues for statistics
// Walk the archive queues for user for statistics
for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForUser)) {
objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
// debug utility variable
......@@ -201,7 +201,7 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
params.add("queueObject", aqp.address)
.add("tapePool", aqp.tapePool)
.add("exceptionMessage", ex.getMessageValue());
logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue. Skipping it.");
logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue for user. Skipping it.");
continue;
}
// If there are files queued, we create an entry for this tape pool in the
......@@ -225,10 +225,57 @@ void OStoreDB::fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, Ro
log::ScopedParamContainer params (logContext);
params.add("queueObject", aqp.address)
.add("tapePool", aqp.tapePool)
.add("queueType", toString(cta::common::dataStructures::MountType::ArchiveForUser))
.add("queueLockTime", queueLockTime)
.add("queueFetchTime", queueFetchTime)
.add("processingTime", processingTime);
logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive queue.");
logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive for user queue.");
}
// Walk the archive queues for repack for statistics
for (auto & aqp: re.dumpArchiveQueues(JobQueueType::JobsToTransferForRepack)) {
objectstore::ArchiveQueue aqueue(aqp.address, m_objectStore);
// debug utility variable
std::string __attribute__((__unused__)) poolName = aqp.tapePool;
objectstore::ScopedSharedLock aqlock;
double queueLockTime = 0;
double queueFetchTime = 0;
try {
aqueue.fetchNoLock();
queueFetchTime = t.secs(utils::Timer::resetCounter);
} catch (cta::exception::Exception &ex) {
log::ScopedParamContainer params (logContext);
params.add("queueObject", aqp.address)
.add("tapePool", aqp.tapePool)
.add("exceptionMessage", ex.getMessageValue());
logContext.log(log::WARNING, "In OStoreDB::fetchMountInfo(): failed to lock/fetch an archive queue for repack. Skipping it.");
continue;
}
// If there are files queued, we create an entry for this tape pool in the
// mount candidates list.
if (aqueue.getJobsSummary().jobs) {
tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
auto & m = tmdi.potentialMounts.back();
m.tapePool = aqp.tapePool;
m.type = cta::common::dataStructures::MountType::ArchiveForRepack;
m.bytesQueued = aqueue.getJobsSummary().bytes;
m.filesQueued = aqueue.getJobsSummary().jobs;
m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime;
m.priority = aqueue.getJobsSummary().priority;
m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed;
m.minRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
m.logicalLibrary = "";
} else {
tmdi.queueTrimRequired = true;
}
auto processingTime = t.secs(utils::Timer::resetCounter);
log::ScopedParamContainer params (logContext);
params.add("queueObject", aqp.address)
.add("tapePool", aqp.tapePool)
.add("queueType", toString(cta::common::dataStructures::MountType::ArchiveForRepack))
.add("queueLockTime", queueLockTime)
.add("queueFetchTime", queueFetchTime)
.add("processingTime", processingTime);
logContext.log(log::INFO, "In OStoreDB::fetchMountInfo(): fetched an archive for repack queue.");
}
// Walk the retrieve queues for statistics
for (auto & rqp: re.dumpRetrieveQueues(JobQueueType::JobsToTransferForUser)) {
......@@ -441,6 +488,7 @@ void OStoreDB::trimEmptyQueues(log::LogContext& lc) {
// OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::ArchiveMount> OStoreDB::TapeMountDecisionInfoNoLock::createArchiveMount(
common::dataStructures::MountType type,
const catalogue::TapeForWriting& tape,
const std::string driveName,
const std::string& logicalLibrary,
......@@ -2931,14 +2979,26 @@ void OStoreDB::setDriveShutdown(common::dataStructures::DriveState & driveState,
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::ArchiveMount>
OStoreDB::TapeMountDecisionInfo::createArchiveMount(
common::dataStructures::MountType type,
const catalogue::TapeForWriting & tape, const std::string driveName,
const std::string& logicalLibrary, const std::string& hostName, const std::string& vo, const std::string& mediaType,
const std::string& vendor,uint64_t capacityInBytes, time_t startTime) {
// In order to create the mount, we have to:
// Check we actually hold the scheduling lock
// Set the drive status to up, and indicate which tape we use.
objectstore::JobQueueType queueType;
switch (type) {
case common::dataStructures::MountType::ArchiveForUser:
queueType = objectstore::JobQueueType::JobsToTransferForUser;
break;
case common::dataStructures::MountType::ArchiveForRepack:
queueType = objectstore::JobQueueType::JobsToTransferForRepack;
break;
default:
throw cta::exception::Exception("In OStoreDB::TapeMountDecisionInfo::createArchiveMount(): unexpected mount type.");
}
std::unique_ptr<OStoreDB::ArchiveMount> privateRet(
new OStoreDB::ArchiveMount(m_oStoreDB));
new OStoreDB::ArchiveMount(m_oStoreDB, queueType));
auto &am = *privateRet;
// Check we hold the scheduling lock
if (!m_lockTaken)
......@@ -2996,7 +3056,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount>
OStoreDB::TapeMountDecisionInfo::TapeMountDecisionInfo(OStoreDB & oStoreDb): m_lockTaken(false), m_oStoreDB(oStoreDb) {}
//------------------------------------------------------------------------------
// OStoreDB::TapeMountDecisionInfo::createArchiveMount()
// OStoreDB::TapeMountDecisionInfo::createRetrieveMount()
//------------------------------------------------------------------------------
std::unique_ptr<SchedulerDatabase::RetrieveMount>
OStoreDB::TapeMountDecisionInfo::createRetrieveMount(
......@@ -3077,7 +3137,8 @@ OStoreDB::TapeMountDecisionInfo::~TapeMountDecisionInfo() {
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::ArchiveMount()
//------------------------------------------------------------------------------
OStoreDB::ArchiveMount::ArchiveMount(OStoreDB & oStoreDB): m_oStoreDB(oStoreDB) {}
OStoreDB::ArchiveMount::ArchiveMount(OStoreDB & oStoreDB, objectstore::JobQueueType queueType):
m_oStoreDB(oStoreDB), m_queueType(queueType) {}
//------------------------------------------------------------------------------
// OStoreDB::ArchiveMount::getMountInfo()
......@@ -3091,29 +3152,55 @@ const SchedulerDatabase::ArchiveMount::MountInfo& OStoreDB::ArchiveMount::getMou
//------------------------------------------------------------------------------
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > OStoreDB::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext& logContext) {
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> AQAlgos;
AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext);
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: jobs.elements) {
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB));
aj->tapeFile.copyNb = j.copyNb;
aj->archiveFile = j.archiveFile;
aj->archiveReportURL = j.archiveReportURL;
aj->errorReportURL = j.errorReportURL;
aj->srcURL = j.srcURL;
aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
aj->tapeFile.vid = mountInfo.vid;
aj->tapeFile.blockId =
std::numeric_limits<decltype(aj->tapeFile.blockId)>::max();
aj->m_jobOwned = true;
aj->m_mountId = mountInfo.mountId;
aj->m_tapePool = mountInfo.tapePool;
ret.emplace_back(std::move(aj));
if (m_queueType == objectstore::JobQueueType::JobsToTransferForUser) {
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForUser> AQAlgos;
AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext);
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: jobs.elements) {
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB));
aj->tapeFile.copyNb = j.copyNb;
aj->archiveFile = j.archiveFile;
aj->archiveReportURL = j.archiveReportURL;
aj->errorReportURL = j.errorReportURL;
aj->srcURL = j.srcURL;
aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
aj->tapeFile.vid = mountInfo.vid;
aj->tapeFile.blockId =
std::numeric_limits<decltype(aj->tapeFile.blockId)>::max();
aj->m_jobOwned = true;
aj->m_mountId = mountInfo.mountId;
aj->m_tapePool = mountInfo.tapePool;
ret.emplace_back(std::move(aj));
}
return ret;
} else {
typedef objectstore::ContainerAlgorithms<ArchiveQueue,ArchiveQueueToTransferForRepack> AQAlgos;
AQAlgos aqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
AQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
auto jobs = aqAlgos.popNextBatch(mountInfo.tapePool, popCriteria, logContext);
// We can construct the return value.
std::list<std::unique_ptr<SchedulerDatabase::ArchiveJob> > ret;
for (auto & j: jobs.elements) {
std::unique_ptr<OStoreDB::ArchiveJob> aj(new OStoreDB::ArchiveJob(j.archiveRequest->getAddressIfSet(), m_oStoreDB));
aj->tapeFile.copyNb = j.copyNb;
aj->archiveFile = j.archiveFile;
aj->archiveReportURL = j.archiveReportURL;
aj->errorReportURL = j.errorReportURL;
aj->srcURL = j.srcURL;
aj->tapeFile.fSeq = ++nbFilesCurrentlyOnTape;
aj->tapeFile.vid = mountInfo.vid;
aj->tapeFile.blockId =
std::numeric_limits<decltype(aj->tapeFile.blockId)>::max();
aj->m_jobOwned = true;
aj->m_mountId = mountInfo.mountId;
aj->m_tapePool = mountInfo.tapePool;
ret.emplace_back(std::move(aj));
}
return ret;
}
return ret;
}
//------------------------------------------------------------------------------
......
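Both fetchMountInfo() and getNextJobBatch() now carry a near-verbatim copy of their ForUser code for the ForRepack flavour, differing only in the queue/traits tag and the logged mount type. One way the shared job-materialisation loop could be factored later is a small template helper keyed on the algorithms specialisation; the sketch below uses mock types (UserQueueAlgos, RepackQueueAlgos and popBatchFor are hypothetical, not CTA code) just to show the shape:

```cpp
#include <iostream>
#include <list>
#include <memory>
#include <string>

// Mock queue algorithms: each flavour pops its own batch, but the per-job
// bookkeeping that follows is identical and can be written once.
struct Job { std::string srcURL; };

struct UserQueueAlgos   { static std::list<Job> popNextBatch() { return {{"user://file1"}}; } };
struct RepackQueueAlgos { static std::list<Job> popNextBatch() { return {{"repack://file1"}}; } };

template <typename Algos>
std::list<std::unique_ptr<Job>> popBatchFor() {
  std::list<std::unique_ptr<Job>> ret;
  for (auto &j : Algos::popNextBatch())
    ret.emplace_back(new Job(j));  // shared materialisation loop (fSeq, VID, owner flags, ...)
  return ret;
}

int main() {
  bool forUser = false;  // stands in for m_queueType == JobsToTransferForUser
  auto jobs = forUser ? popBatchFor<UserQueueAlgos>() : popBatchFor<RepackQueueAlgos>();
  std::cout << jobs.front()->srcURL << std::endl;  // prints repack://file1
  return 0;
}
```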
......@@ -94,6 +94,7 @@ public:
CTA_GENERATE_EXCEPTION_CLASS(TapeNotWritable);
CTA_GENERATE_EXCEPTION_CLASS(TapeIsBusy);
std::unique_ptr<SchedulerDatabase::ArchiveMount> createArchiveMount(
common::dataStructures::MountType mountType,
const catalogue::TapeForWriting & tape,
const std::string driveName, const std::string& logicalLibrary,
const std::string & hostName,
......@@ -121,6 +122,7 @@ public:
class TapeMountDecisionInfoNoLock: public SchedulerDatabase::TapeMountDecisionInfo {
public:
std::unique_ptr<SchedulerDatabase::ArchiveMount> createArchiveMount(
common::dataStructures::MountType mountType,
const catalogue::TapeForWriting & tape,
const std::string driveName, const std::string& logicalLibrary,
const std::string & hostName, const std::string& vo, const std::string& mediaType,
......@@ -155,8 +157,9 @@ public:
class ArchiveMount: public SchedulerDatabase::ArchiveMount {
friend class TapeMountDecisionInfo;
private:
ArchiveMount(OStoreDB & oStoreDB);
ArchiveMount(OStoreDB & oStoreDB, objectstore::JobQueueType queueType);
OStoreDB & m_oStoreDB;
objectstore::JobQueueType m_queueType;
public:
CTA_GENERATE_EXCEPTION_CLASS(MaxFSeqNotGoingUp);
const MountInfo & getMountInfo() override;
......
......@@ -166,7 +166,7 @@ TEST_P(OStoreDBTest, getBatchArchiveJob) {
tape.lastFSeq = 1;
tape.tapePool = "Tapepool1";
tape.vid = "tape";
auto mount = mountInfo->createArchiveMount(tape, "drive", "library", "host", "vo","mediaType","vendor",123456789,::time(nullptr));
auto mount = mountInfo->createArchiveMount(cta::common::dataStructures::MountType::ArchiveForUser, tape, "drive", "library", "host", "vo","mediaType","vendor",123456789,::time(nullptr));
auto giveAll = std::numeric_limits<uint64_t>::max();
auto jobs = mount->getNextJobBatch(giveAll, giveAll, lc);
ASSERT_EQ(8, jobs.size());
......
......@@ -764,9 +764,9 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
// If a mount is still listed for our own drive, it is a leftover that we disregard.
if (em.driveName!=driveName) {
try {
existingMountsSummary.at(tpType(em.tapePool, em.type))++;
existingMountsSummary.at(tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type)))++;
} catch (std::out_of_range &) {
existingMountsSummary[tpType(em.tapePool, em.type)] = 1;
existingMountsSummary[tpType(em.tapePool, common::dataStructures::getMountBasicType(em.type))] = 1;
}
if (em.vid.size()) {
tapesInUse.insert(em.vid);
......@@ -787,7 +787,7 @@ void Scheduler::sortAndGetTapesForMountInfo(std::unique_ptr<SchedulerDatabase::T
// Get summary data
uint32_t existingMounts;
try {
existingMounts = existingMountsSummary.at(tpType(m->tapePool, m->type));
existingMounts = existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type)));
} catch (std::out_of_range &) {
existingMounts = 0;
}
......@@ -899,7 +899,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const
// We can now simply iterate on the candidates until we manage to find a valid mount
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
// If the mount is an archive, we still have to find a tape.
if (m->type==common::dataStructures::MountType::ArchiveForUser) {
if (common::dataStructures::getMountBasicType(m->type)==common::dataStructures::MountType::ArchiveAllTypes) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
// The first tape matching will go for a prototype.
......@@ -912,7 +912,7 @@ bool Scheduler::getNextMountDryRun(const std::string& logicalLibraryName, const
catalogueTime = getTapeInfoTime + getTapeForWriteTime;
uint32_t existingMounts = 0;
try {
existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
existingMounts=existingMountsSummary.at(tpType(m->tapePool, common::dataStructures::getMountBasicType(m->type)));
} catch (...) {}
log::ScopedParamContainer params(lc);
params.add("tapePool", m->tapePool)
......@@ -1033,7 +1033,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
// mount for one of them
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
// If the mount is an archive, we still have to find a tape.
if (m->type==common::dataStructures::MountType::ArchiveForUser) {
if (common::dataStructures::getMountBasicType(m->type)==common::dataStructures::MountType::ArchiveAllTypes) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
// The first tape matching will go for a prototype.
......@@ -1046,7 +1046,7 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
// Get the db side of the session
try {
decisionTime += timer.secs(utils::Timer::resetCounter);
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t,
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(m->type, t,
driveName,
logicalLibraryName,
utils::getShortHostname(),
......
......@@ -591,6 +591,7 @@ public:
* lock.
*/
virtual std::unique_ptr<ArchiveMount> createArchiveMount(
common::dataStructures::MountType mountType,
const catalogue::TapeForWriting & tape, const std::string driveName,
const std::string & logicalLibrary, const std::string & hostName,
const std::string& vo, const std::string& mediaType,
......
......@@ -195,7 +195,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
cta::catalogue::TapeForWriting tfw;
tfw.tapePool = "tapePool";
tfw.vid = "vid";
auto am = moutInfo->createArchiveMount(tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
auto am = moutInfo->createArchiveMount(common::dataStructures::MountType::ArchiveForUser, tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
bool done = false;
size_t count = 0;
while (!done) {
......@@ -282,7 +282,7 @@ TEST_P(SchedulerDatabaseTest, createManyArchiveJobs) {
auto count = 0;
#else
moutInfo = db.getMountInfo(lc);
am = moutInfo->createArchiveMount(tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
am = moutInfo->createArchiveMount(common::dataStructures::MountType::ArchiveForUser, tfw, "drive", "library", "host", "vo","mediaType", "vendor",123456789,time(nullptr));
done = false;
count = 0;
#endif
......