Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
cta
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Harbor Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
dCache
cta
Commits
c1b1d7f4
Commit
c1b1d7f4
authored
9 years ago
by
Daniele Kruse
Browse files
Options
Downloads
Patches
Plain Diff
added old code for getnextmount for future adaptation
parent
aedd9c9a
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
scheduler/OStoreDB/OStoreDB.cpp
+106
-0
106 additions, 0 deletions
scheduler/OStoreDB/OStoreDB.cpp
scheduler/Scheduler.cpp
+139
-0
139 additions, 0 deletions
scheduler/Scheduler.cpp
with
245 additions
and
0 deletions
scheduler/OStoreDB/OStoreDB.cpp
+
106
−
0
View file @
c1b1d7f4
...
...
@@ -171,6 +171,112 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
return
ret
;
}
/* Old getMountInfo
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
OStoreDB::getMountInfo() {
//Allocate the getMountInfo structure to return.
assertAgentSet();
std::unique_ptr<TapeMountDecisionInfo> privateRet (new TapeMountDecisionInfo(
m_objectStore, *m_agent));
TapeMountDecisionInfo & tmdi=*privateRet;
// Get all the tape pools and tapes with queues (potential mounts)
objectstore::RootEntry re(m_objectStore);
objectstore::ScopedSharedLock rel(re);
re.fetch();
// Take an exclusive lock on the scheduling and fetch it.
tmdi.m_schedulerGlobalLock.reset(
new SchedulerGlobalLock(re.getSchedulerGlobalLock(), m_objectStore));
tmdi.m_lockOnSchedulerGlobalLock.lock(*tmdi.m_schedulerGlobalLock);
tmdi.m_lockTaken = true;
tmdi.m_schedulerGlobalLock->fetch();
auto tpl = re.dumpTapePools();
for (auto tpp=tpl.begin(); tpp!=tpl.end(); tpp++) {
// Get the tape pool object
objectstore::TapePool tpool(tpp->address, m_objectStore);
// debug utility variable
std::string __attribute__((__unused__)) poolName = tpp->tapePool;
objectstore::ScopedSharedLock tpl(tpool);
tpool.fetch();
// If there are files queued, we create an entry for this tape pool in the
// mount candidates list.
if (tpool.getJobsSummary().files) {
tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
auto & m = tmdi.potentialMounts.back();
m.tapePool = tpp->tapePool;
m.type = cta::MountType::ARCHIVE;
m.bytesQueued = tpool.getJobsSummary().bytes;
m.filesQueued = tpool.getJobsSummary().files;
m.oldestJobStartTime = tpool.getJobsSummary().oldestJobStartTime;
m.priority = tpool.getJobsSummary().priority;
m.mountCriteria.maxFilesQueued =
tpool.getMountCriteriaByDirection().archive.maxFilesQueued;
m.mountCriteria.maxBytesQueued =
tpool.getMountCriteriaByDirection().archive.maxBytesQueued;
m.mountCriteria.maxAge =
tpool.getMountCriteriaByDirection().archive.maxAge;
m.mountCriteria.quota =
tpool.getMountCriteriaByDirection().archive.quota;
m.logicalLibrary = "";
}
// For each tape in the pool, list the tapes with work
auto tl = tpool.dumpTapesAndFetchStatus();
for (auto tp = tl.begin(); tp!= tl.end(); tp++) {
objectstore::Tape t(tp->address, m_objectStore);
objectstore::ScopedSharedLock tl(t);
t.fetch();
if (t.getJobsSummary().files) {
tmdi.potentialMounts.push_back(PotentialMount());
auto & m = tmdi.potentialMounts.back();
m.type = cta::MountType::RETRIEVE;
m.bytesQueued = t.getJobsSummary().bytes;
m.filesQueued = t.getJobsSummary().files;
m.oldestJobStartTime = t.getJobsSummary().oldestJobStartTime;
m.priority = t.getJobsSummary().priority;
m.vid = t.getVid();
m.logicalLibrary = t.getLogicalLibrary();
m.mountCriteria.maxFilesQueued =
tpool.getMountCriteriaByDirection().retrieve.maxFilesQueued;
m.mountCriteria.maxBytesQueued =
tpool.getMountCriteriaByDirection().retrieve.maxBytesQueued;
m.mountCriteria.maxAge =
tpool.getMountCriteriaByDirection().retrieve.maxAge;
m.mountCriteria.quota =
tpool.getMountCriteriaByDirection().retrieve.quota;
m.logicalLibrary = t.getLogicalLibrary();
}
}
}
// Dedication information comes here
// TODO
//
// Collect information about the existing mounts
objectstore::DriveRegister dr(re.getDriveRegisterAddress(), m_objectStore);
objectstore::ScopedSharedLock drl(dr);
dr.fetch();
auto dl = dr.dumpDrives();
using common::DriveStatus;
std::set<int> activeDriveStatuses = {
(int)DriveStatus::Starting,
(int)DriveStatus::Mounting,
(int)DriveStatus::Transfering,
(int)DriveStatus::Unloading,
(int)DriveStatus::Unmounting,
(int)DriveStatus::DrainingToDisk };
for (auto d=dl.begin(); d!= dl.end(); d++) {
if (activeDriveStatuses.count((int)d->status)) {
tmdi.existingMounts.push_back(ExistingMount());
tmdi.existingMounts.back().type = d->mountType;
tmdi.existingMounts.back().tapePool = d->currentTapePool;
}
}
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> ret(std::move(privateRet));
return ret;
}
*/
void
OStoreDB
::
createStorageClass
(
const
std
::
string
&
name
,
const
uint16_t
nbCopies
,
const
cta
::
CreationLog
&
creationLog
)
{
RootEntry
re
(
m_objectStore
);
...
...
This diff is collapsed.
Click to expand it.
scheduler/Scheduler.cpp
+
139
−
0
View file @
c1b1d7f4
...
...
@@ -265,5 +265,144 @@ std::list<cta::common::dataStructures::DriveState> cta::Scheduler::getDriveState
// getNextMount
//------------------------------------------------------------------------------
// Decide and create the next tape mount for the given drive.
// NOTE(review): the previous implementation is kept below, commented out, for
// future adaptation (see commit message). As it stands, this method always
// returns an empty pointer, i.e. "no mount to perform".
std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
  const std::string & logicalLibraryName, const std::string & driveName) {
  /*
  // In order to decide the next mount to do, we have to take a global lock on
  // the scheduling, retrieve a list of all running mounts, queues sizes for
  // tapes and tape pools, filter the tapes which are actually accessible to
  // this drive (by library and dedication), order the candidates by priority
  // below threshold, and pick one at a time. In addition, for archives, we
  // might not find a suitable tape (by library and dedication). In such a case,
  // we should find out if no tape at all is available, and log an error if
  // so.
  // We then skip to the next candidate, until we find a suitable one and
  // return the mount, or exhaust all of them and return an empty pointer.
  // Many steps for this logic are not specific for the database and are hence
  // implemented in the scheduler itself.
  // First, get the mount-related info from the DB
  std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
  mountInfo = m_db.getMountInfo();
  // We should now filter the potential mounts to keep only the ones we are
  // compatible with (match the logical library for retrieves).
  // We also only want the potential mounts for which we still have
  // We cannot filter the archives yet
  for (auto m = mountInfo->potentialMounts.begin(); m != mountInfo->potentialMounts.end();) {
    if (m->type == MountType::RETRIEVE && m->logicalLibrary != logicalLibraryName) {
      m = mountInfo->potentialMounts.erase(m);
    } else {
      m++;
    }
  }
  // With the existing mount list, we can now populate the potential mount list
  // with the per tape pool existing mount statistics.
  typedef std::pair<std::string, cta::MountType::Enum> tpType;
  std::map<tpType, uint32_t> existingMountsSummary;
  for (auto em = mountInfo->existingMounts.begin(); em != mountInfo->existingMounts.end(); em++) {
    try {
      existingMountsSummary.at(tpType(em->tapePool, em->type))++;
    } catch (std::out_of_range &) {
      existingMountsSummary[tpType(em->tapePool, em->type)] = 1;
    }
  }
  // We can now filter out the potential mounts for which their mount criteria
  // is already met, filter out the potential mounts for which the maximum mount
  // quota is already reached, and weight the remaining by how much of their quota
  // is reached
  for (auto m = mountInfo->potentialMounts.begin(); m != mountInfo->potentialMounts.end();) {
    // Get summary data
    uint32_t existingMounts;
    try {
      existingMounts = existingMountsSummary.at(tpType(m->tapePool, m->type));
    } catch (std::out_of_range &) {
      existingMounts = 0;
    }
    bool mountPassesACriteria = false;
    if (m->bytesQueued / (1 + existingMounts) >= m->mountCriteria.maxBytesQueued)
      mountPassesACriteria = true;
    if (m->filesQueued / (1 + existingMounts) >= m->mountCriteria.maxFilesQueued)
      mountPassesACriteria = true;
    if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > (int64_t)m->mountCriteria.maxAge))
      mountPassesACriteria = true;
    if (!mountPassesACriteria || existingMounts >= m->mountCriteria.quota) {
      m = mountInfo->potentialMounts.erase(m);
    } else {
      // populate the mount with a weight
      m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->mountCriteria.quota;
      m++;
    }
  }
  // We can now sort the potential mounts in decreasing priority order.
  // The ordering is defined in operator <.
  // We want the result in descending order or priority so we reverse the vector
  std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
  // We can now simply iterate on the candidates until we manage to create a
  // mount for one of them
  for (auto m = mountInfo->potentialMounts.begin(); m != mountInfo->potentialMounts.end(); m++) {
    // If the mount is an archive, we still have to find a tape.
    if (m->type == cta::MountType::ARCHIVE) {
      // We need to find a tape for archiving. It should be both in the right
      // tape pool and in the drive's logical library
      auto tapesList = m_db.getTapes();
      // The first tape matching will go for a prototype.
      // TODO: improve to reuse already partially written tapes
      for (auto t = tapesList.begin(); t != tapesList.end(); t++) {
        if (t->logicalLibraryName == logicalLibraryName &&
            t->tapePoolName == m->tapePool &&
            t->status.availableToWrite()) {
          // We have our tape. Try to create the session. Prepare a return value
          // for it.
          std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount(m_ns));
          // Get the db side of the session
          try {
            internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t->vid,
              t->tapePoolName,
              driveName,
              logicalLibraryName,
              Utils::getShortHostname(),
              time(NULL)).release());
            internalRet->m_sessionRunning = true;
            internalRet->setDriveStatus(cta::common::DriveStatus::Starting);
            return std::unique_ptr<TapeMount> (internalRet.release());
          } catch (cta::exception::Exception & ex) {
            continue;
          }
        }
      }
    } else if (m->type == cta::MountType::RETRIEVE) {
      // We know the tape we intend to mount. We have to validate the tape is
      // actually available to read, and pass on it if no.
      auto tapesList = m_db.getTapes();
      for (auto t = tapesList.begin(); t != tapesList.end(); t++) {
        if (t->vid == m->vid && t->status.availableToRead()) {
          try {
            // create the mount, and populate its DB side.
            std::unique_ptr<RetrieveMount> internalRet (
              new RetrieveMount(mountInfo->createRetrieveMount(t->vid,
                t->tapePoolName,
                driveName,
                logicalLibraryName,
                Utils::getShortHostname(),
                time(NULL))));
            internalRet->m_sessionRunning = true;
            internalRet->m_diskRunning = true;
            internalRet->m_tapeRunning = true;
            internalRet->setDriveStatus(cta::common::DriveStatus::Starting);
            return std::unique_ptr<TapeMount> (internalRet.release());
          } catch (cta::exception::Exception & ex) {
            std::string debug = ex.getMessageValue();
            continue;
          }
        }
      }
    } else {
      throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
    }
  }
  */
  // Live behavior today: no decision logic is active, so report "no mount".
  return std::unique_ptr<TapeMount>();
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment