Skip to content
Snippets Groups Projects
ArchiveRequest.cpp 24.96 KiB
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "ArchiveRequest.hpp"
#include "GenericObject.hpp"
#include "ArchiveQueue.hpp"
#include "Helpers.hpp"
#include "common/dataStructures/EntryLog.hpp"
#include "MountPolicySerDeser.hpp"

#include <algorithm>
#include <google/protobuf/util/json_util.h>
#include <cmath>

namespace cta { namespace objectstore {

cta::objectstore::ArchiveRequest::ArchiveRequest(const std::string& address, Backend& os): 
  ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>(os, address){ }

cta::objectstore::ArchiveRequest::ArchiveRequest(Backend& os): 
  ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>(os) { }

cta::objectstore::ArchiveRequest::ArchiveRequest(GenericObject& go):
  ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>(go.objectStore()) {
  // Here we transplant the generic object into the new object
  go.transplantHeader(*this);
  // And interpret the header.
  getPayloadFromHeader();
}

void cta::objectstore::ArchiveRequest::initialize() {
  // Setup underlying object
  ObjectOps<serializers::ArchiveRequest, serializers::ArchiveRequest_t>::initialize();
  // This object is good to go (to storage)
  m_payloadInterpreted = true;
}

void cta::objectstore::ArchiveRequest::addJob(uint16_t copyNumber,
  const std::string& tapepool, const std::string& archivequeueaddress, 
    uint16_t maxRetiesWithinMount, uint16_t maxTotalRetries) {
  checkPayloadWritable();
  auto *j = m_payload.add_jobs();
  j->set_copynb(copyNumber);
  j->set_status(serializers::ArchiveJobStatus::AJS_LinkingToArchiveQueue);
  j->set_tapepool(tapepool);
  j->set_owner(archivequeueaddress);
  // XXX This field (archivequeueaddress) is a leftover from a past layout when tape pools were static
  // in the object store, and should be eventually removed.
  j->set_archivequeueaddress("");
  j->set_totalretries(0);
  j->set_retrieswithinmount(0);
  j->set_lastmountwithfailure(0);
  j->set_maxretrieswithinmount(maxRetiesWithinMount);
  j->set_maxtotalretries(maxTotalRetries);
}

bool cta::objectstore::ArchiveRequest::setJobSuccessful(uint16_t copyNumber) {
  checkPayloadWritable();
  auto * jl = m_payload.mutable_jobs();
  for (auto j=jl->begin(); j!=jl->end(); j++) {
    if (j->copynb() == copyNumber) {
      j->set_status(serializers::ArchiveJobStatus::AJS_Complete);
      for (auto j2=jl->begin(); j2!=jl->end(); j2++) {
        if (j2->status()!= serializers::ArchiveJobStatus::AJS_Complete && 
            j2->status()!= serializers::ArchiveJobStatus::AJS_Failed)
          return false;
      }
      return true;
    }
  }
  throw NoSuchJob("In ArchiveRequest::setJobSuccessful(): job not found");
}

bool cta::objectstore::ArchiveRequest::addJobFailure(uint16_t copyNumber,
    uint64_t mountId, log::LogContext & lc) {
  checkPayloadWritable();
  auto * jl = m_payload.mutable_jobs();
  // Find the job and update the number of failures 
  // (and return the job status: failed (true) or to be retried (false))
  for (auto & j: *jl) {
    if (j.copynb() == copyNumber) {
      if (j.lastmountwithfailure() == mountId) {
        j.set_retrieswithinmount(j.retrieswithinmount() + 1);
      } else {
        j.set_retrieswithinmount(1);
        j.set_lastmountwithfailure(mountId);
      }
      j.set_totalretries(j.totalretries() + 1);
    }
    if (j.totalretries() >= j.maxtotalretries()) {
      j.set_status(serializers::AJS_Failed);
      finishIfNecessary(lc);
      return true;
    } else {
      j.set_status(serializers::AJS_PendingMount);
      return false;
    }
  }
  throw NoSuchJob ("In ArchiveRequest::addJobFailure(): could not find job");
}


void cta::objectstore::ArchiveRequest::setAllJobsLinkingToArchiveQueue() {
  checkPayloadWritable();
  auto * jl=m_payload.mutable_jobs();
  for (auto j=jl->begin(); j!=jl->end(); j++) {
    j->set_status(serializers::AJS_LinkingToArchiveQueue);
  }
}

void cta::objectstore::ArchiveRequest::setAllJobsFailed() {
  checkPayloadWritable();
  auto * jl=m_payload.mutable_jobs();
  for (auto j=jl->begin(); j!=jl->end(); j++) {
    j->set_status(serializers::AJS_Failed);
  }
}

//------------------------------------------------------------------------------
// setArchiveFile
//
// Copies the fields of the archive file structure into the (flat) payload.
//------------------------------------------------------------------------------
void ArchiveRequest::setArchiveFile(const cta::common::dataStructures::ArchiveFile& archiveFile) {
  checkPayloadWritable();
  // TODO: factor out the archivefile structure from the flat ArchiveRequest.
  // Top level fields.
  m_payload.set_archivefileid(archiveFile.archiveFileID);
  m_payload.set_checksumtype(archiveFile.checksumType);
  m_payload.set_checksumvalue(archiveFile.checksumValue);
  m_payload.set_creationtime(archiveFile.creationTime);
  m_payload.set_diskfileid(archiveFile.diskFileId);
  m_payload.set_diskinstance(archiveFile.diskInstance);
  m_payload.set_filesize(archiveFile.fileSize);
  m_payload.set_reconcilationtime(archiveFile.reconciliationTime);
  m_payload.set_storageclass(archiveFile.storageClass);
  // Disk file info sub-message.
  auto * payloadDiskFileInfo = m_payload.mutable_diskfileinfo();
  const auto & diskFileInfo = archiveFile.diskFileInfo;
  payloadDiskFileInfo->set_group(diskFileInfo.group);
  payloadDiskFileInfo->set_owner(diskFileInfo.owner);
  payloadDiskFileInfo->set_path(diskFileInfo.path);
  payloadDiskFileInfo->set_recoveryblob(diskFileInfo.recoveryBlob);
}

//------------------------------------------------------------------------------
// getArchiveFile
//
// Rebuilds an archive file structure from the fields stored in the payload.
//------------------------------------------------------------------------------
cta::common::dataStructures::ArchiveFile ArchiveRequest::getArchiveFile() {
  checkPayloadReadable();
  cta::common::dataStructures::ArchiveFile archiveFile;
  // Top level fields.
  archiveFile.archiveFileID = m_payload.archivefileid();
  archiveFile.checksumType = m_payload.checksumtype();
  archiveFile.checksumValue = m_payload.checksumvalue();
  archiveFile.creationTime = m_payload.creationtime();
  archiveFile.diskFileId = m_payload.diskfileid();
  archiveFile.diskInstance = m_payload.diskinstance();
  archiveFile.fileSize = m_payload.filesize();
  archiveFile.reconciliationTime = m_payload.reconcilationtime();
  archiveFile.storageClass = m_payload.storageclass();
  // Disk file info sub-message.
  const auto & payloadDiskFileInfo = m_payload.diskfileinfo();
  archiveFile.diskFileInfo.group = payloadDiskFileInfo.group();
  archiveFile.diskFileInfo.owner = payloadDiskFileInfo.owner();
  archiveFile.diskFileInfo.path = payloadDiskFileInfo.path();
  archiveFile.diskFileInfo.recoveryBlob = payloadDiskFileInfo.recoveryblob();
  return archiveFile;
}

//------------------------------------------------------------------------------
// setArchiveReportURL
//
// Stores the archive report URL in the payload.
//------------------------------------------------------------------------------
void ArchiveRequest::setArchiveReportURL(const std::string& URL)
{
  checkPayloadWritable();
  m_payload.set_archivereporturl(URL);
}

//------------------------------------------------------------------------------
// getArchiveReportURL
//
// Returns a copy of the archive report URL stored in the payload.
//------------------------------------------------------------------------------
std::string ArchiveRequest::getArchiveReportURL()
{
  checkPayloadReadable();
  std::string reportURL = m_payload.archivereporturl();
  return reportURL;
}


//------------------------------------------------------------------------------
// setMountPolicy
//------------------------------------------------------------------------------
void cta::objectstore::ArchiveRequest::setMountPolicy(const cta::common::dataStructures::MountPolicy &mountPolicy) {
  checkPayloadWritable();
  MountPolicySerDeser(mountPolicy).serialize(*m_payload.mutable_mountpolicy());
}

//------------------------------------------------------------------------------
// getMountPolicy
//
// Deserializes the payload's mount policy field and returns it.
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy ArchiveRequest::getMountPolicy() {
  checkPayloadReadable();
  MountPolicySerDeser deserializedPolicy;
  deserializedPolicy.deserialize(m_payload.mountpolicy());
  return deserializedPolicy;
}

//------------------------------------------------------------------------------
// setRequester
//
// Stores the requester's name and group in the payload.
//------------------------------------------------------------------------------
void ArchiveRequest::setRequester(const cta::common::dataStructures::UserIdentity &requester) {
  checkPayloadWritable();
  auto * storedRequester = m_payload.mutable_requester();
  storedRequester->set_name(requester.name);
  storedRequester->set_group(requester.group);
}

//------------------------------------------------------------------------------
// getRequester
//
// Returns the requester's name and group as stored in the payload.
//------------------------------------------------------------------------------
cta::common::dataStructures::UserIdentity ArchiveRequest::getRequester() {
  checkPayloadReadable();
  const auto & storedRequester = m_payload.requester();
  cta::common::dataStructures::UserIdentity userIdentity;
  userIdentity.name = storedRequester.name();
  userIdentity.group = storedRequester.group();
  return userIdentity;
}

//------------------------------------------------------------------------------
// setSrcURL
//
// Stores the source URL in the payload.
//------------------------------------------------------------------------------
void ArchiveRequest::setSrcURL(const std::string &srcURL)
{
  checkPayloadWritable();
  m_payload.set_srcurl(srcURL);
}

//------------------------------------------------------------------------------
// getSrcURL
//
// Returns a copy of the source URL stored in the payload.
//------------------------------------------------------------------------------
std::string ArchiveRequest::getSrcURL()
{
  checkPayloadReadable();
  std::string sourceURL = m_payload.srcurl();
  return sourceURL;
}

//------------------------------------------------------------------------------
// setEntryLog
//
// Stores the time, host and user name of the given entry log in the payload's
// creation log field.
//------------------------------------------------------------------------------
void ArchiveRequest::setEntryLog(const cta::common::dataStructures::EntryLog &creationLog) {
  checkPayloadWritable();
  auto * storedLog = m_payload.mutable_creationlog();
  storedLog->set_username(creationLog.username);
  storedLog->set_host(creationLog.host);
  storedLog->set_time(creationLog.time);
}

//------------------------------------------------------------------------------
// getEntryLog
//
// Deserializes the payload's creation log field and returns it.
//------------------------------------------------------------------------------
cta::common::dataStructures::EntryLog ArchiveRequest::getEntryLog() {
  checkPayloadReadable();
  EntryLogSerDeser deserializedLog;
  deserializedLog.deserialize(m_payload.creationlog());
  return deserializedLog;
}

//------------------------------------------------------------------------------
// dumpJobs
//
// Returns, for each job of the request and in payload order, its copy number,
// tape pool, owner and status.
//------------------------------------------------------------------------------
auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
  checkPayloadReadable();
  std::list<JobDump> dumps;
  for (const auto & job: m_payload.jobs()) {
    // Append a value-initialized entry, then fill it in place.
    dumps.emplace_back();
    auto & entry = dumps.back();
    entry.copyNb = job.copynb();
    entry.tapePool = job.tapepool();
    entry.owner = job.owner();
    entry.status = job.status();
  }
  return dumps;
}

//------------------------------------------------------------------------------
// garbageCollect
//
// Requeues the jobs of this request which were left behind by presumedOwner.
//
// A job is handled when its status is AJS_LinkingToArchiveQueue (whoever owns
// it), or when its status is AJS_Selected or AJS_PendingMount and its owner is
// presumedOwner. For each such job:
//  - the archive queue for the job's tape pool is locked and fetched through
//    Helpers::getLockedAndFetchedQueue();
//  - the job is added to the queue if necessary (the queue is committed only
//    when addJobIfNecessary() returns true);
//  - the job's owner becomes the queue address, its status AJS_PendingMount,
//    and the request is committed before the queue lock is released;
//  - the function then sleeps for half the time the queue update took.
// If any exception is thrown while doing so, the job is set to AJS_Failed and
// finishIfNecessary() is called: when it returns true the function returns
// immediately (remaining jobs are not visited), otherwise the request is
// committed and the loop carries on with the next job.
//
// presumedOwner:  address of the owner the jobs are expected to belong to.
// agentReference: passed to the queue helper.
// lc:             log context used for all messages.
// catalogue:      not referenced in this function body.
//------------------------------------------------------------------------------
void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) {
  checkPayloadWritable();
  // The behavior here depends on which job the agent is supposed to own.
  // We should first find this job (if any). This is for covering the case
  // of a selected job. The Request could also still be in the process of being
  // connected to tape pools. In this case we will finish the connection to
  // tape pools unconditionally.
  auto * jl = m_payload.mutable_jobs();
  // Remembers whether at least one job matched, for the final log message.
  bool anythingGarbageCollected=false;
  for (auto j=jl->begin(); j!=jl->end(); j++) {
    auto owner=j->owner();
    auto status=j->status();
    if (status==serializers::AJS_LinkingToArchiveQueue ||
        ( (status==serializers::AJS_Selected || status==serializers::AJS_PendingMount)
             && owner==presumedOwner)) {
      // If the job was being connected to the tape pool or was selected
      // by the dead agent, then we have to ensure it is indeed connected to
      // the tape pool and set its status to pending.
      // (Re)connect the job to the tape pool and make it pending.
      // If we fail to reconnect, we have to fail the job and potentially
      // finish the request.
      // Placeholder value, logged if we fail before the queue address is known.
      std::string queueObject="Not defined yet";
      anythingGarbageCollected=true;
      try {
        utils::Timer t;
        // Get the queue where we should requeue the job. The queue might need to be
        // recreated (this will be done by helper).
        ArchiveQueue aq(m_objectStore);
        ScopedExclusiveLock aql;
        Helpers::getLockedAndFetchedQueue<ArchiveQueue>(aq, aql, agentReference, j->tapepool(), lc);
        queueObject=aq.getAddressIfSet();
        // Snapshot of the job as it is before requeueing, handed to the queue.
        ArchiveRequest::JobDump jd;
        jd.copyNb = j->copynb();
        jd.tapePool = j->tapepool();
        jd.owner = j->owner();
        jd.status = j->status();
        // Only commit the queue when it reports having added the job.
        if (aq.addJobIfNecessary(jd, getAddressIfSet(), getArchiveFile().archiveFileID,
          getArchiveFile().fileSize, getMountPolicy(), getEntryLog().time))
          aq.commit();
        auto queueUpdateTime = t.secs(utils::Timer::resetCounter);
        // The queue now owns the job. The request is committed while the
        // queue lock is still held; the lock is released right after.
        j->set_owner(aq.getAddressIfSet());
        j->set_status(serializers::AJS_PendingMount);
        commit();
        aql.release();
        auto commitUnlockQueueTime = t.secs(utils::Timer::resetCounter);
        {
          log::ScopedParamContainer params(lc);
          params.add("jobObject", getAddressIfSet())
                .add("queueObject", queueObject)
                .add("presumedOwner", presumedOwner)
                .add("copyNb", j->copynb())
                .add("queueUpdateTime", queueUpdateTime)
                .add("commitUnlockQueueTime", commitUnlockQueueTime);
          lc.log(log::INFO, "In ArchiveRequest::garbageCollect(): requeued job.");
        }
        timespec ts;
        // We will sleep a bit to make sure other processes can also access the queue
        // as we are very likely to be part of a tight loop.
        // TODO: ideally, end of session requeueing and garbage collection should be
        // done in parallel.
        // We sleep half the time it took to queue to give a chance to other lockers.
        // The duration is split into whole seconds and nanoseconds for nanosleep().
        // NOTE(review): if the fractional part rounds up to a full second,
        // tv_nsec would be 1000000000, outside the range nanosleep() accepts - confirm.
        double secSleep, fracSecSleep;
        fracSecSleep = std::modf(queueUpdateTime / 2, &secSleep);
        ts.tv_sec = secSleep;
        ts.tv_nsec = std::round(fracSecSleep * 1000 * 1000 * 1000);
        nanosleep(&ts, nullptr);
        auto sleepTime = t.secs();
        log::ScopedParamContainer params(lc);
        params.add("jobObject", getAddressIfSet())
              .add("queueObject", queueObject)
              .add("presumedOwner", presumedOwner)
              .add("copyNb", j->copynb())
              .add("queueUpdateTime", queueUpdateTime)
              .add("commitUnlockQueueTime", commitUnlockQueueTime)
              .add("sleepTime", sleepTime);
        lc.log(log::INFO, "In ArchiveRequest::garbageCollect(): slept some time to not sit on the queue after GC requeueing.");
      } catch (...) {
        // We could not requeue the job: fail it.
        j->set_status(serializers::AJS_Failed);
        log::ScopedParamContainer params(lc);
        params.add("jobObject", getAddressIfSet())
              .add("queueObject", queueObject)
              .add("presumedOwner", presumedOwner)
              .add("copyNb", j->copynb());
        // Log differently depending on the exception type: the current
        // exception is rethrown so that it can be caught by type.
        std::string backtrace = "";
        try {
          std::rethrow_exception(std::current_exception());
        } catch (cta::exception::Exception &ex) {
          params.add("exceptionMessage", ex.getMessageValue());
          backtrace = ex.backtrace();
        } catch (std::exception & ex) {
          params.add("exceptionWhat", ex.what());
        } catch (...) {
          params.add("exceptionType", "unknown");
        }        
        // This could be the end of the request, with various consequences.
        // This is handled here:
        if (finishIfNecessary(lc)) {
          // The request was removed: log and stop, the remaining jobs are not visited.
          std::string message="In ArchiveRequest::garbageCollect(): failed to requeue the job. Failed it and removed the request as a consequence.";
          if (backtrace.size()) message += " Backtrace follows.";
          lc.log(log::ERR, message);
          if (backtrace.size()) lc.logBacktrace(log::ERR, backtrace);
          return;
        } else {
          // The request stays: record the failed status and carry on with the next job.
          commit();
          lc.log(log::ERR, "In ArchiveRequest::garbageCollect(): failed to requeue the job and failed it.");
        }
      }
    }
  }
  // No job matched the criteria above: just report it.
  if (!anythingGarbageCollected) {
    log::ScopedParamContainer params(lc);
    params.add("jobObject", getAddressIfSet())
          .add("presumedOwner", presumedOwner);
    lc.log(log::INFO, "In ArchiveRequest::garbageCollect(): nothing to garbage collect.");
  }
}

//------------------------------------------------------------------------------
// setJobOwner
//
// Changes the owner of the job with the given copy number.
// Throws NoSuchJob when no job carries that copy number.
//------------------------------------------------------------------------------
void ArchiveRequest::setJobOwner(
  uint16_t copyNumber, const std::string& owner) {
  checkPayloadWritable();
  // Only the first job with a matching copy number is updated.
  for (auto & candidate: *m_payload.mutable_jobs()) {
    if (candidate.copynb() != copyNumber) continue;
    candidate.set_owner(owner);
    return;
  }
  throw NoSuchJob("In ArchiveRequest::setJobOwner: no such job");
}

//------------------------------------------------------------------------------
// asyncUpdateJobOwner
//
// Creates an updater which changes the owner of the job with the given copy
// number from previousOwner to owner, through the backend's asyncUpdate().
//
// The callback handed to the backend receives the serialized object and
// returns the new serialized object. It throws:
//  - cta::exception::Exception when the header or the payload cannot be
//    parsed, or when the object is not an archive request;
//  - WrongPreviousOwner when the job is not owned by previousOwner, or when no
//    job carries the given copy number.
// On success the callback also copies the archive file description, the
// archive report URL and the source URL into the updater, and fills in the
// lock/fetch and processing timings.
//
// The returned pointer is released from its unique_ptr: the caller takes
// ownership. The callback holds a reference to the returned updater object.
//------------------------------------------------------------------------------
ArchiveRequest::AsyncJobOwnerUpdater* ArchiveRequest::asyncUpdateJobOwner(uint16_t copyNumber,
  const std::string& owner, const std::string& previousOwner) {
  std::unique_ptr<AsyncJobOwnerUpdater> updater(new AsyncJobOwnerUpdater);
  // Passing a reference to the unique pointer led to strange behaviors, so the
  // callback refers to the pointed-to object instead.
  AsyncJobOwnerUpdater & updaterRef = *updater;
  updater->m_updaterCallback=
      [this, copyNumber, owner, previousOwner, &updaterRef](const std::string &in)->std::string {
        // We have a locked and fetched object, so we just need to work on its representation.
        updaterRef.m_timingReport.lockFetchTime = updaterRef.m_timer.secs(utils::Timer::resetCounter);
        // Decode and validate the header.
        serializers::ObjectHeader header;
        if (!header.ParseFromString(in)) {
          // Use the tolerant parser to assess the situation.
          header.ParsePartialFromString(in);
          throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse header: ")+
            header.InitializationErrorString());
        }
        if (header.type() != serializers::ObjectType::ArchiveRequest_t) {
          std::stringstream err;
          err << "In ArchiveRequest::asyncUpdateJobOwner()::lambda(): wrong object type: " << header.type();
          throw cta::exception::Exception(err.str());
        }
        // Decode the payload.
        serializers::ArchiveRequest request;
        if (!request.ParseFromString(header.payload())) {
          // Use the tolerant parser to assess the situation.
          request.ParsePartialFromString(header.payload());
          throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobOwner(): could not parse payload: ")+
            request.InitializationErrorString());
        }
        // Find the copy number and change the owner.
        for (auto & job: *request.mutable_jobs()) {
          if (job.copynb() != copyNumber) continue;
          if (job.owner() != previousOwner) {
            throw WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): Job not owned.");
          }
          job.set_owner(owner);
          // We also need to gather all the job content for the user to get an
          // in-memory representation.
          // TODO this is an unfortunate duplication of the getXXX() members of ArchiveRequest.
          // We could try and refactor this.
          auto & archiveFile = updaterRef.m_archiveFile;
          const auto & diskFileInfo = request.diskfileinfo();
          archiveFile.archiveFileID = request.archivefileid();
          archiveFile.checksumType = request.checksumtype();
          archiveFile.checksumValue = request.checksumvalue();
          archiveFile.creationTime = request.creationtime();
          archiveFile.diskFileId = request.diskfileid();
          archiveFile.diskFileInfo.group = diskFileInfo.group();
          archiveFile.diskFileInfo.owner = diskFileInfo.owner();
          archiveFile.diskFileInfo.path = diskFileInfo.path();
          archiveFile.diskFileInfo.recoveryBlob = diskFileInfo.recoveryblob();
          archiveFile.diskInstance = request.diskinstance();
          archiveFile.fileSize = request.filesize();
          archiveFile.reconciliationTime = request.reconcilationtime();
          archiveFile.storageClass = request.storageclass();
          updaterRef.m_archiveReportURL = request.archivereporturl();
          updaterRef.m_srcURL = request.srcurl();
          // Re-encode the modified payload into the header and hand it back.
          header.set_payload(request.SerializePartialAsString());
          updaterRef.m_timingReport.processTime = updaterRef.m_timer.secs(utils::Timer::resetCounter);
          return header.SerializeAsString();
        }
        // If we do not find the copy, return not owned as well...
        throw WrongPreviousOwner("In ArchiveRequest::asyncUpdateJobOwner()::lambda(): copyNb not found.");
      };
  updater->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), updater->m_updaterCallback));
  return updater.release();
}

//------------------------------------------------------------------------------
// AsyncJobOwnerUpdater::wait
//
// Waits on the backend updater, then records the timer value as the
// commit/unlock time of the timing report.
//------------------------------------------------------------------------------
void ArchiveRequest::AsyncJobOwnerUpdater::wait()
{
  m_backendUpdater->wait();
  const auto elapsed = m_timer.secs();
  m_timingReport.commitUnlockTime = elapsed;
}

//------------------------------------------------------------------------------
// AsyncJobOwnerUpdater::getTimeingsReport
//
// Returns a copy of the timing report of this updater.
//------------------------------------------------------------------------------
ArchiveRequest::AsyncJobOwnerUpdater::TimingsReport
ArchiveRequest::AsyncJobOwnerUpdater::getTimeingsReport()
{
  return m_timingReport;
}

//------------------------------------------------------------------------------
// AsyncJobOwnerUpdater::getArchiveFile
//
// Returns a reference to the archive file description held by this updater.
//------------------------------------------------------------------------------
const common::dataStructures::ArchiveFile&
ArchiveRequest::AsyncJobOwnerUpdater::getArchiveFile()
{
  return m_archiveFile;
}

//------------------------------------------------------------------------------
// AsyncJobOwnerUpdater::getArchiveReportURL
//
// Returns a reference to the archive report URL held by this updater.
//------------------------------------------------------------------------------
const std::string&
ArchiveRequest::AsyncJobOwnerUpdater::getArchiveReportURL()
{
  return m_archiveReportURL;
}

//------------------------------------------------------------------------------
// AsyncJobOwnerUpdater::getSrcURL
//
// Returns a reference to the source URL held by this updater.
//------------------------------------------------------------------------------
const std::string&
ArchiveRequest::AsyncJobOwnerUpdater::getSrcURL()
{
  return m_srcURL;
}

//------------------------------------------------------------------------------
// asyncUpdateJobSuccessful
//
// Creates an updater which marks the job with the given copy number as
// complete, through the backend's asyncUpdate().
//
// The callback handed to the backend receives the serialized object and:
//  - when at least one job is still neither complete nor failed, sets
//    m_isLastJob to false and returns the new serialized object;
//  - when all jobs are complete or failed, sets m_isLastJob to true and throws
//    Backend::AsyncUpdateWithDelete carrying the new serialized object;
//  - throws cta::exception::Exception when the header or the payload cannot be
//    parsed, when the object is not an archive request, or when no job carries
//    the given copy number.
//
// The parse results are checked (as in asyncUpdateJobOwner()) so that a
// corrupted object is reported instead of being partially decoded and
// written back.
//
// The returned pointer is released from its unique_ptr: the caller takes
// ownership. The callback holds a reference to the returned updater object.
//------------------------------------------------------------------------------
ArchiveRequest::AsyncJobSuccessfulUpdater * ArchiveRequest::asyncUpdateJobSuccessful(const uint16_t copyNumber ) {
  std::unique_ptr<AsyncJobSuccessfulUpdater> ret(new AsyncJobSuccessfulUpdater);
  // Passing a reference to the unique pointer led to strange behaviors.
  auto & retRef = *ret;
  ret->m_updaterCallback=
    [this,copyNumber, &retRef](const std::string &in)->std::string {
      // We have a locked and fetched object, so we just need to work on its representation.
      serializers::ObjectHeader oh;
      if (!oh.ParseFromString(in)) {
        // Use the tolerant parser to assess the situation.
        oh.ParsePartialFromString(in);
        throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobSuccessful(): could not parse header: ")+
          oh.InitializationErrorString());
      }
      if (oh.type() != serializers::ObjectType::ArchiveRequest_t) {
        std::stringstream err;
        err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): wrong object type: " << oh.type();
        throw cta::exception::Exception(err.str());
      }
      serializers::ArchiveRequest payload;
      if (!payload.ParseFromString(oh.payload())) {
        // Use the tolerant parser to assess the situation.
        payload.ParsePartialFromString(oh.payload());
        throw cta::exception::Exception(std::string("In ArchiveRequest::asyncUpdateJobSuccessful(): could not parse payload: ")+
          payload.InitializationErrorString());
      }
      auto * jl = payload.mutable_jobs();
      for (auto & j: *jl) {
        if (j.copynb() != copyNumber) continue;
        j.set_status(serializers::ArchiveJobStatus::AJS_Complete);
        // Look for any job which is not in a final status yet.
        for (const auto & j2: *jl) {
          if (j2.status()!= serializers::ArchiveJobStatus::AJS_Complete &&
              j2.status()!= serializers::ArchiveJobStatus::AJS_Failed) {
            retRef.m_isLastJob = false;
            oh.set_payload(payload.SerializePartialAsString());
            return oh.SerializeAsString();
          }
        }
        // Every job is final: the new representation is handed over through
        // the exception type of the backend dedicated to update-with-delete.
        retRef.m_isLastJob = true;
        oh.set_payload(payload.SerializePartialAsString());
        throw cta::objectstore::Backend::AsyncUpdateWithDelete(oh.SerializeAsString());
      }
      std::stringstream err;
      err << "In ArchiveRequest::asyncUpdateJobSuccessful()::lambda(): copyNb not found";
      throw cta::exception::Exception(err.str());
    };
  ret->m_backendUpdater.reset(m_objectStore.asyncUpdate(getAddressIfSet(), ret->m_updaterCallback));
  return ret.release();
}

//------------------------------------------------------------------------------
// AsyncJobSuccessfulUpdater::wait
//
// Waits on the backend updater.
//------------------------------------------------------------------------------
void ArchiveRequest::AsyncJobSuccessfulUpdater::wait()
{
  m_backendUpdater->wait();
}

//------------------------------------------------------------------------------
// getJobOwner
//
// Returns the owner of the first job carrying the given copy number.
// Throws NoSuchJob when no job carries that copy number.
//
// The jobs are searched in place in the payload: the list of jobs is not
// copied for the lookup.
//------------------------------------------------------------------------------
std::string ArchiveRequest::getJobOwner(uint16_t copyNumber) {
  checkPayloadReadable();
  for (const auto & job: m_payload.jobs()) {
    if (job.copynb() == copyNumber)
      return job.owner();
  }
  throw NoSuchJob("In ArchiveRequest::getJobOwner: no such job");
}


//------------------------------------------------------------------------------
// finishIfNecessary
//
// Removes the request when all of its jobs are either complete or failed.
// This is typically called after changing the status of one job in memory.
// Returns true when the request was removed, false when at least one job is
// still in another status (nothing is done in that case).
//------------------------------------------------------------------------------
bool ArchiveRequest::finishIfNecessary(log::LogContext & lc) {
  checkPayloadWritable();
  // One job in a non-final status is enough to keep the request.
  for (const auto & job: m_payload.jobs()) {
    const auto jobStatus = job.status();
    const bool isFinal = jobStatus == serializers::ArchiveJobStatus::AJS_Complete ||
                         jobStatus == serializers::ArchiveJobStatus::AJS_Failed;
    if (!isFinal) return false;
  }
  // Every job is final: the request has no reason to exist anymore.
  remove();
  log::ScopedParamContainer params(lc);
  params.add("archiveRequestObject", getAddressIfSet());
  lc.log(log::INFO, "In ArchiveRequest::finishIfNecessary(): Removed completed request.");
  return true;
}

std::string ArchiveRequest::dump() {
  checkPayloadReadable();
  google::protobuf::util::JsonPrintOptions options;
  options.add_whitespace = true;
  options.always_print_primitive_fields = true;
  std::string headerDump;
  google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
  return headerDump;
}

}} // namespace cta::objectstore