Commit 6c0ce3c6 authored by Eric Cano's avatar Eric Cano
Browse files

Added Utils::getShortHostname()

Added getting the per tape status in objectstore::TapePool::dumpTapes() (renamed to objectstore::TapePool::dumpTapesAndFetchStatus()).
Implemented the mount selection logic for archives in Scheduler::getNextMount().
parent 066dcd68
......@@ -17,6 +17,7 @@
*/
#include "common/exception/Exception.hpp"
#include "common/exception/Errnum.hpp"
#include "common/strerror_r_wrapper.hpp"
#include "common/Utils.hpp"
......@@ -30,6 +31,7 @@
#include <sys/types.h>
#include <uuid/uuid.h>
#include <zlib.h>
#include <sys/utsname.h>
using cta::exception::Exception;
......@@ -482,3 +484,16 @@ uint32_t cta::Utils::getAdler32(const uint8_t *buf, const uint32_t len)
const uint32_t checksum = adler32(0L, Z_NULL, 0);
return adler32(checksum, (const Bytef*)buf, len);
}
//------------------------------------------------------------------------------
// getShortHostname
//------------------------------------------------------------------------------
std::string cta::Utils::getShortHostname() {
struct utsname un;
exception::Errnum::throwOnMinusOne(uname (&un));
std::vector<std::string> snn;
splitString(un.nodename, '.', snn);
return snn.at(0);
}
......@@ -207,6 +207,13 @@ public:
* @returns true if the string is a valid unsigned integer, else false.
*/
static bool isValidUInt(const std::string &str) throw();
/**
* Gets the short host name from the system
*
* @return the short host name
*/
static std::string getShortHostname();
/**
* Returns the alder32 checksum of the specified buffer.
......
......@@ -42,14 +42,16 @@ cta::Tape::Tape(
const std::string &tapePoolName,
const uint64_t capacityInBytes,
const uint64_t dataOnTapeInBytes,
const CreationLog & creationLog):
const CreationLog & creationLog,
const Status & status):
vid(vid),
nbFiles(nbFiles),
logicalLibraryName(logicalLibraryName),
tapePoolName(tapePoolName),
capacityInBytes(capacityInBytes),
dataOnTapeInBytes(dataOnTapeInBytes),
creationLog(creationLog){
creationLog(creationLog),
status(status) {
}
//------------------------------------------------------------------------------
......@@ -58,3 +60,11 @@ cta::Tape::Tape(
bool cta::Tape::operator<(const Tape &rhs) const throw() {
return vid < rhs.vid;
}
//------------------------------------------------------------------------------
// Status::availableToWrite()
//------------------------------------------------------------------------------
bool cta::Tape::Status::availableToWrite() {
return !busy && !archived && !disabled && !readonly && !full;
}
......@@ -40,7 +40,7 @@ struct Tape {
* Destructor.
*/
~Tape() throw();
struct Status;
/**
* Constructor.
*
......@@ -61,7 +61,8 @@ struct Tape {
const std::string &tapePoolName,
const uint64_t capacityInBytes,
const uint64_t dataOnTapeInBytes,
const CreationLog &creationLog);
const CreationLog &creationLog,
const Status & status);
/**
* Less than operator.
......@@ -104,6 +105,25 @@ struct Tape {
* The record of the entry's creation
*/
CreationLog creationLog;
/**
* Type holding the tape's status
*/
struct Status {
bool busy;
bool archived;
bool disabled;
bool readonly;
bool full;
bool availableToWrite();
};
/**
* The tape's status
*/
Status status;
}; // class Tape
......
......@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "scheduler/DiskException.hpp"
#include "common/exception/DiskException.hpp"
//------------------------------------------------------------------------------
// constructor
......
......@@ -143,17 +143,31 @@ void cta::objectstore::TapePool::removeTapeAndCommit(const std::string& vid) {
}
}
auto cta::objectstore::TapePool::dumpTapes() -> std::list<TapeDump>{
auto cta::objectstore::TapePool::dumpTapesAndFetchStatus() -> std::list<TapeDump>{
checkPayloadReadable();
std::list<TapeDump> ret;
auto & tl = m_payload.tapes();
for (auto t=tl.begin(); t!=tl.end(); t++) {
for (auto tp=tl.begin(); tp!=tl.end(); tp++) {
cta::Tape::Status stat;
try {
objectstore::Tape t(tp->address(), m_objectStore);
objectstore::ScopedSharedLock tlock(t);
t.fetch();
stat.archived = t.isArchived();
stat.busy = t.isBusy();
stat.disabled = t.isDisabled();
stat.full = t.isFull();
stat.readonly = t.isReadOnly();
} catch (cta::exception::Exception & ex) {
continue;
}
ret.push_back(TapeDump());
ret.back().address = t->address();
ret.back().vid = t->vid();
ret.back().capacityInBytes = t->capacity();
ret.back().logicalLibraryName = t->library();
ret.back().log.deserialize(t->log());
ret.back().address = tp->address();
ret.back().vid = tp->vid();
ret.back().capacityInBytes = tp->capacity();
ret.back().logicalLibraryName = tp->library();
ret.back().log.deserialize(tp->log());
ret.back().status = stat;
}
return ret;
}
......
......@@ -26,6 +26,7 @@
#include "ArchiveToFileRequest.hpp"
#include "CreationLog.hpp"
#include "Agent.hpp"
#include "common/archiveNS/Tape.hpp"
namespace cta { namespace objectstore {
......@@ -61,8 +62,9 @@ public:
std::string logicalLibraryName;
uint64_t capacityInBytes;
objectstore::CreationLog log;
cta::Tape::Status status;
};
std::list<TapeDump> dumpTapes();
std::list<TapeDump> dumpTapesAndFetchStatus();
// Archive jobs management ===================================================
void addJob(const ArchiveToFileRequest::JobDump & job,
......
......@@ -118,7 +118,7 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
}
// For each tape in the pool, list the tapes with work
auto tl = tpool.dumpTapes();
auto tl = tpool.dumpTapesAndFetchStatus();
for (auto tp = tl.begin(); tp!= tl.end(); tp++) {
objectstore::Tape t(tp->address, m_objectStore);
objectstore::ScopedSharedLock tl(t);
......@@ -547,7 +547,6 @@ void OStoreDB::createTape(const std::string& vid,
}
cta::Tape OStoreDB::getTape(const std::string &vid) const {
cta::Tape ret;
// Got through all tape pools. Get the list of them
RootEntry re(m_objectStore);
ScopedSharedLock rel(re);
......@@ -561,15 +560,16 @@ cta::Tape OStoreDB::getTape(const std::string &vid) const {
// considered as a dangling pointer (and skip it)
if (tp.getOwner() != re.getAddressIfSet())
continue;
auto tl=tp.dumpTapes();
auto tl=tp.dumpTapesAndFetchStatus();
for (auto ti=tl.begin(); ti!=tl.end(); ti++) {
if (vid == ti->vid) {
objectstore::Tape t(ti->address, m_objectStore);
ScopedSharedLock tl(t);
t.fetch();
const uint64_t nbFiles = 0; // TO BE DONE
cta::Tape::Status status;
return cta::Tape(ti->vid, nbFiles, ti->logicalLibraryName,
tpi->tapePool, ti->capacityInBytes, t.getStoredData(), ti->log);
tpi->tapePool, ti->capacityInBytes, t.getStoredData(), ti->log, status);
}
}
}
......@@ -591,14 +591,15 @@ std::list<cta::Tape> OStoreDB::getTapes() const {
// considered as a dangling pointer (and skip it)
if (tp.getOwner() != re.getAddressIfSet())
continue;
auto tl=tp.dumpTapes();
auto tl=tp.dumpTapesAndFetchStatus();
for (auto ti=tl.begin(); ti!=tl.end(); ti++) {
objectstore::Tape t(ti->address, m_objectStore);
ScopedSharedLock tl(t);
t.fetch();
const uint64_t nbFiles = 0; // TO BE DONE
ret.push_back(cta::Tape(ti->vid, nbFiles, ti->logicalLibraryName,
tpi->tapePool, ti->capacityInBytes, t.getStoredData(), ti->log));
tpi->tapePool, ti->capacityInBytes, t.getStoredData(), ti->log,
ti->status));
}
}
return ret;
......@@ -645,7 +646,7 @@ void OStoreDB::deleteLogicalLibrary(const SecurityIdentity& requester,
// considered as a dangling pointer.
if (tp.getOwner() != re.getAddressIfSet())
continue;
auto tl=tp.dumpTapes();
auto tl=tp.dumpTapesAndFetchStatus();
for (auto t=tl.begin(); t!=tl.end(); t++) {
if (t->logicalLibraryName == name)
throw LibraryInUse("In OStoreDB::deleteLogicalLibrary: trying to delete a library used by a tape.");
......@@ -962,7 +963,7 @@ void OStoreDB::queue(const cta::RetrieveToFileRequest& rqst) {
objectstore::TapePool tp(pool->address, m_objectStore);
objectstore::ScopedSharedLock tpl(tp);
tp.fetch();
auto tapes = tp.dumpTapes();
auto tapes = tp.dumpTapesAndFetchStatus();
for(auto tape=tapes.begin(); tape!=tapes.end(); tape++)
vidToAddress[tape->vid] = tape->address;
}
......@@ -1096,7 +1097,7 @@ std::unique_ptr<SchedulerDatabase::ArchiveMount>
objectstore::TapePool tp(tpAdress, m_objectStore);
objectstore::ScopedSharedLock tpl(tp);
tp.fetch();
auto tlist = tp.dumpTapes();
auto tlist = tp.dumpTapesAndFetchStatus();
std::string tAddress;
for (auto tptr = tlist.begin(); tptr!=tlist.end(); tptr++) {
if (tptr->vid == vid)
......
......@@ -47,6 +47,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
//------------------------------------------------------------------------------
// TransferFailureToStr
......@@ -765,13 +766,101 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo> mountInfo;
mountInfo = m_db.getMountInfo();
// Prioritize the mounts.
throw NotImplemented("");
// TODO: finish
// dbMount = m_db.getNextMount(logicalLibraryName, driveName);
//
// return wrapDbMountInSchedulerMount(std::move(dbMount));
// We should now filter the potential mounts to keep only the ones we are
// compatible with (match the logical library for retrieves).
// We also only want the potential mounts for which we still have
// We cannot filter the archives yet
for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
if (m->type == MountType::RETRIEVE && m->logicalLibrary != logicalLibraryName) {
m = mountInfo->potentialMounts.erase(m);
} else {
m++;
}
}
// With the existing mount list, we can now populate the potential mount list
// with the per tape pool existing mount statistics.
typedef std::pair<std::string, cta::MountType::Enum> tpType;
std::map<tpType, uint32_t> existingMountsSummary;
for (auto em=mountInfo->existingMounts.begin(); em!=mountInfo->existingMounts.end(); em++) {
try {
existingMountsSummary.at(tpType(em->tapePool, em->type))++;
} catch (std::out_of_range &) {
existingMountsSummary[tpType(em->tapePool, em->type)] = 1;
}
}
// We can now filter out the potential mounts for which their mount criteria
// is already met, filter out the potential mounts for which the maximum mount
// quota is already reached, and weight the remaining by how much of their quota
// is reached
for (auto m = mountInfo->potentialMounts.begin(); m!= mountInfo->potentialMounts.end();) {
// Get summary data
uint32_t existingMounts;
try {
existingMounts = existingMountsSummary.at(tpType(m->tapePool, m->type));
} catch (std::out_of_range) {
existingMounts = 0;
}
bool mountPassesACriteria = false;
if (m->bytesQueued / (1 + existingMounts) > m->mountCriteria.maxBytesQueued)
mountPassesACriteria = true;
if (m->filesQueued / (1 + existingMounts) > m->mountCriteria.maxFilesQueued)
mountPassesACriteria = true;
if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > (int64_t)m->mountCriteria.maxAge))
mountPassesACriteria = true;
if (!mountPassesACriteria || existingMounts > m->mountQuota.quota) {
m = mountInfo->potentialMounts.erase(m);
} else {
// populate the mount with a weight
m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->mountQuota.quota;
}
}
// We can now sort the potential mounts in decreasing priority order.
// The ordering is defined in operator <.
// We want the result in descending order or priority so we reverse the vector
std::sort(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
std::reverse(mountInfo->potentialMounts.begin(), mountInfo->potentialMounts.end());
// We can now simply iterate on the candidates until we manage to create a
// mount for one of them
for (auto m = mountInfo->potentialMounts.begin(); m!=mountInfo->potentialMounts.end(); m++) {
// If the mount is an archive, we still have to find a tape.
if (m->type==cta::MountType::ARCHIVE) {
// We need to find a tape for archiving. It should be both in the right
// tape pool and in the drive's logical library
auto tapesList = m_db.getTapes();
// The first tape matching will go for a prototype.
// TODO: improve to reuse already partially written tapes
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->logicalLibraryName == logicalLibraryName &&
t->tapePoolName == m->tapePool &&
!t->status.availableToWrite()) {
// We have our tape. Try to create the session. Prepare a return value
// for it.
std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount);
// Get the db side of the session
try {
internalRet->m_dbMount.reset(mountInfo->createArchiveMount(t->vid,
t->tapePoolName,
driveName,
logicalLibraryName,
Utils::getShortHostname(),
time(NULL)).release());
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
continue;
}
}
}
} else if (m->type==cta::MountType::RETRIEVE) {
throw NotImplemented("");
} else {
throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
}
}
return std::unique_ptr<TapeMount>();
}
//
......
......@@ -24,6 +24,7 @@
#include <string>
#include <memory>
#include <vector>
#include <stdexcept>
#include "common/archiveNS/ArchiveFile.hpp"
#include "common/remoteFS/RemotePathAndStatus.hpp"
#include "scheduler/MountType.hpp"
......@@ -266,8 +267,24 @@ public:
MountCriteria mountCriteria; /**< The mount criteria collection */
MountQuota mountQuota; /**< The mount quota collection */
std::string logicalLibrary; /**< The logical library (for a retrieve) */
double ratioOfMountQuotaUsed; /**< The [ 0.0, 1.0 [ ratio of existing mounts/quota (for faire share of mounts)*/
bool operator < (const PotentialMount &other) const {
if (priority < other.priority)
return true;
if (priority > other.priority)
return false;
if (type == cta::MountType::Enum::ARCHIVE && other.type != cta::MountType::Enum::ARCHIVE)
return false;
if (other.type == cta::MountType::Enum::ARCHIVE && type != cta::MountType::Enum::ARCHIVE)
return true;
if (ratioOfMountQuotaUsed < other.ratioOfMountQuotaUsed)
return true;
return false;
}
};
/**
* Information about the existing mounts.
*/
......
......@@ -29,6 +29,7 @@
#include "scheduler/MountRequest.hpp"
#include "scheduler/Scheduler.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/TapeMount.hpp"
#include "common/SecurityIdentity.hpp"
#include "common/archiveNS/StorageClass.hpp"
#include "common/archiveNS/Tape.hpp"
......@@ -2402,8 +2403,10 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
s_remoteFileRawPath1), std::exception);
}
// Emulate a tape server by ask for a mount and then a file
//const MountRequest *mount = scheduler.getNextMount(libraryName);
// Emulate a tape server by asking for a mount and then a file
std::unique_ptr<cta::TapeMount> mount;
ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(libraryName, "drive0").release()));
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
{
std::list<std::string> archiveFiles;
......@@ -2560,6 +2563,7 @@ TEST_P(SchedulerTest, setOwner_statFile_top_level) {
}
}
static cta::MockNameServerFactory mockNsFactory;
static cta::MockSchedulerDatabaseFactory mockDbFactory;
......
......@@ -684,7 +684,8 @@ cta::Tape cta::MockSchedulerDatabase::getTape(const std::string &vid) const {
sqlite3_column_int(statement.get(),idx("GID"))),
(char *)sqlite3_column_text(statement.get(),idx("COMMENT")),
time_t(sqlite3_column_int(statement.get(),idx("CREATIONTIME"))),
(char *)sqlite3_column_text(statement.get(),idx("COMMENT")))
(char *)sqlite3_column_text(statement.get(),idx("COMMENT"))),
Tape::Status()
);
}
break;
......@@ -1537,7 +1538,8 @@ std::list<cta::Tape> cta::MockSchedulerDatabase::getTapes() const {
sqlite3_column_int(statement.get(),idx("GID"))),
(char *)sqlite3_column_text(statement.get(),idx("COMMENT")),
time_t(sqlite3_column_int(statement.get(),idx("CREATIONTIME"))),
(char *)sqlite3_column_text(statement.get(),idx("COMMENT")))
(char *)sqlite3_column_text(statement.get(),idx("COMMENT"))),
Tape::Status()
));
}
return tapes;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment