Commit 826fff31 authored by Eric Cano's avatar Eric Cano
Browse files

Started to implement the retrieve mounts in both Scheduler and OStoreDB.

Adjusted unit test to accommodate the new structure (SchedulerDatabase::RetrieveMount is now a pure virtual class).
parent cef6a019
......@@ -68,3 +68,11 @@ bool cta::Tape::Status::availableToWrite() {
return !busy && !archived && !disabled && !readonly && !full;
}
//------------------------------------------------------------------------------
// Status::availableToRead()
//------------------------------------------------------------------------------
bool cta::Tape::Status::availableToRead() {
  // A tape can be read unless it is busy, archived or disabled. The readonly
  // and full flags, which availableToWrite() also checks, are not consulted.
  const bool unreadable = busy || archived || disabled;
  return !unreadable;
}
......@@ -116,6 +116,7 @@ struct Tape {
bool readonly;
bool full;
bool availableToWrite();
bool availableToRead();
};
/**
......
......@@ -1220,7 +1220,27 @@ OStoreDB::TapeMountDecisionInfo::TapeMountDecisionInfo(
/**
 * Creates the object-store side of a retrieve mount. Work in progress: the
 * body allocates the mount object, checks the scheduling lock, records the
 * vid and the drive name in the mount info and then unconditionally throws
 * NotImplemented, so no mount is ever returned yet.
 *
 * NOTE(review): two parameter lists are visible below; this looks like a diff
 * rendering that shows both the former two-argument signature and the new
 * five-argument one — confirm against the real source file.
 *
 * @param vid            Stored in mountInfo.vid of the new mount.
 * @param driveName      Stored in mountInfo.drive of the new mount.
 * @param logicalLibrary Not used by the visible body (see note further down).
 * @param hostName       Not used by the visible body.
 * @param startTime      Not used by the visible body.
 * @throws SchedulingLockNotHeld if m_lockTaken is false.
 * @throws NotImplemented in every other case.
 */
std::unique_ptr<SchedulerDatabase::RetrieveMount>
OStoreDB::TapeMountDecisionInfo::createRetrieveMount(
const std::string& vid, const std::string driveName) {
const std::string& vid, const std::string driveName,
const std::string& logicalLibrary, const std::string& hostName, time_t startTime) {
// In order to create the mount, we have to:
// Check we actually hold the scheduling lock
// Check the tape exists, add it to ownership and set its activity status to
// busy, with the current agent pointing to it for unbusying
// Set the drive status to up, but do not commit anything to the drive register
// the drive register does not need garbage collection as it should reflect the
// latest known state of the drive (and its absence of updating if needed)
// Prepare the return value
// The mount object is allocated before the lock check below; it is owned by
// privateRet, so it is released automatically when an exception is thrown.
std::unique_ptr<OStoreDB::RetrieveMount> privateRet(
new OStoreDB::RetrieveMount(m_objectStore, m_agent));
auto &rm = *privateRet;
// Check we hold the scheduling lock
if (!m_lockTaken)
throw SchedulingLockNotHeld("In OStoreDB::TapeMountDecisionInfo::createRetrieveMount: "
"cannot create mount without holding scheduling lock");
// Find the tape and update it
rm.mountInfo.vid = vid;
rm.mountInfo.drive = driveName;
// NOTE(review): an empty string is stored here instead of the logicalLibrary
// parameter — confirm whether this is intentional or a placeholder.
rm.mountInfo.logicalLibrary = "";
// The remaining steps listed at the top of the function are not written yet.
throw NotImplemented("Not Implemented");
}
......@@ -1339,11 +1359,25 @@ void OStoreDB::ArchiveMount::complete(time_t completionTime) {
t.commit();
}
OStoreDB::ArchiveJob::ArchiveJob(const std::string& jobAddress,
objectstore::Backend& os, objectstore::Agent& ag): m_jobOwned(false),
m_objectStore(os), m_agent(ag), m_atfr(jobAddress, os) {}
// Constructor: keeps references to the object store backend and to the agent
// passed by the caller. No other work is done here.
OStoreDB::RetrieveMount::RetrieveMount(objectstore::Backend& os, objectstore::Agent& a)
  : m_objectStore(os),
    m_agent(a) {
}
// Gives read-only access to the mountInfo member of this mount.
const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo() {
  return this->mountInfo;
}
// Placeholder: fetching the next retrieve job is not implemented yet, so this
// always throws NotImplemented and never returns a job.
auto OStoreDB::RetrieveMount::getNextJob() -> std::unique_ptr<RetrieveJob> {
  throw NotImplemented(
    "In OStoreDB::RetrieveMount::getNextJob: "
    "not implemented");
}
// Placeholder: completing a retrieve mount is not implemented yet, so this
// always throws NotImplemented. The completionTime argument is ignored.
// The message names this method (it previously named getNextJob, which made
// the origin of the exception misleading).
void OStoreDB::RetrieveMount::complete(time_t completionTime) {
  throw NotImplemented("In OStoreDB::RetrieveMount::complete: not implemented");
}
void OStoreDB::ArchiveJob::fail() {
if (!m_jobOwned)
......
......@@ -54,8 +54,10 @@ public:
const std::string & vid, const std::string & tapePool,
const std::string driveName, const std::string& logicalLibrary,
const std::string & hostName, time_t startTime);
virtual std::unique_ptr<SchedulerDatabase::RetrieveMount> createRetrieveMount(const std::string & vid,
const std::string driveName);
virtual std::unique_ptr<SchedulerDatabase::RetrieveMount> createRetrieveMount(
const std::string & vid, const std::string driveName,
const std::string& logicalLibrary, const std::string& hostName,
time_t startTime);
virtual ~TapeMountDecisionInfo();
private:
TapeMountDecisionInfo (objectstore::Backend &, objectstore::Agent &);
......@@ -102,6 +104,19 @@ public:
objectstore::Agent & m_agent;
objectstore::ArchiveToFileRequest m_atfr;
};
/* === Retrieve Mount handling ============================================ */
/**
 * Object-store backed implementation of SchedulerDatabase::RetrieveMount.
 * The constructor is private: instances are meant to be created by the
 * friend class TapeMountDecisionInfo.
 */
class RetrieveMount: public SchedulerDatabase::RetrieveMount {
  friend class TapeMountDecisionInfo;
public:
  /** Returns the mount information held by this mount. */
  virtual const MountInfo & getMountInfo();
  /** Returns the next job of this mount. */
  virtual std::unique_ptr<RetrieveJob> getNextJob();
  /** Signals the completion of the mount, with the time of completion. */
  virtual void complete(time_t completionTime);
private:
  RetrieveMount(objectstore::Backend &, objectstore::Agent &);
  /** Reference to the object store backend given at construction. */
  objectstore::Backend & m_objectStore;
  /** Reference to the agent given at construction. */
  objectstore::Agent & m_agent;
};
/* === Admin host handling ================================================ */
virtual void createAdminHost(const std::string& hostName,
......
......@@ -36,19 +36,19 @@ cta::RetrieveJob::RetrieveJob(/*RetrieveMount &mount,*/
archiveFile(archiveFile),
remotePathAndStatus(remotePathAndStatus),
tapeFileLocation(tapeFileLocation),
positioningMethod(positioningMethod) {}
positioningMethod(positioningMethod),
transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
//------------------------------------------------------------------------------
// complete
//------------------------------------------------------------------------------
void cta::RetrieveJob::complete(const uint32_t checksumOfTransfer,
const uint64_t fileSizeOfTransfer) {
void cta::RetrieveJob::complete() {
}
//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::RetrieveJob::failed(const exception::Exception &ex) {
void cta::RetrieveJob::failed() {
}
//------------------------------------------------------------------------------
......
......@@ -25,6 +25,7 @@
#include "scheduler/PositioningMethod.hpp"
#include <string>
#include <limits>
namespace cta {
......@@ -46,7 +47,7 @@ protected:
/**
* Empty constructor. TODO: to be removed in the future when we put in the reference to the owning mount;
*/
RetrieveJob() {}
RetrieveJob(): transferredSize(std::numeric_limits<decltype(transferredSize)>::max()) {}
/**
* Constructor. It is not public as it is generated by the RetrieveMount.
......@@ -71,22 +72,18 @@ public:
virtual ~RetrieveJob() throw() = 0;
/**
* Indicates that the job was successful
*
* @param checksumOfTransfer The adler-32 checksum of the file as calculated
* during the execution of the job.
* @param fileSizeOfTransfer The size of the file as calculated during the
* execution of the job.
* Indicates that the job was successful. The checksum and the size of the
* transfer should already be stored in the object beforehand. Result setting
* and calling complete are done in 2 different threads (disk write and
* reporter thread, respectively).
*/
virtual void complete(const uint32_t checksumOfTransfer,
const uint64_t fileSizeOfTransfer);
virtual void complete();
/**
* Indicates that the job failed
*
* @param ex The reason for the failure.
* Indicates that the job failed. As with complete(), the reason for the
* failure should already be recorded in the object beforehand.
*/
virtual void failed(const exception::Exception &ex);
virtual void failed();
/**
* Indicates that the job should be tried again (typically reaching the end
......@@ -119,6 +116,23 @@ public:
*/
PositioningMethod positioningMethod;
/**
* The checksum of the transferred data. This should be set before calling
* complete()
*/
Checksum transferredChecksum;
/**
* The size of the transferred data. This should be set before calling
* complete().
*/
uint64_t transferredSize;
/**
* The error string. This should be set before calling failed().
*/
std::string failureMessage;
}; // class RetrieveJob
} // namespace cta
......@@ -888,7 +888,25 @@ std::unique_ptr<cta::TapeMount> cta::Scheduler::getNextMount(
}
}
} else if (m->type==cta::MountType::RETRIEVE) {
throw NotImplemented("");
// We know the tape we intend to mount. We have to validate the tape is
// actually available to read, and skip it if it is not.
auto tapesList = m_db.getTapes();
for (auto t=tapesList.begin(); t!=tapesList.end(); t++) {
if (t->vid == m->vid && t->status.availableToRead()) {
try {
std::unique_ptr<RetrieveMount> internalRet (new RetrieveMount());
// Get the db side of the session
internalRet->m_dbMount.reset(mountInfo->createRetrieveMount(t->vid,
driveName,
logicalLibraryName,
Utils::getShortHostname(),
time(NULL)).release());
return std::unique_ptr<TapeMount> (internalRet.release());
} catch (cta::exception::Exception & ex) {
continue;
}
}
}
} else {
throw std::runtime_error("In Scheduler::getNextMount unexpected mount type");
}
......
......@@ -235,8 +235,21 @@ public:
/*============ Retrieve management: tape server side ======================*/
class RetrieveMount {};
class RetrieveJob;
/**
 * Abstract database-side view of a retrieve mount. Concrete database
 * implementations provide the three pure virtual member functions.
 */
