Skip to content
Snippets Groups Projects
ArchiveJob.cpp 7.16 KiB
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "scheduler/ArchiveJob.hpp"
#include "scheduler/ArchiveMount.hpp"
#include "eos/DiskReporterFactory.hpp"
#include <limits>
#include <cryptopp/base64.h>

//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::ArchiveJob::~ArchiveJob() throw() {
}

//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveJob::ArchiveJob(ArchiveMount &mount,
  catalogue::Catalogue & catalogue,
  const common::dataStructures::ArchiveFile &archiveFile,
  const std::string &srcURL,
  const common::dataStructures::TapeFile &tapeFile):
  m_mount(mount), m_catalogue(catalogue),
  archiveFile(archiveFile),
  srcURL(srcURL),
  tapeFile(tapeFile) {}

//------------------------------------------------------------------------------
// asyncReportComplete
//------------------------------------------------------------------------------
void cta::ArchiveJob::asyncReportComplete() {
  m_reporter.reset(m_mount.createDiskReporter(m_dbJob->archiveReportURL, m_reporterState));
  m_reporter->asyncReportArchiveFullyComplete();
  m_reporterTimer.reset();
}

//------------------------------------------------------------------------------
// getReportTiming()
//------------------------------------------------------------------------------
double cta::ArchiveJob::reportTime() {
  return m_reporterTimer.secs();
}

//------------------------------------------------------------------------------
// ArchiveJob::validateAndGetTapeFileWritten
//------------------------------------------------------------------------------
cta::catalogue::TapeFileWritten cta::ArchiveJob::validateAndGetTapeFileWritten() {
  validate();
  catalogue::TapeFileWritten fileReport;
  fileReport.archiveFileId = archiveFile.archiveFileID;
  fileReport.blockId = tapeFile.blockId;
  fileReport.checksumType = tapeFile.checksumType;
  fileReport.checksumValue = tapeFile.checksumValue;
  fileReport.compressedSize = tapeFile.compressedSize;
  fileReport.copyNb = tapeFile.copyNb;
  fileReport.diskFileId = archiveFile.diskFileId;
  fileReport.diskFileUser = archiveFile.diskFileInfo.owner;
  fileReport.diskFileGroup = archiveFile.diskFileInfo.group;
  fileReport.diskFilePath = archiveFile.diskFileInfo.path;
  fileReport.diskFileRecoveryBlob = archiveFile.diskFileInfo.recoveryBlob;
  fileReport.diskInstance = archiveFile.diskInstance;
  fileReport.fSeq = tapeFile.fSeq;
  fileReport.size = archiveFile.fileSize;
  fileReport.storageClassName = archiveFile.storageClass;
  fileReport.tapeDrive = m_mount.getDrive();
  fileReport.vid = tapeFile.vid;
  return fileReport;
}

//------------------------------------------------------------------------------
// ArchiveJob::validate
//------------------------------------------------------------------------------
void cta::ArchiveJob::validate(){
  // First check that the block Id for the file has been set.
  if (tapeFile.blockId ==
      std::numeric_limits<decltype(tapeFile.blockId)>::max())
    throw BlockIdNotSet("In cta::ArchiveJob::validate(): Block ID not set");
  // Also check the checksum has been set
  if (archiveFile.checksumType.empty() || archiveFile.checksumValue.empty() || 
      tapeFile.checksumType.empty() || tapeFile.checksumValue.empty())
    throw ChecksumNotSet("In cta::ArchiveJob::validate(): checksums not set");
  // And matches
  if (archiveFile.checksumType != tapeFile.checksumType || 
      archiveFile.checksumValue != tapeFile.checksumValue)
    throw ChecksumMismatch(std::string("In cta::ArchiveJob::validate(): checksum mismatch!")
            +" Archive file checksum type: "+archiveFile.checksumType
            +" Archive file checksum value: "+archiveFile.checksumValue
            +" Tape file checksum type: "+tapeFile.checksumType
            +" Tape file checksum value: "+tapeFile.checksumValue);
}

//------------------------------------------------------------------------------
// ArchiveJob::reportURL
//------------------------------------------------------------------------------
std::string cta::ArchiveJob::reportURL() {
  return m_dbJob->archiveReportURL;
}

//------------------------------------------------------------------------------
// failed
//------------------------------------------------------------------------------
void cta::ArchiveJob::failed(const std::string &failureReason,  log::LogContext & lc) {
  if (m_dbJob->fail(failureReason, lc)) {
    std::string base64ErrorReport;
    // Construct a pipe: msg -> sign -> Base64 encode -> result goes into ret.
    const bool noNewLineInBase64Output = false;
    CryptoPP::StringSource ss1(failureReason, true, 
        new CryptoPP::Base64Encoder(
          new CryptoPP::StringSink(base64ErrorReport), noNewLineInBase64Output));
    std::string fullReportURL = m_dbJob->errorReportURL + base64ErrorReport;
    // That's all job's already done.
    std::promise<void> reporterState;
    utils::Timer t;
    std::unique_ptr<cta::eos::DiskReporter> reporter(m_mount.createDiskReporter(fullReportURL, reporterState));
    reporter->asyncReportArchiveFullyComplete();
    try {
      reporterState.get_future().get();
      log::ScopedParamContainer params(lc);
      params.add("fileId", m_dbJob->archiveFile.archiveFileID)
            .add("diskInstance", m_dbJob->archiveFile.diskInstance)
            .add("diskFileId", m_dbJob->archiveFile.diskFileId)
            .add("fullReportURL", fullReportURL)
            .add("errorReport", failureReason)
            .add("reportTime", t.secs());
      lc.log(log::INFO, "In ArchiveJob::failed(): reported error to client.");
    } catch (cta::exception::Exception & ex) {
      log::ScopedParamContainer params(lc);
      params.add("fileId", m_dbJob->archiveFile.archiveFileID)
            .add("diskInstance", m_dbJob->archiveFile.diskInstance)
            .add("diskFileId", m_dbJob->archiveFile.diskFileId)
            .add("errorReport", failureReason)
            .add("exceptionMsg", ex.getMessageValue())
            .add("reportTime", t.secs());
      lc.log(log::ERR, "In ArchiveJob::failed(): failed to report error to client.");
      lc.logBacktrace(log::ERR, ex.backtrace());
    }
  }
}

//------------------------------------------------------------------------------
// waitForReporting
//------------------------------------------------------------------------------
void cta::ArchiveJob::waitForReporting() {
  m_reporterState.get_future().get();
}