Commit f242b946 authored by Eric Cano's avatar Eric Cano
Browse files

Reviewed and added logs to Scheduler::GetNextMount()

Simplified handling of already (or soon to be) mounted tapes.
Added support for next mounts.
Added logging of decision steps.
parent c5e3fba0
......@@ -145,7 +145,9 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
m.logicalLibrary = ""; // The logical library is not known here, and will be determined by the caller.
}
}
// Collect information about the existing mounts
// Collect information about the existing and next mounts
// If a next mount exists it "counts double", but the corresponding drive
// is either about to mount, or about to replace its current mount.
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
objectstore::ScopedSharedLock drl(dr);
dr.fetch();
......@@ -158,12 +160,22 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
(int)cta::common::dataStructures::DriveStatus::Unloading,
(int)cta::common::dataStructures::DriveStatus::Unmounting,
(int)cta::common::dataStructures::DriveStatus::DrainingToDisk };
std::set<int> activeMountTypes = {
(int)cta::common::dataStructures::MountType::Archive,
(int)cta::common::dataStructures::MountType::Retrieve,
(int)cta::common::dataStructures::MountType::Label };
for (auto d=dl.begin(); d!= dl.end(); d++) {
if (activeDriveStatuses.count((int)d->driveStatus)) {
tmdi.existingMounts.push_back(ExistingMount());
tmdi.existingMounts.back().type = d->mountType;
tmdi.existingMounts.back().tapePool = d->currentTapePool;
tmdi.existingMounts.back().driveName = d->driveName;
tmdi.existingOrNextMounts.push_back(ExistingMount());
tmdi.existingOrNextMounts.back().type = d->mountType;
tmdi.existingOrNextMounts.back().tapePool = d->currentTapePool;
tmdi.existingOrNextMounts.back().driveName = d->driveName;
}
if (activeMountTypes.count((int)d->nextMountType)) {
tmdi.existingOrNextMounts.push_back(ExistingMount());
tmdi.existingOrNextMounts.back().type = d->nextMountType;
tmdi.existingOrNextMounts.back().tapePool = d->nextTapepool;
tmdi.existingOrNextMounts.back().driveName = d->driveName;
}
}
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
......
......@@ -127,7 +127,7 @@ void Scheduler::queueRetrieve(
using utils::postEllipsis;
using utils::midEllipsis;
utils::Timer t;
// Get the
// Get the queue criteria
const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester);
// Get the statuses of the tapes on which we have files.
......@@ -421,7 +421,7 @@ std::list<common::dataStructures::DriveState> Scheduler::getDriveStates(const co
//------------------------------------------------------------------------------
// getNextMount
//------------------------------------------------------------------------------
std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName) {
std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc) {
// In order to decide the next mount to do, we have to take a global lock on
// the scheduling, retrieve a list of all running mounts, queues sizes for
// tapes and tape pools, order the candidates by priority
......@@ -470,7 +470,8 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
// with the per tape pool existing mount statistics.
typedef std::pair<std::string, common::dataStructures::MountType> tpType;
std::map<tpType, uint32_t> existingMountsSummary;
for (auto & em: mountInfo->existingMounts) {
std::set<std::string> tapesInUse;
for (auto & em: mountInfo->existingOrNextMounts) {
// If a mount is still listed for our own drive, it is a leftover that we disregard.
if (em.driveName!=driveName) {
try {
......@@ -478,6 +479,13 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
} catch (std::out_of_range &) {
existingMountsSummary[tpType(em.tapePool, em.type)] = 1;
}
if (em.vid.size()) {
tapesInUse.insert(em.vid);
log::ScopedParamContainer params(lc);
params.add("vid", em.vid)
.add("mountType", common::dataStructures::toString(em.type));
lc.log(log::DEBUG,"In Scheduler::getNextMount(): tapeAlreadyInUse found.");
}
}
}
......@@ -501,6 +509,17 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > m->minArchiveRequestAge))
mountPassesACriteria = true;
if (!mountPassesACriteria || existingMounts >= m->maxDrivesAllowed) {
log::ScopedParamContainer params(lc);
params.add("tapepool", m->tapePool)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMounts", existingMounts)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
.add("minArchiveRequestAge", m->minArchiveRequestAge);
lc.log(log::DEBUG, "Removing potential mount not passing criteria");
m = mountInfo->potentialMounts.erase(m);
} else {
// populate the mount with a weight
......@@ -523,17 +542,17 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
[](decltype(*mountInfo->potentialMounts.cbegin())& m){ return m.type == common::dataStructures::MountType::Archive; } )) {
tapeList = m_catalogue.getTapesForWriting(logicalLibraryName);
}
auto fullTapeList=m_catalogue.getTapes(catalogue::TapeSearchCriteria());
for (auto & ftle: fullTapeList) {
ftle.capacityInBytes++;
}
for (auto & t:tapeList) {
auto const & tc=t;
uint64_t cap = tc.capacityInBytes+1;
cap++;
}
// Remove from the tape list the ones already or soon to be mounted
auto t=tapeList.begin();
while (t!=tapeList.end()) {
if (tapesInUse.count(t->vid)) {
t=tapeList.erase(t);
} else {
t++;
}
}
// We can now simply iterate on the candidates until we manage to create a
// mount for one of them
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
......@@ -557,40 +576,77 @@ std::unique_ptr<TapeMount> Scheduler::getNextMount(const std::string &logicalLib
time(NULL)).release());
internalRet->m_sessionRunning = true;
internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting);
log::ScopedParamContainer params(lc);
uint32_t existingMounts = 0;
try {
existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
} catch (...) {}
params.add("tapepool", m->tapePool)
.add("vid", t.vid)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMounts", existingMounts)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
.add("minArchiveRequestAge", m->minArchiveRequestAge);
lc.log(log::DEBUG, "In Scheduler::getNextMount(): Selected next mount (archive)");
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (exception::Exception & ex) {
} catch (cta::exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("Message", ex.getMessage());
lc.log(log::WARNING, "In Scheduler::getNextMount(): got an exception trying to schedule an archive mount. Trying others.");
continue;
}
}
}
} else if (m->type==common::dataStructures::MountType::Retrieve) {
// We know the tape we intend to mount. We have to validate the tape is
// actually available to read, and pass on it if no.
auto drives = m_db.getDriveStates();
if (!std::count_if(drives.cbegin(), drives.cend(), [&](decltype(*drives.cbegin()) & d){ return d.currentVid == m->vid; })) {
// actually available to read (not mounted or about to be mounted, and pass
// on it if so).
if (tapesInUse.count(m->vid)) continue;
try {
// create the mount, and populate its DB side.
std::unique_ptr<RetrieveMount> internalRet (
new RetrieveMount(mountInfo->createRetrieveMount(m->vid,
m->tapePool,
driveName,
logicalLibraryName,
utils::getShortHostname(),
time(NULL))));
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting);
log::ScopedParamContainer params(lc);
uint32_t existingMounts = 0;
try {
// create the mount, and populate its DB side.
std::unique_ptr<RetrieveMount> internalRet (
new RetrieveMount(mountInfo->createRetrieveMount(m->vid,
m->tapePool,
driveName,
logicalLibraryName,
utils::getShortHostname(),
time(NULL))));
internalRet->m_sessionRunning = true;
internalRet->m_diskRunning = true;
internalRet->m_tapeRunning = true;
internalRet->setDriveStatus(common::dataStructures::DriveStatus::Starting);
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (exception::Exception & ex) {
std::string debug=ex.getMessageValue();
continue;
}
existingMounts=existingMountsSummary.at(tpType(m->tapePool, m->type));
} catch (...) {}
params.add("tapepool", m->tapePool)
.add("vid", m->vid)
.add("mountType", common::dataStructures::toString(m->type))
.add("existingMounts", existingMounts)
.add("bytesQueued", m->bytesQueued)
.add("minBytesToWarrantMount", m_minBytesToWarrantAMount)
.add("filesQueued", m->filesQueued)
.add("minFilesToWarrantMount", m_minFilesToWarrantAMount)
.add("oldestJobAge", time(NULL) - m->oldestJobStartTime)
.add("minArchiveRequestAge", m->minArchiveRequestAge);
lc.log(log::DEBUG, "In Scheduler::getNextMount(): Selected next mount (retrieve)");
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (exception::Exception & ex) {
log::ScopedParamContainer params(lc);
params.add("Message", ex.getMessage());
lc.log(log::WARNING, "In Scheduler::getNextMount(): got an exception trying to schedule a retrieve mount. Trying others.");
continue;
}
} else {
throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
}
}
lc.log(log::DEBUG, "In Scheduler::getNextMount(): No valid mount found.");
return std::unique_ptr<TapeMount>();
}
......
......@@ -238,7 +238,7 @@ public:
const cta::common::dataStructures::SecurityIdentity &cliIdentity) const;
/*============== Actual mount scheduling ===================================*/
std::unique_ptr<TapeMount> getNextMount(const std::string &logicalLibraryName, const std::string &driveName);
std::unique_ptr<TapeMount> getNextMount(const std::string &logicalLibraryName, const std::string &driveName, log::LogContext & lc);
/*============== Administrator management ==================================*/
void authorizeAdmin(const cta::common::dataStructures::SecurityIdentity &cliIdentity);
......
......@@ -400,6 +400,7 @@ public:
std::string driveName;
cta::common::dataStructures::MountType type;
std::string tapePool;
std::string vid;
};
/**
......@@ -419,7 +420,7 @@ public:
class TapeMountDecisionInfo {
public:
std::vector<PotentialMount> potentialMounts; /**< All the potential mounts */
std::vector<ExistingMount> existingMounts; /**< Existing mounts */
std::vector<ExistingMount> existingOrNextMounts; /**< Existing mounts */
std::map<std::string, DedicationEntry> dedicationInfo; /**< Drives dedication info */
/**
* Create a new archive mount. This implicitly releases the global scheduling
......
......@@ -34,6 +34,10 @@
#include "tests/TempFile.hpp"
#include "common/log/DummyLogger.hpp"
#include "objectstore/BackendRadosTestSwitch.hpp"
#include "tests/TestsCompileTimeSwitches.hpp"
#ifdef STDOUT_LOGGING
#include "common/log/StdoutLogger.hpp"
#endif
#include <exception>
#include <gtest/gtest.h>
......@@ -334,8 +338,11 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
auto &catalogue = getCatalogue();
setupDefaultCatalogue();
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("unitTest");
#else
log::DummyLogger dl("");
#endif
log::LogContext lc(dl);
uint64_t archiveFileId;
......@@ -409,7 +416,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
cta::common::dataStructures::DriveInfo driveInfo = { driveName, "myHost", s_libraryName };
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Down);
scheduler.reportDriveStatus(driveInfo, cta::common::dataStructures::MountType::NoMount, cta::common::dataStructures::DriveStatus::Up);
mount.reset(scheduler.getNextMount(s_libraryName, "drive0").release());
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::Archive, mount.get()->getMountType());
std::unique_ptr<cta::ArchiveMount> archiveMount;
......@@ -472,7 +479,7 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
// Emulate a tape server by asking for a mount and then a file (and succeed
// the transfer)
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0").release());
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::Retrieve, mount.get()->getMountType());
std::unique_ptr<cta::RetrieveMount> retrieveMount;
......@@ -495,7 +502,11 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
auto &scheduler = getScheduler();
auto &catalogue = getCatalogue();
#ifdef STDOUT_LOGGING
log::StdoutLogger dl("unitTest");
#else
log::DummyLogger dl("");
#endif
log::LogContext lc(dl);
uint64_t archiveFileId;
......@@ -549,7 +560,7 @@ TEST_P(SchedulerTest, retry_archive_until_max_reached) {
{
// Emulate a tape server by asking for a mount and then a file
std::unique_ptr<cta::TapeMount> mount;
mount.reset(scheduler.getNextMount(s_libraryName, "drive0").release());
mount.reset(scheduler.getNextMount(s_libraryName, "drive0", lc).release());
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
ASSERT_EQ(cta::common::dataStructures::MountType::Archive, mount.get()->getMountType());
std::unique_ptr<cta::ArchiveMount> archiveMount;
......
......@@ -129,7 +129,7 @@ schedule:
// 2b) Get initial mount information
std::unique_ptr<cta::TapeMount> tapeMount;
try {
tapeMount.reset(m_scheduler.getNextMount(m_driveConfig.logicalLibrary, m_driveConfig.unitName).release());
tapeMount.reset(m_scheduler.getNextMount(m_driveConfig.logicalLibrary, m_driveConfig.unitName, lc).release());
} catch (cta::exception::Exception & e) {
cta::log::ScopedParamContainer localParams(lc);
localParams.add("errorMessage", e.getMessageValue());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment