Commit a5aecb50 authored by Eric Cano's avatar Eric Cano
Browse files

Added full emulation of a tape session in the archive_and_retrieve_new_file...

Added full emulation of a tape session in the archive_and_retrieve_new_file unit test for the scheduler.
Fixed several bugs.
parent 633dff91
......@@ -30,18 +30,17 @@ namespace cta {
uint64_t maxAge; /**< The maximum age for a request before trigerring
* a request (in seconds) */
uint16_t quota; /**< The maximum number of mounts for this tape pool */
MountCriteria(uint64_t maxFiles, uint64_t maxBytes, uint64_t maxAge,
uint16_t quota):
maxFilesQueued(maxFiles), maxBytesQueued(maxBytesQueued), maxAge(maxAge),
quota(quota) {}
MountCriteria(uint64_t mf, uint64_t mb, uint64_t ma,
uint16_t q):
maxFilesQueued(mf), maxBytesQueued(mb), maxAge(ma), quota(q) {}
MountCriteria(): maxFilesQueued(0), maxBytesQueued(0), maxAge(0), quota(0) {}
};
struct MountCriteriaByDirection {
MountCriteria archive;
MountCriteria retrieve;
MountCriteriaByDirection(const MountCriteria& archive, const MountCriteria & retrieve):
archive(archive), retrieve(retrieve) {}
MountCriteriaByDirection(const MountCriteria& a, const MountCriteria & r):
archive(a), retrieve(r) {}
MountCriteriaByDirection(): archive(), retrieve() {}
};
}
\ No newline at end of file
......@@ -372,7 +372,14 @@ cta::MountCriteriaByDirection
void cta::objectstore::TapePool::setMountCriteriaByDirection(
const MountCriteriaByDirection& mountCriteria) {
checkPayloadWritable();
m_payload.mutable_archivemountcriteria()->set_maxbytesbeforemount(mountCriteria.archive.maxBytesQueued);
m_payload.mutable_archivemountcriteria()->set_maxfilesbeforemount(mountCriteria.archive.maxFilesQueued);
m_payload.mutable_archivemountcriteria()->set_maxsecondsbeforemount(mountCriteria.archive.maxAge);
m_payload.mutable_archivemountcriteria()->set_quota(mountCriteria.archive.quota);
m_payload.mutable_retievemountcriteria()->set_maxbytesbeforemount(mountCriteria.retrieve.maxBytesQueued);
m_payload.mutable_retievemountcriteria()->set_maxfilesbeforemount(mountCriteria.retrieve.maxFilesQueued);
m_payload.mutable_retievemountcriteria()->set_maxsecondsbeforemount(mountCriteria.retrieve.maxAge);
m_payload.mutable_retievemountcriteria()->set_quota(mountCriteria.retrieve.quota);
}
......
......@@ -96,6 +96,7 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
if (tpool.getJobsSummary().files) {
tmdi.potentialMounts.push_back(SchedulerDatabase::PotentialMount());
auto & m = tmdi.potentialMounts.back();
m.tapePool = tpp->tapePool;
m.type = cta::MountType::ARCHIVE;
m.bytesQueued = tpool.getJobsSummary().bytes;
m.filesQueued = tpool.getJobsSummary().files;
......
......@@ -69,10 +69,12 @@ public:
}
virtual ~OStoreDBWrapper() throw() {
m_OStoreDB.setAgent(*((objectstore::Agent*)NULL));
objectstore::ScopedExclusiveLock agl(m_agent);
m_agent.fetch();
m_agent.removeAndUnregisterSelf();
try {
m_OStoreDB.setAgent(*((objectstore::Agent*)NULL));
objectstore::ScopedExclusiveLock agl(m_agent);
m_agent.fetch();
m_agent.removeAndUnregisterSelf();
} catch (cta::exception::Exception &) {}
}
virtual void assertIsAdminOnAdminHost(const SecurityIdentity& id) const {
......
......@@ -817,11 +817,12 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
mountPassesACriteria = true;
if (!existingMounts && ((time(NULL) - m->oldestJobStartTime) > (int64_t)m->mountCriteria.maxAge))
mountPassesACriteria = true;
if (!mountPassesACriteria || existingMounts > m->mountCriteria.quota) {
if (!mountPassesACriteria || existingMounts >= m->mountCriteria.quota) {
m = mountInfo->potentialMounts.erase(m);
} else {
// populate the mount with a weight
m->ratioOfMountQuotaUsed = 1.0L * existingMounts / m->mountCriteria.quota;
m++;
}
}
......@@ -844,7 +845,7 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->logicalLibraryName == logicalLibraryName &&
t->tapePoolName == m->tapePool &&
!t->status.availableToWrite()) {
t->status.availableToWrite()) {
// We have our tape. Try to create the session. Prepare a return value
// for it.
std::unique_ptr<ArchiveMount> internalRet(new ArchiveMount);
......
......@@ -25,11 +25,11 @@
#include "scheduler/ArchiveToFileRequest.hpp"
#include "scheduler/ArchiveToTapeCopyRequest.hpp"
#include "scheduler/LogicalLibrary.hpp"
//#include "scheduler/mockDB/MockSchedulerDatabaseFactory.hpp"
#include "scheduler/MountRequest.hpp"
#include "scheduler/Scheduler.hpp"
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/TapeMount.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "common/SecurityIdentity.hpp"
#include "common/archiveNS/StorageClass.hpp"
#include "common/archiveNS/Tape.hpp"
......@@ -2287,8 +2287,8 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
const std::string tapePoolComment = "Tape-pool comment";
ASSERT_NO_THROW(scheduler.createTapePool(s_adminOnAdminHost, tapePoolName,
nbPartialTapes, tapePoolComment));
MountCriteriaByDirection mcbd(MountCriteria(0,0,0,1), MountCriteria(0,0,0,1));
ASSERT_NO_THROW(scheduler.setTapePoolMountCriteria("TapeTapePool", mcbd));
MountCriteriaByDirection mcbd(MountCriteria(1,1,0,1), MountCriteria(1,1,0,1));
ASSERT_NO_THROW(scheduler.setTapePoolMountCriteria("TestTapePool", mcbd));
const std::string libraryName = "TestLogicalLibrary";
const std::string libraryComment = "Library comment";
......@@ -2405,10 +2405,24 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
s_remoteFileRawPath1), std::exception);
}
// Emulate a tape server by asking for a mount and then a file
std::unique_ptr<cta::TapeMount> mount;
ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(libraryName, "drive0").release()));
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
{
// Emulate a tape server by asking for a mount and then a file (and succeed
// the transfer)
std::unique_ptr<cta::TapeMount> mount;
ASSERT_NO_THROW(mount.reset(scheduler.getNextMount(libraryName, "drive0").release()));
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
ASSERT_EQ(cta::MountType::ARCHIVE, mount.get()->getMountType());
std::unique_ptr<cta::ArchiveMount> archiveMount;
ASSERT_NO_THROW(archiveMount.reset(dynamic_cast<cta::ArchiveMount*>(mount.release())));
ASSERT_NE((cta::ArchiveMount*)NULL, archiveMount.get());
std::unique_ptr<cta::ArchiveJob> archiveJob;
ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release()));
ASSERT_NE((cta::ArchiveJob*)NULL, archiveJob.get());
archiveJob->complete();
ASSERT_NO_THROW(archiveJob.reset(archiveMount->getNextJob().release()));
ASSERT_EQ((cta::ArchiveJob*)NULL, archiveJob.get());
ASSERT_NO_THROW(archiveMount->complete());
}
{
std::list<std::string> archiveFiles;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment