Skip to content
Snippets Groups Projects
Commit 08435858 authored by Eric Cano's avatar Eric Cano
Browse files

Re-implemented Scheduler::getNextMount()

parent 9271c2d7
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,10 @@
*/
#include "scheduler/Scheduler.hpp"
#include "Scheduler.hpp"
#include "ArchiveMount.hpp"
#include "RetrieveMount.hpp"
#include "common/utils/utils.hpp"
#include <iostream>
#include <sstream>
......@@ -282,15 +285,12 @@ std::list<cta::common::dataStructures::DriveState> cta::Scheduler::getDriveState
// getNextMount
//------------------------------------------------------------------------------
std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName) {
/*
// In order to decide the next mount to do, we have to take a global lock on
// the scheduling, retrieve a list of all running mounts, queues sizes for
// tapes and tape pools, filter the tapes which are actually accessible to
// this drive (by library and dedication), order the candidates by priority
// below threshold, and pick one at a time. In addition, for archives, we
// might not find a suitable tape (by library and dedication). In such a case,
// we should find out if no tape at all is available, and log an error if
// so.
// tapes and tape pools, order the candidates by priority
// below threshold, and pick one at a time, we then attempt to get a tape
// from the catalogue (for the archive mounts), and walk the list until we
// mount or find nothing to do.
// We then skip to the next candidate, until we find a suitable one and
// return the mount, or exhaust all of them an
// Many steps for this logic are not specific for the database and are hence
......@@ -336,17 +336,17 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &
existingMounts = 0;
}
bool mountPassesACriteria = false;
if (m->bytesQueued / (1 + existingMounts) >= m->mountCriteria.maxBytesQueued)
if (m->bytesQueued / (1 + existingMounts) >= m_minBytesToWarrantAMount)
mountPassesACriteria = true;
if (m->filesQueued / (1 + existingMounts) >= m->mountCriteria.maxFilesQueued)
if (m->filesQueued / (1 + existingMounts) >= m_minFilesToWarrantAMount)
mountPassesACriteria = true;
if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > (int64_t)m->mountCriteria.maxAge))
if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minArchiveRequestAge))
mountPassesACriteria = true;
if (!mountPassesACriteria || existingMounts >= m->mountCriteria.quota) {
if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) {
m = mountInfo->potentialMounts.erase(m);
} else {
// populate the mount with a weight
m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->mountCriteria.quota;
m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->maxDrivesAllowed;
m++;
}
}
......@@ -357,6 +357,15 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &
std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
// Find out if we have any potential archive mount in the list. If so, get the
// list of tapes from the catalogue.
std::list<cta::catalogue::TapeForWriting> tapeList;
if (std::count_if(
mountInfo->potentialMounts.cbegin(), mountInfo->potentialMounts.cend(),
[](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == cta::MountType::ARCHIVE; } )) {
tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
}
// We can now simply iterate on the candidates until we manage to create a
// mount for one of them
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
......@@ -364,23 +373,20 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &
if (m->type==cta::MountType::ARCHIVE) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
auto tapesList = m_db.getTapes();
// The first tape matching will go for a prototype.
// TODO: improve to reuse already partially written tapes
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->logicalLibraryName == logicalLibraryName &&
t->tapePoolName == m->tapePool &&
t->status.availableToWrite()) {
// TODO: improve to reuse already partially written tapes and randomization
for (auto & t: tapeList) {
if (t.tapePool == m->tapePool) {
// We have our tape. Try to create the session. Prepare a return value
// for it.
std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_ns));
std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_catalogue));
// Get the db side of the session
try {
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t->vid,
t->tapePoolName,
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t.vid,
t.tapePool,
driveName,
logicalLibraryName,
Utils::getShortHostname(),
cta::utils::getShortHostname(),
time(NULL)).release());
internalRet->m_sessionRunning = true;
internalRet->setDriveStatus(cta::common::DriveStatus::Starting);
......@@ -393,33 +399,30 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(const std::string &
} else if (m->type==cta::MountType::RETRIEVE) {
// We know the tape we intend to mount. We have to validate the tape is
// actually available to read, and pass on it if no.
auto tapesList = m_db.getTapes();
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->vid == m->vid && t->status.availableToRead()) {
try {
// create the mount, and populate its DB side.
std::unique_ptr<RetrieveMount> internalRet (
new RetrieveMount(mountInfo->createRetrieveMount(t->vid,
t->tapePoolName,
driveName,
logicalLibraryName,
Utils::getShortHostname(),
time(NULL))));
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
internalRet->setDriveStatus(cta::common::DriveStatus::Starting);
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
std::string debug=ex.getMessageValue();
continue;
}
auto drives = m_db.getDriveStates();
if (!std::count_if(drives.cbegin(), drives.cend(), [&](decltype(*drives.cbegin()) & d){ return d.currentVid == m->vid; })) {
try {
// create the mount, and populate its DB side.
std::unique_ptr<RetrieveMount> internalRet (
new RetrieveMount(mountInfo->createRetrieveMount(m->vid,
m->tapePool,
driveName,
logicalLibraryName,
cta::utils::getShortHostname(),
time(NULL))));
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
internalRet->setDriveStatus(cta::common::DriveStatus::Starting);
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
std::string debug=ex.getMessageValue();
continue;
}
}
} else {
throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
}
}
*/
throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
return std::unique_ptr<cta::TapeMount>();
}
......@@ -279,7 +279,7 @@ public:
std::string tapePool; /**< The name of the tape pool for both archive and retrieve */
uint64_t priority; /**< The priority for the mount, defined as the highest priority of all queued jobs */
uint64_t maxDrivesAllowed; /**< The maximum number of drives allowed for this tape pool, defined as the highest value amongst jobs */
uint64_t minArchiveRequestAge; /**< The maximum amount of time to wait before forcing a mount in the absence of enough data. Defined as the smallest value amongst jobs.*/
time_t minArchiveRequestAge; /**< The maximum amount of time to wait before forcing a mount in the absence of enough data. Defined as the smallest value amongst jobs.*/
uint64_t filesQueued; /**< The number of files queued for this queue */
uint64_t bytesQueued; /**< The amount of data currently queued */
time_t oldestJobStartTime; /**< Creation time of oldest request */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment