-
Jorge Camarero Vera authoredJorge Camarero Vera authored
ArchiveMount.cpp 15.81 KiB
/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2015-2021 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include "common/exception/NoSuchObject.hpp"
#include "common/make_unique.hpp"
#include "objectstore/Backend.hpp"
#include "scheduler/ArchiveMount.hpp"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue):
  m_catalogue(catalogue),
  m_sessionRunning(false) {
  // Only the catalogue is bound here; no DB mount is attached by this
  // constructor (m_dbMount is left default-constructed).
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveMount::ArchiveMount(catalogue::Catalogue & catalogue,
  std::unique_ptr<SchedulerDatabase::ArchiveMount> dbMount):
  m_catalogue(catalogue),
  m_sessionRunning(false) {
  // Take ownership of the scheduler DB mount. A null mount is rejected with
  // WrongMountType.
  SchedulerDatabase::ArchiveMount * const ownedDbMount =
    dynamic_cast<SchedulerDatabase::ArchiveMount*>(dbMount.release());
  m_dbMount.reset(ownedDbMount);
  if (m_dbMount.get() == nullptr) {
    throw WrongMountType(std::string(__FUNCTION__) +
      ": could not cast mount to SchedulerDatabase::ArchiveMount");
  }
}
//------------------------------------------------------------------------------
// getMountType
//------------------------------------------------------------------------------
cta::common::dataStructures::MountType cta::ArchiveMount::getMountType() const {
  // Mount type as recorded in the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.mountType;
}
//------------------------------------------------------------------------------
// getVid
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getVid() const {
  // Volume id of the mounted tape, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.vid;
}
//------------------------------------------------------------------------------
// getDrive
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getDrive() const {
  // Drive name, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.drive;
}
//------------------------------------------------------------------------------
// getPoolName
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getPoolName() const {
  // Tape pool name, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.tapePool;
}
//------------------------------------------------------------------------------
// getVo
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getVo() const {
  // Virtual organisation, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.vo;
}
//------------------------------------------------------------------------------
// getMediaType
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getMediaType() const {
  // Media type of the mounted tape, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.mediaType;
}
//------------------------------------------------------------------------------
// getVendor
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getVendor() const {
  // Tape vendor, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.vendor;
}
//------------------------------------------------------------------------------
// getCapacityInBytes
//------------------------------------------------------------------------------
uint64_t cta::ArchiveMount::getCapacityInBytes() const {
  // Tape capacity in bytes, taken from the DB mount's mountInfo.
  const auto & info = m_dbMount->mountInfo;
  return info.capacityInBytes;
}
//------------------------------------------------------------------------------
// getNbFiles
//------------------------------------------------------------------------------
uint32_t cta::ArchiveMount::getNbFiles() const {
  // Number of files the DB mount reports as currently on the tape.
  const auto nbFiles = m_dbMount->nbFilesCurrentlyOnTape;
  return nbFiles;
}
//------------------------------------------------------------------------------
// createDiskReporter
//------------------------------------------------------------------------------
cta::disk::DiskReporter* cta::ArchiveMount::createDiskReporter(std::string& URL) {
  // Delegates to the mount's reporter factory.
  // NOTE(review): the caller presumably takes ownership of the returned
  // pointer; confirm against the factory's contract.
  cta::disk::DiskReporter * const reporter = m_reporterFactory.createDiskReporter(URL);
  return reporter;
}
//------------------------------------------------------------------------------
// getMountTransactionId
//------------------------------------------------------------------------------
std::string cta::ArchiveMount::getMountTransactionId() const {
  // Guard: this getter, unlike the others, explicitly rejects a missing DB mount.
  if (m_dbMount.get() == nullptr) {
    throw exception::Exception("In cta::ArchiveMount::getMountTransactionId(): got NULL dbMount");
  }
  // The transaction id is the mount id rendered as text via stream insertion.
  std::stringstream transactionId;
  transactionId << m_dbMount->mountInfo.mountId;
  return transactionId.str();
}
//------------------------------------------------------------------------------
// updateCatalogueWithTapeFilesWritten
//------------------------------------------------------------------------------
void cta::ArchiveMount::updateCatalogueWithTapeFilesWritten(const std::set<cta::catalogue::TapeItemWrittenPointer> &tapeFilesWritten) {
  // Forward the whole batch of written tape items to the catalogue in one call.
  auto & cat = m_catalogue;
  cat.filesWrittenToTape(tapeFilesWritten);
}
//------------------------------------------------------------------------------
// getNextJobBatch
//------------------------------------------------------------------------------
std::list<std::unique_ptr<cta::ArchiveJob> > cta::ArchiveMount::getNextJobBatch(uint64_t filesRequested,
uint64_t bytesRequested, log::LogContext& logContext) {
// Check we are still running the session
if (!m_sessionRunning)
throw SessionNotRunning("In ArchiveMount::getNextJobBatch(): trying to get job from complete/not started session");
// try and get a new job from the DB side
std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> dbJobBatch(m_dbMount->getNextJobBatch(filesRequested,
bytesRequested, logContext));
std::list<std::unique_ptr<ArchiveJob>> ret;
// We prepare the response
for (auto & sdaj: dbJobBatch) {
ret.emplace_back(new ArchiveJob(this, m_catalogue,
sdaj->archiveFile, sdaj->srcURL, sdaj->tapeFile));
ret.back()->m_dbJob.reset(sdaj.release());
}
return ret;
}
//------------------------------------------------------------------------------
// reportJobsBatchTransferred
//------------------------------------------------------------------------------
void cta::ArchiveMount::reportJobsBatchTransferred(std::queue<std::unique_ptr<cta::ArchiveJob> > & successfulArchiveJobs,
    std::queue<cta::catalogue::TapeItemWritten> & skippedFiles, std::queue<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>>& failedToReportArchiveJobs,cta::log::LogContext& logContext) {
  // Report a batch of transferred archive jobs:
  //  1. drain successfulArchiveJobs, validating each job's tape file;
  //  2. add the skipped files to the set of tape items written;
  //  3. record all tape items in the catalogue;
  //  4. hand the DB jobs to the DB mount via setJobBatchTransferred().
  // On NoSuchObject the error is only logged (WARNING). On any other
  // cta::exception::Exception or std::exception, every DB job this call still
  // owns is pushed to failedToReportArchiveJobs and FailedMigrationRecallResult
  // is thrown.
  std::set<cta::catalogue::TapeItemWrittenPointer> tapeItemsWritten;
  std::list<std::unique_ptr<cta::ArchiveJob> > validatedSuccessfulArchiveJobs;
  std::list<std::unique_ptr<cta::SchedulerDatabase::ArchiveJob>> validatedSuccessfulDBArchiveJobs;
  // Job currently being processed; still set if an exception interrupts the
  // first loop before the job is moved to validatedSuccessfulArchiveJobs.
  std::unique_ptr<cta::ArchiveJob> job;
  // Report URL of the job whose validation failed (empty otherwise).
  std::string failedValidationJobReportURL;
  try{
    uint64_t files=0;
    uint64_t bytes=0;
    double catalogueTime=0;
    double schedulerDbTime=0;
    while(!successfulArchiveJobs.empty()) {
      // Get the next job to report and make sure we will not attempt to process it twice.
      job = std::move(successfulArchiveJobs.front());
      successfulArchiveJobs.pop();
      if (!job.get()) continue;
      cta::log::ScopedParamContainer params(logContext);
      params.add("tapeVid",job->tapeFile.vid)
            .add("mountType",cta::common::dataStructures::toString(job->m_mount->getMountType()))
            .add("fileId",job->archiveFile.archiveFileID)
            .add("type", "ReportSuccessful");
      logContext.log(cta::log::INFO, "In cta::ArchiveMount::reportJobsBatchTransferred(), archive job succesful.");
      try {
        tapeItemsWritten.emplace(job->validateAndGetTapeFileWritten().release());
      } catch (const cta::exception::Exception &ex){
        //We put the not validated job into this list in order to insert the job
        //into the failedToReportArchiveJobs list in the exception catching block
        failedValidationJobReportURL = job->reportURL();
        validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob));
        // NOTE(review): `throw ex;` throws a copy whose type is the static type
        // cta::exception::Exception (any derived part is sliced off). A
        // validation error therefore always reaches the generic
        // cta::exception::Exception handler below, which reports the job as
        // failed, and never the NoSuchObject handler. Kept deliberately:
        // `throw;` would change that routing.
        throw ex;
      }
      files++;
      bytes+=job->archiveFile.fileSize;
      validatedSuccessfulArchiveJobs.emplace_back(std::move(job));
      job.reset();
    }
    while (!skippedFiles.empty()) {
      auto tiwup = cta::make_unique<cta::catalogue::TapeItemWritten>();
      *tiwup = skippedFiles.front();
      skippedFiles.pop();
      tapeItemsWritten.emplace(tiwup.release());
    }
    utils::Timer t;
    // Now get the db mount to mark the jobs as successful.
    // Extract the db jobs from the scheduler jobs.
    for (auto &schJob: validatedSuccessfulArchiveJobs) {
      validatedSuccessfulDBArchiveJobs.emplace_back(std::move(schJob->m_dbJob));
    }
    validatedSuccessfulArchiveJobs.clear();
    updateCatalogueWithTapeFilesWritten(tapeItemsWritten);
    catalogueTime=t.secs(utils::Timer::resetCounter);
    {
      cta::log::ScopedParamContainer params(logContext);
      params.add("tapeFilesWritten", tapeItemsWritten.size())
            .add("files", files)
            .add("bytes", bytes)
            .add("catalogueTime", catalogueTime);
      logContext.log(cta::log::INFO, "Catalog updated for batch of jobs");
    }
    // We can now pass the validatedSuccessfulDBArchiveJobs list for the dbMount to process. We are done at that point.
    // Reporting to client will be queued if needed and done in another process.
    m_dbMount->setJobBatchTransferred(validatedSuccessfulDBArchiveJobs, logContext);
    schedulerDbTime=t.secs(utils::Timer::resetCounter);
    cta::log::ScopedParamContainer params(logContext);
    params.add("files", files)
          .add("bytes", bytes)
          .add("catalogueTime", catalogueTime)
          .add("schedulerDbTime", schedulerDbTime)
          .add("totalTime", catalogueTime + schedulerDbTime);
    logContext.log(log::INFO, "In ArchiveMount::reportJobsBatchWritten(): recorded a batch of archive jobs in metadata.");
  } catch (const cta::exception::NoSuchObject& ex){
    // Only logged: no job is pushed to failedToReportArchiveJobs and nothing is rethrown.
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionMessageValue", ex.getMessageValue());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
            .add("reportURL", failedValidationJobReportURL);
    }
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): job does not exist in the objectstore.";
    logContext.log(cta::log::WARNING, msg_error);
  } catch(const cta::exception::Exception& e){
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionMessageValue", e.getMessageValue());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path)
            .add("reportURL", failedValidationJobReportURL);
    }
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an exception";
    logContext.log(cta::log::ERR, msg_error);
    // The in-flight job still owns its DB job if the exception did not come
    // from validation (the validation path already moved it); fail it too.
    if (job.get() && job->m_dbJob.get())
      validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob));
    //If validatedSuccessfulArchiveJobs has still jobs in it, it means that
    //the validation job->validateAndGetTapeFileWritten() failed for one job and
    //threw an exception. We will then have to fail all the others.
    for(auto &ctaJob: validatedSuccessfulArchiveJobs){
      if(ctaJob.get())
        validatedSuccessfulDBArchiveJobs.emplace_back(std::move(ctaJob->m_dbJob));
    }
    for(auto &aj: validatedSuccessfulDBArchiveJobs){
      if(aj.get())
        failedToReportArchiveJobs.push(std::move(aj));
    }
    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
  } catch(const std::exception& e){
    cta::log::ScopedParamContainer params(logContext);
    params.add("exceptionWhat", e.what());
    if (job.get()) {
      params.add("fileId", job->archiveFile.archiveFileID)
            .add("diskInstance", job->archiveFile.diskInstance)
            .add("diskFileId", job->archiveFile.diskFileId)
            .add("lastKnownDiskPath", job->archiveFile.diskFileInfo.path);
    }
    const std::string msg_error="In ArchiveMount::reportJobsBatchWritten(): got an standard exception";
    logContext.log(cta::log::ERR, msg_error);
    // Same recovery as the cta::exception::Exception handler. Previously only
    // validatedSuccessfulDBArchiveJobs was drained here, so the DB jobs of the
    // in-flight job and of already-validated jobs were silently dropped when a
    // std::exception hit during the first loop.
    if (job.get() && job->m_dbJob.get())
      validatedSuccessfulDBArchiveJobs.emplace_back(std::move(job->m_dbJob));
    for(auto &ctaJob: validatedSuccessfulArchiveJobs){
      if(ctaJob.get())
        validatedSuccessfulDBArchiveJobs.emplace_back(std::move(ctaJob->m_dbJob));
    }
    for(auto &aj: validatedSuccessfulDBArchiveJobs){
      if(aj.get())
        failedToReportArchiveJobs.push(std::move(aj));
    }
    throw cta::ArchiveMount::FailedMigrationRecallResult(msg_error);
  }
}
//------------------------------------------------------------------------------
// complete
//------------------------------------------------------------------------------
void cta::ArchiveMount::complete() {
  // Mark the session complete in the scheduler DB with the current time,
  // then record locally that the mount is no longer running.
  const auto now = time(NULL);
  m_dbMount->complete(now);
  m_sessionRunning = false;
}
//------------------------------------------------------------------------------
// abort
//------------------------------------------------------------------------------
void cta::ArchiveMount::abort(const std::string& reason) {
complete();
setDriveStatus(cta::common::dataStructures::DriveStatus::Down, reason);
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::ArchiveMount::~ArchiveMount() throw() {
  // Nothing to release explicitly: members clean up in their own destructors.
}
//------------------------------------------------------------------------------
// setDriveStatus()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setDriveStatus(cta::common::dataStructures::DriveStatus status, const cta::optional<std::string> & reason) {
  // Forward the new drive status, stamped with the current time, to the DB mount.
  const auto now = time(NULL);
  m_dbMount->setDriveStatus(status, now, reason);
}
//------------------------------------------------------------------------------
// setTapeSessionStats()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeSessionStats(const castor::tape::tapeserver::daemon::TapeSessionStats &stats) {
  // Pure delegation: the DB mount stores the session statistics.
  auto & dbMount = *m_dbMount;
  dbMount.setTapeSessionStats(stats);
}
//------------------------------------------------------------------------------
// setTapeMounted()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeMounted(cta::log::LogContext& logContext) const {
  // Record in the catalogue that this mount's tape is mounted for archive on
  // this mount's drive. Best effort: a catalogue failure is logged as a
  // WARNING (with its message and timing) and not propagated.
  utils::Timer t;
  log::ScopedParamContainer spc(logContext);
  try {
    m_catalogue.tapeMountedForArchive(m_dbMount->getMountInfo().vid, m_dbMount->getMountInfo().drive);
    auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
    spc.add("catalogueTime", catalogueTime);
    logContext.log(log::INFO, "In ArchiveMount::setTapeMounted(): success.");
  } catch (const cta::exception::Exception &ex) {
    auto catalogueTimeFailed = t.secs(cta::utils::Timer::resetCounter);
    // Previously the exception was caught by non-const reference and its
    // message was discarded, so the warning gave no reason for the failure.
    spc.add("catalogueTime", catalogueTimeFailed)
       .add("exceptionMessageValue", ex.getMessageValue());
    logContext.log(cta::log::WARNING,
      "Failed to update catalogue for the tape mounted for archive.");
  }
}
//------------------------------------------------------------------------------
// setTapeFull()
//------------------------------------------------------------------------------
void cta::ArchiveMount::setTapeFull() {
  // Tell the catalogue that the tape of this mount has no space left.
  const auto & mountInfo = m_dbMount->getMountInfo();
  m_catalogue.noSpaceLeftOnTape(mountInfo.vid);
}