class RetrieveMount {
public:
  /** Data describing the mount. */
  struct MountInfo {
    std::string vid;
    std::string logicalLibrary;
    std::string tapePool;
    std::string drive;
    uint64_t mountId;
  };
  /** The mount description, publicly accessible. */
  MountInfo mountInfo;
  /** Returns the mount description. */
  virtual const MountInfo & getMountInfo() = 0;
  /** Returns the next retrieve job of this mount. */
  virtual std::unique_ptr<RetrieveJob> getNextJob() = 0;
  /** Signals the completion of the mount, with the time of completion. */
  virtual void complete(time_t completionTime) = 0;
  /** Virtual destructor, so that deletion through a base pointer is safe. */
  virtual ~RetrieveMount() {}
};
class RetrieveJob {
friend class RetrieveMount;
......@@ -319,7 +332,8 @@ public:
* lock.
*/
virtual std::unique_ptr<RetrieveMount> createRetrieveMount(const std::string & vid,
const std::string driveName) = 0;
const std::string driveName, const std::string& logicalLibrary,
const std::string& hostName, time_t startTime) = 0;
/** Destructor: releases the global lock if not already done */
virtual ~TapeMountDecisionInfo() {};
};
......
......@@ -30,6 +30,7 @@
#include "scheduler/SchedulerDatabase.hpp"
#include "scheduler/TapeMount.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "scheduler/RetrieveMount.hpp"
#include "common/SecurityIdentity.hpp"
#include "common/archiveNS/StorageClass.hpp"
#include "common/archiveNS/Tape.hpp"
......@@ -2479,6 +2480,25 @@ TEST_P(SchedulerTest, archive_and_retrieve_new_file) {
ASSERT_FALSE(archiveFiles.find("/grandparent/parent_file") ==
archiveFiles.end());
}
{
// Emulate a tape server by asking for a mount and then a file (and succeed
// the transfer)
std::unique_ptr<cta::TapeMount> mount;
/*ASSERT_NO_THROW*/(mount.reset(scheduler.getNextMount(libraryName, "drive0").release()));
ASSERT_NE((cta::TapeMount*)NULL, mount.get());
ASSERT_EQ(cta::MountType::RETRIEVE, mount.get()->getMountType());
std::unique_ptr<cta::RetrieveMount> retrieveMount;
ASSERT_NO_THROW(retrieveMount.reset(dynamic_cast<cta::RetrieveMount*>(mount.release())));
ASSERT_NE((cta::RetrieveMount*)NULL, retrieveMount.get());
std::unique_ptr<cta::RetrieveJob> retrieveJob;
ASSERT_NO_THROW(retrieveJob.reset(retrieveMount->getNextJob().release()));
ASSERT_NE((cta::RetrieveJob*)NULL, retrieveJob.get());
ASSERT_NO_THROW(retrieveJob->complete());
ASSERT_NO_THROW(retrieveJob.reset(retrieveMount->getNextJob().release()));
ASSERT_EQ((cta::RetrieveJob*)NULL, retrieveJob.get());
ASSERT_NO_THROW(retrieveMount->complete());
}
}
TEST_P(SchedulerTest, retrieve_non_existing_file) {
......
......@@ -36,13 +36,6 @@
namespace unitTests{
class TestingArchiveMount: public cta::ArchiveMount {
public:
TestingArchiveMount(std::unique_ptr<cta::SchedulerDatabase::ArchiveMount> dbrm):
ArchiveMount(*((cta::NameServer *)NULL),std::move(dbrm)) {
}
};
class TestingArchiveJob: public cta::ArchiveJob {
public:
TestingArchiveJob(): cta::ArchiveJob(*((cta::ArchiveMount *)NULL),
......
......@@ -120,7 +120,10 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
}
} //end of while(1)
logWithStat(LOG_INFO, "File successfully transfered to disk",lc);
reporter.reportCompletedJob(std::move(m_retrieveJob),checksum,m_stats.dataVolume);
m_retrieveJob->transferredSize = m_stats.dataVolume;
m_retrieveJob->transferredChecksum = cta::Checksum(cta::Checksum::CHECKSUMTYPE_ADLER32,
cta::ByteArray(checksum));
reporter.reportCompletedJob(std::move(m_retrieveJob));
m_stats.waitReportingTime+=localTime.secs(castor::utils::Timer::resetCounter);
m_stats.transferTime = transferTime.secs();
m_stats.totalTime = totalTime.secs();
......@@ -152,7 +155,8 @@ bool DiskWriteTask::execute(RecallReportPacker& reporter,log::LogContext& lc,
.add("errorCode", e.code());
logWithStat(LOG_ERR, "File writing to disk failed.", lc);
lc.logBacktrace(LOG_ERR, e.backtrace());
reporter.reportFailedJob(std::move(m_retrieveJob),e);
m_retrieveJob->failureMessage = e.getMessageValue();
reporter.reportFailedJob(std::move(m_retrieveJob));
//got an exception, return false
......
......@@ -40,6 +40,11 @@
#include <gtest/gtest.h>
namespace unitTests{
// Minimal concrete stand-in for the abstract database retrieve mount, so that
// the tests can instantiate one. Every member function throws
// std::runtime_error("Not implemented").
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
  virtual const MountInfo & getMountInfo() {
    throw std::runtime_error("Not implemented");
  }
  virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() {
    throw std::runtime_error("Not implemented");
  }
  virtual void complete(time_t completionTime) {
    throw std::runtime_error("Not implemented");
  }
};
class TestingRetrieveMount: public cta::RetrieveMount {
public:
......@@ -78,7 +83,7 @@ namespace unitTests{
castor::log::StringLogger log("castor_tape_tapeserver_daemon_DiskWriteTaskFailedBlock");
castor::log::LogContext lc(log);
std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> dbrm(new cta::SchedulerDatabase::RetrieveMount);
std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> dbrm(new TestingDatabaseRetrieveMount());
TestingRetrieveMount trm(std::move(dbrm));
MockRecallReportPacker report(&trm,lc);
EXPECT_CALL(report,reportFailedJob_(_,_));
......
......@@ -33,6 +33,12 @@
namespace unitTests{
// Minimal concrete stand-in for the abstract database retrieve mount, so that
// the tests can instantiate one. Every member function throws
// std::runtime_error("Not implemented").
class TestingDatabaseRetrieveMount: public cta::SchedulerDatabase::RetrieveMount {
  virtual const MountInfo & getMountInfo() {
    throw std::runtime_error("Not implemented");
  }
  virtual std::unique_ptr<cta::SchedulerDatabase::RetrieveJob> getNextJob() {
    throw std::runtime_error("Not implemented");
  }
  virtual void complete(time_t completionTime) {
    throw std::runtime_error("Not implemented");
  }
};
class TestingRetrieveMount: public cta::RetrieveMount {
public:
TestingRetrieveMount(std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> dbrm): RetrieveMount(std::move(dbrm)) {
......@@ -45,6 +51,8 @@ namespace unitTests{
}
};
using namespace castor::tape::tapeserver::daemon;
using namespace castor::tape::tapeserver::client;
struct MockRecallReportPacker : public RecallReportPacker {
......@@ -73,7 +81,7 @@ namespace unitTests{
castor::log::StringLogger log("castor_tape_tapeserver_daemon_DiskWriteThreadPoolTest");
castor::log::LogContext lc(log);
std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> dbrm(new cta::SchedulerDatabase::RetrieveMount);
std::unique_ptr<cta::SchedulerDatabase::RetrieveMount> dbrm(new TestingDatabaseRetrieveMount);
TestingRetrieveMount trm(std::move(dbrm));
MockRecallReportPacker report(&trm,lc);
......
......@@ -59,18 +59,16 @@ RecallReportPacker::~RecallReportPacker(){
//------------------------------------------------------------------------------
//reportCompletedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob,
u_int32_t checksum, u_int64_t size){
std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob),checksum,size));
void RecallReportPacker::reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob){
std::unique_ptr<Report> rep(new ReportSuccessful(std::move(successfulRetrieveJob)));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
//------------------------------------------------------------------------------
//reportFailedJob
//------------------------------------------------------------------------------
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob,
const castor::exception::Exception &ex){
std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob),ex));
void RecallReportPacker::reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob){
std::unique_ptr<Report> rep(new ReportError(std::move(failedRetrieveJob)));
castor::server::MutexLocker ml(&m_producterProtection);
m_fifo.push(rep.release());
}
......@@ -94,7 +92,7 @@ void RecallReportPacker::reportEndOfSessionWithErrors(const std::string msg,int
//ReportSuccessful::execute
//------------------------------------------------------------------------------
void RecallReportPacker::ReportSuccessful::execute(RecallReportPacker& parent){
m_successfulRetrieveJob->complete(m_checksum, m_size);
m_successfulRetrieveJob->complete();
}
//------------------------------------------------------------------------------
......@@ -152,8 +150,8 @@ void RecallReportPacker::ReportEndofSessionWithErrors::execute(RecallReportPacke
//------------------------------------------------------------------------------
void RecallReportPacker::ReportError::execute(RecallReportPacker& parent){
parent.m_errorHappened=true;
parent.m_lc.log(LOG_ERR,m_ex.getMessageValue());
m_failedRetrieveJob->failed(cta::exception::Exception(m_ex.getMessageValue()));
parent.m_lc.log(LOG_ERR,m_failedRetrieveJob->failureMessage);
m_failedRetrieveJob->failed();
}
//------------------------------------------------------------------------------
//WorkerThread::WorkerThread
......
......@@ -54,7 +54,7 @@ public:
* @param migratedFile the file successfully migrated
* @param checksum the checksum the DWT has computed for the file
*/
virtual void reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob, u_int32_t checksum, u_int64_t size);
virtual void reportCompletedJob(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob);
/**
* Create into the MigrationReportPacker a report for the failed migration
......@@ -62,7 +62,7 @@ public:
* @param migratedFile the file which failed
* @param ex the reason for the failure
*/
virtual void reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob, const castor::exception::Exception& ex);
virtual void reportFailedJob(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob);
/**
* Create into the MigrationReportPacker a report for the nominal end of session
......@@ -102,30 +102,23 @@ private:
bool goingToEnd() const {return m_endNear;};
};
class ReportSuccessful : public Report {
u_int32_t m_checksum;
u_int64_t m_size;
/**
* The successful retrieve job to be reported immediately
*/
std::unique_ptr<cta::RetrieveJob> m_successfulRetrieveJob;
public:
ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob,u_int32_t checksum,
u_int64_t size):
Report(false),m_checksum(checksum),m_size(size), m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
ReportSuccessful(std::unique_ptr<cta::RetrieveJob> successfulRetrieveJob):
Report(false), m_successfulRetrieveJob(std::move(successfulRetrieveJob)){}
virtual void execute(RecallReportPacker& reportPacker);
};
class ReportError : public Report {
const castor::exception::Exception m_ex;
/**
* The failed retrieve job to be reported immediately
*/
std::unique_ptr<cta::RetrieveJob> m_failedRetrieveJob;
public:
ReportError(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob, const castor::exception::Exception &ex):
ReportError(std::unique_ptr<cta::RetrieveJob> failedRetrieveJob):
Report(false),
m_ex(ex),
m_failedRetrieveJob(std::move(failedRetrieveJob)) {
}
......
......@@ -118,8 +118,8 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerNominal) {
castor::tape::tapeserver::daemon::RecallReportPacker rrp(&retrieveMount,lc);
rrp.startThreads();
rrp.reportCompletedJob(std::move(job1),0,0);
rrp.reportCompletedJob(std::move(job2),0,0);
rrp.reportCompletedJob(std::move(job1));
rrp.reportCompletedJob(std::move(job2));
rrp.reportEndOfSession();
rrp.waitThread();
......@@ -159,12 +159,13 @@ TEST_F(castor_tape_tapeserver_daemonTest, RecallReportPackerBadBadEnd) {
castor::tape::tapeserver::daemon::RecallReportPacker rrp(&retrieveMount,lc);
rrp.startThreads();
rrp.reportCompletedJob(std::move(job1),0,0);
rrp.reportCompletedJob(std::move(job2),0,0);
rrp.reportCompletedJob(std::move(job1));
rrp.reportCompletedJob(std::move(job2));
const std::string error_msg = "ERROR_TEST_MSG";
const castor::exception::Exception ex(error_msg);
rrp.reportFailedJob(std::move(job3),ex);
job3->failureMessage = ex.getMessageValue();
rrp.reportFailedJob(std::move(job3));
rrp.reportEndOfSession();
rrp.waitThread();
......
......@@ -589,7 +589,7 @@ XrootC2FSWriteFile::XrootC2FSWriteFile(const std::string &url,
m_signedURL = m_URL + opaqueBloc.str();
// ... and finally open the file for write (deleting any existing one in case)
XrootClEx::throwOnError(m_xrootFile.Open(m_signedURL, OpenFlags::Delete),
XrootClEx::throwOnError(m_xrootFile.Open(m_signedURL, OpenFlags::Delete | OpenFlags::Write),
std::string("In XrootC2FSWriteFile::XrootC2FSWriteFile failed XrdCl::File::Open() on ")
+m_URL);
}
......@@ -600,7 +600,7 @@ XrootWriteFile::XrootWriteFile(const std::string& xrootUrl) {
m_URL = xrootUrl;
// and simply open
using XrdCl::OpenFlags;
XrootClEx::throwOnError(m_xrootFile.Open(m_URL, OpenFlags::Delete),
XrootClEx::throwOnError(m_xrootFile.Open(m_URL, OpenFlags::Delete | OpenFlags::Write),
std::string("In XrootWriteFile::XrootWriteFile failed XrdCl::File::Open() on ")+m_URL);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment