Skip to content
Snippets Groups Projects
ArchiveJob.cpp 8.34 KiB
/*
 * @project      The CERN Tape Archive (CTA)
 * @copyright    Copyright © 2021-2022 CERN
 * @license      This program is free software, distributed under the terms of the GNU General Public
 *               Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can
 *               redistribute it and/or modify it under the terms of the GPL Version 3, or (at your
 *               option) any later version.
 *
 *               This program is distributed in the hope that it will be useful, but WITHOUT ANY
 *               WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 *               PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *               In applying this licence, CERN does not waive the privileges and immunities
 *               granted to it by virtue of its status as an Intergovernmental Organization or
 *               submit itself to any jurisdiction.
 */

#include "scheduler/ArchiveJob.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "disk/DiskReporterFactory.hpp"
#include <limits>
#include <cryptopp/base64.h>

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::ArchiveJob::~ArchiveJob() throw() {
}

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveJob::ArchiveJob(ArchiveMount *mount,
                            catalogue::Catalogue& catalogue,
                            const common::dataStructures::ArchiveFile& archiveFile,
                            const std::string& srcURL,
                            const common::dataStructures::TapeFile& tapeFile) :
  m_mount(mount), m_catalogue(catalogue),
  archiveFile(archiveFile),
  srcURL(srcURL),
  tapeFile(tapeFile) {}

//------------------------------------------------------------------------------
// getReportTiming()
//------------------------------------------------------------------------------
double cta::ArchiveJob::reportTime() {
  return m_reporterTimer.secs();
}

//------------------------------------------------------------------------------
// ArchiveJob::validateAndGetTapeFileWritten
//------------------------------------------------------------------------------
cta::catalogue::TapeItemWrittenPointer cta::ArchiveJob::validateAndGetTapeFileWritten() {
  validate();
  auto fileReportUP = std::make_unique<catalogue::TapeFileWritten>();
  auto& fileReport = *fileReportUP;
  fileReport.archiveFileId = archiveFile.archiveFileID;
  fileReport.blockId = tapeFile.blockId;
  fileReport.checksumBlob = tapeFile.checksumBlob;
  fileReport.copyNb = tapeFile.copyNb;
  fileReport.diskFileId = archiveFile.diskFileId;
  fileReport.diskFileOwnerUid = archiveFile.diskFileInfo.owner_uid;
  fileReport.diskFileGid = archiveFile.diskFileInfo.gid;
  fileReport.diskInstance = archiveFile.diskInstance;
  fileReport.fSeq = tapeFile.fSeq;
  fileReport.size = archiveFile.fileSize;
  fileReport.storageClassName = archiveFile.storageClass;
  fileReport.tapeDrive = getMount().getDrive();
  fileReport.vid = tapeFile.vid;
  return cta::catalogue::TapeItemWrittenPointer(fileReportUP.release());
}

//------------------------------------------------------------------------------
// ArchiveJob::validate
//------------------------------------------------------------------------------
void cta::ArchiveJob::validate() {
  // First check that the block Id for the file has been set.
  if (tapeFile.blockId == std::numeric_limits<decltype(tapeFile.blockId)>::max())
    throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set");
  // Also check the checksum has been set
  if (archiveFile.checksumBlob.empty() || tapeFile.checksumBlob.empty())
    throw ChecksumNotSet("In cta::ArchiveJob::validate(): checksums not set");
  // And matches
  archiveFile.checksumBlob.validate(tapeFile.checksumBlob);
}

//------------------------------------------------------------------------------
// ArchiveJob::exceptionThrowingReportURL
//------------------------------------------------------------------------------
std::string cta::ArchiveJob::exceptionThrowingReportURL() {
  switch (m_dbJob->reportType) {
    case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport:
      return m_dbJob->archiveReportURL;
    case SchedulerDatabase::ArchiveJob::ReportType::FailureReport: {
      if (m_dbJob->latestError.empty()) {
        throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): empty failure reason.");
      }
      std::string base64ErrorReport;
      // Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret.
      const bool noNewLineInBase64Output = false;
      CryptoPP::StringSource ss1(m_dbJob->latestError, true,
                                 new CryptoPP::Base64Encoder(
                                   new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output));
      return m_dbJob->errorReportURL + base64ErrorReport;
    }
    case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired:
      throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status NoReportRequired does not require reporting.");
    case SchedulerDatabase::ArchiveJob::ReportType::Report:
      throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): job status Report does not require reporting.");
  }
  throw exception::Exception("In ArchiveJob::exceptionThrowingReportURL(): invalid report type reportType=" +
                             std::to_string(static_cast<uint8_t>(m_dbJob->reportType)));
}

//------------------------------------------------------------------------------
// ArchiveJob::reportURL
//------------------------------------------------------------------------------
std::string cta::ArchiveJob::reportURL() noexcept {
  try {
    return exceptionThrowingReportURL();
  } catch (exception::Exception& ex) {
    return ex.what();
  } catch (...) {
    return "In ArchiveJob::reportURL(): unknown exception";
  }
}

//------------------------------------------------------------------------------
// ArchiveJob::reportType
//------------------------------------------------------------------------------
std::string cta::ArchiveJob::reportType() {
  switch (m_dbJob->reportType) {
    case SchedulerDatabase::ArchiveJob::ReportType::CompletionReport:
      return "CompletionReport";
    case SchedulerDatabase::ArchiveJob::ReportType::FailureReport:
      return "FailureReport";
    case SchedulerDatabase::ArchiveJob::ReportType::NoReportRequired:
      return "NoReportRequired";
    case SchedulerDatabase::ArchiveJob::ReportType::Report:
      return "Report";
    default: {
      throw exception::Exception("In ArchiveJob::reportType(): job status does not require reporting.");
    }
  }
  throw exception::Exception("In ArchiveJob::reportType(): invalid report type.");
}

//------------------------------------------------------------------------------
// ArchiveJob::reportFailed
//------------------------------------------------------------------------------
void cta::ArchiveJob::reportFailed(const std::string& failureReason, log::LogContext& lc) {
  // This is fully delegated to the DB, which will handle the queueing for next steps, if any.
  m_dbJob->failReport(failureReason, lc);
}


//------------------------------------------------------------------------------
// ArchiveJob::transferFailed
//------------------------------------------------------------------------------
void cta::ArchiveJob::transferFailed(const std::string& failureReason, log::LogContext& lc) {
  // This is fully delegated to the DB, which will handle the queueing for next steps, if any.
  m_dbJob->failTransfer(failureReason, lc);
}

//------------------------------------------------------------------------------
// waitForReporting
//------------------------------------------------------------------------------
void cta::ArchiveJob::waitForReporting() {
  m_reporterState.get_future().get();
}

//------------------------------------------------------------------------------
// cta::ArchiveJob::getMount()
//------------------------------------------------------------------------------
cta::ArchiveMount& cta::ArchiveJob::getMount() {
  if (m_mount) return *m_mount;
  throw exception::Exception("In ArchiveJob::getMount(): no mount set.");
}