Skip to content
Snippets Groups Projects
RootEntry.cpp 40.18 KiB
/*
 * The CERN Tape Archive (CTA) project
 * Copyright (C) 2015  CERN
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "RootEntry.hpp"
#include "AgentRegister.hpp"
#include "Agent.hpp"
#include "AgentReference.hpp"
#include "ArchiveQueue.hpp"
#include "RetrieveQueue.hpp"
#include "DriveRegister.hpp"
#include "GenericObject.hpp"
#include "SchedulerGlobalLock.hpp"
#include "RepackIndex.hpp"
#include "RepackQueue.hpp"
#include <cxxabi.h>
#include "ProtocolBuffersAlgorithms.hpp"
#include <google/protobuf/util/json_util.h>

namespace cta { namespace objectstore {

const std::string RootEntry::address("root");

// construtor, when the backend store exists.
// Checks the existence and correctness of the root entry
RootEntry::RootEntry(Backend & os):
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(os, address) {}

RootEntry::RootEntry(GenericObject& go): 
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>(go.objectStore()) {
  // Here we transplant the generic object into the new object
  go.transplantHeader(*this);
  // And interpret the header.
  getPayloadFromHeader();
}

// Initialiser. This uses the base object's initialiser and sets the defaults 
// of payload.
void RootEntry::initialize() {
  ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::initialize();
  // There is nothing to do for the payload.
  m_payloadInterpreted = true;
}

bool RootEntry::isEmpty() {
  checkPayloadReadable();
  if (m_payload.has_driveregisterpointer() &&
      m_payload.driveregisterpointer().address().size())
    return false;
  if (m_payload.agentregisterintent().size())
    return false;
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
    return false;
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
    return false;
  for (auto &qt: {JobQueueType::JobsToTransferForUser, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs}) {
    if (archiveQueuePointers(qt).size())
      return false;
  }
  for (auto &qt: {JobQueueType::JobsToTransferForUser, JobQueueType::JobsToReportToUser, JobQueueType::FailedJobs, JobQueueType::JobsToReportToRepackForSuccess}) {
    if (retrieveQueuePointers(qt).size())
      return false;
  }
  return true;
}

void RootEntry::removeIfEmpty(log::LogContext & lc) {
  checkPayloadWritable();
  if (!isEmpty()) {
    throw NotEmpty("In RootEntry::removeIfEmpty(): root entry not empty");
  }
  remove();
  log::ScopedParamContainer params(lc);
  params.add("rootObjectName", getAddressIfSet());
  lc.log(log::INFO, "In RootEntry::removeIfEmpty(): removed root entry.");
}

void RootEntry::garbageCollect(const std::string& presumedOwner, AgentReference & agentReference, log::LogContext & lc,
    cta::catalogue::Catalogue & catalogue) {
  // The root entry cannot be garbage collected.
  throw ForbiddenOperation("In RootEntry::garbageCollect(): RootEntry cannot be garbage collected");
}

// =============================================================================
// ========== Queue types and helper functions =================================
// =============================================================================

const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>& RootEntry::archiveQueuePointers(JobQueueType queueType) {
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser:
    return m_payload.archive_queue_to_transfer_for_user_pointers();
  case JobQueueType::JobsToReportToUser:
    return m_payload.archive_queue_to_report_for_user_pointers();
  case JobQueueType::FailedJobs:
    return m_payload.archive_queue_failed_pointers();
  case JobQueueType::JobsToTransferForRepack:
    return m_payload.archive_queue_to_transfer_for_repack_pointers();
  case JobQueueType::JobsToReportToRepackForSuccess:
    return m_payload.archive_queue_to_report_to_repack_for_success_pointers();
  case JobQueueType::JobsToReportToRepackForFailure:
    return m_payload.archive_queue_to_report_to_repack_for_failure_pointers();
  default:
    throw cta::exception::Exception("In RootEntry::archiveQueuePointers(): unknown queue type.");
  }
}

::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::ArchiveQueuePointer>* RootEntry::mutableArchiveQueuePointers(JobQueueType queueType) {
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser:
    return m_payload.mutable_archive_queue_to_transfer_for_user_pointers();
  case JobQueueType::JobsToReportToUser:
    return m_payload.mutable_archive_queue_to_report_for_user_pointers();
  case JobQueueType::FailedJobs:
    return m_payload.mutable_archive_queue_failed_pointers();
  case JobQueueType::JobsToTransferForRepack:
    return m_payload.mutable_archive_queue_to_transfer_for_repack_pointers();
  case JobQueueType::JobsToReportToRepackForSuccess:
    return m_payload.mutable_archive_queue_to_report_to_repack_for_success_pointers();
  case JobQueueType::JobsToReportToRepackForFailure:
    return m_payload.mutable_archive_queue_to_report_to_repack_for_failure_pointers();
  default:
    throw cta::exception::Exception("In RootEntry::mutableArchiveQueuePointers(): unknown queue type.");
  }
}

const ::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>& RootEntry::retrieveQueuePointers(JobQueueType queueType) {
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser:
    return m_payload.retrieve_queue_to_transfer_for_user_pointers();
  case JobQueueType::JobsToReportToUser:
    return m_payload.retrieve_queue_to_report_for_user_pointers();
  case JobQueueType::FailedJobs:
    return m_payload.retrieve_queue_failed_pointers();
  case JobQueueType::JobsToReportToRepackForSuccess:
     return m_payload.retrieve_queue_to_report_to_repack_for_success_pointers();
  case JobQueueType::JobsToReportToRepackForFailure:
    return m_payload.retrieve_queue_to_report_to_repack_for_failure_pointers();
  case JobQueueType::JobsToTransferForRepack:
    return m_payload.retrieve_queue_to_transfer_for_repack_pointers();
  default:
    throw cta::exception::Exception("In RootEntry::retrieveQueuePointers(): unknown queue type.");
  }
}

::google::protobuf::RepeatedPtrField<::cta::objectstore::serializers::RetrieveQueuePointer>* RootEntry::mutableRetrieveQueuePointers(JobQueueType queueType) {
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser:
    return m_payload.mutable_retrieve_queue_to_transfer_for_user_pointers();
  case JobQueueType::JobsToReportToUser:
    return m_payload.mutable_retrieve_queue_to_report_for_user_pointers();
  case JobQueueType::FailedJobs:
    return m_payload.mutable_retrieve_queue_failed_pointers();
  case JobQueueType::JobsToReportToRepackForSuccess:
    return m_payload.mutable_retrieve_queue_to_report_to_repack_for_success_pointers();
  case JobQueueType::JobsToReportToRepackForFailure:
    return m_payload.mutable_retrieve_queue_to_report_to_repack_for_failure_pointers();
  case JobQueueType::JobsToTransferForRepack:
    return m_payload.mutable_retrieve_queue_to_transfer_for_repack_pointers();
  default:
    throw cta::exception::Exception("In RootEntry::mutableRetrieveQueuePointers(): unknown queue type.");
  }
}

// =============================================================================
// ========== Archive queues manipulations =====================================
// =============================================================================


// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(const std::string &tp,
    const serializers::ArchiveQueuePointer & tpp) {
    return tpp.name() == tp;
  }
}

std::string RootEntry::addOrGetArchiveQueueAndCommit(const std::string& tapePool, AgentReference& agentRef, JobQueueType queueType) {
  checkPayloadWritable();
  // Check the archive queue does not already exist
  try {
    return serializers::findElement(archiveQueuePointers(queueType), tapePool).address();
  } catch (serializers::NotFound &) {}
  // Insert the archive queue pointer in the root entry, then the queue.
  std::string archiveQueueNameHeader = "ArchiveQueue";
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser: archiveQueueNameHeader+="ToTransferForUser"; break;
  case JobQueueType::JobsToReportToRepackForFailure: archiveQueueNameHeader+="ToReportToRepackForSuccess"; break;
  case JobQueueType::JobsToReportToRepackForSuccess: archiveQueueNameHeader+="ToReportToRepackForSuccess"; break;
  case JobQueueType::JobsToReportToUser: archiveQueueNameHeader+="ToReportForUser"; break;
  case JobQueueType::FailedJobs: archiveQueueNameHeader+="Failed"; break;
  case JobQueueType::JobsToTransferForRepack: archiveQueueNameHeader+="ToTransferForRepack"; break;
  default: break;
  }
  std::string archiveQueueAddress = agentRef.nextId(archiveQueueNameHeader+"-"+tapePool);
  // Now move create a reference the tape pool's ownership to the root entry
  auto * tpp = mutableArchiveQueuePointers(queueType)->Add();
  tpp->set_address(archiveQueueAddress);
  tpp->set_name(tapePool);
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Then insert the queue object
  ArchiveQueue aq(archiveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  aq.initialize(tapePool);
  aq.setOwner(getAddressIfSet());
  aq.setBackupOwner(getAddressIfSet());
  aq.insert();
  return archiveQueueAddress;
}

void RootEntry::removeArchiveQueueAndCommit(const std::string& tapePool, JobQueueType queueType, log::LogContext & lc) {
  checkPayloadWritable();
  // find the address of the archive queue object
  try {
    auto aqp = serializers::findElement(archiveQueuePointers(queueType), tapePool);
    // Open the tape pool object
    ArchiveQueue aq (aqp.address(), m_objectStore);
    ScopedExclusiveLock aql;
    try {
      // Give a slight grace period to avoid live locks (seen in CI: live lock between creation and deletion of empty queues).
      std::this_thread::sleep_for (std::chrono::milliseconds(100));
      aql.lock(aq);
      aq.fetch();
    } catch (cta::exception::Exception & ex) {
      // The archive queue seems to not be there. Make sure this is the case:
      if (aq.exists()) {
        // We failed to access the queue, yet it is present. This is an error.
        // Let the exception pass through.
        throw;
      } else {
        // The queue object is already gone. We can skip to removing the 
        // reference from the RootEntry
        goto deleteFromRootEntry;
      }
    }
    // Verify this is the archive queue we're looking for.
    if (aq.getTapePool() != tapePool) {
      std::stringstream err;
      err << "In RootEntry::removeArchiveQueueAndCommit(): Unexpected tape pool name found in archive queue pointed to for tape pool: "
          << tapePool << " found: " << aq.getTapePool();
      throw WrongArchiveQueue(err.str());
    }
    // Check the archive queue is empty
    if (!aq.isEmpty()) {
      throw ArchiveQueueNotEmpty ("In RootEntry::removeArchiveQueueAndCommit(): trying to "
          "remove a non-empty archive queue");
    }
    // We can delete the queue
    aq.remove();
    {
      log::ScopedParamContainer params(lc);
      params.add("archiveQueueObject", aq.getAddressIfSet());
      lc.log(log::INFO, "In RootEntry::removeArchiveQueueAndCommit(): removed archive queue.");
    }
  deleteFromRootEntry:
    // ... and remove it from our entry
    serializers::removeOccurences(mutableArchiveQueuePointers(queueType), tapePool);
    // We commit for safety and symmetry with the add operation
    commit();
    {
      log::ScopedParamContainer params(lc);
      params.add("tapePool", tapePool)
            .add("queueType", toString(queueType));
      lc.log(log::INFO, "In RootEntry::removeArchiveQueueAndCommit(): removed archive queue reference.");
    }
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
    throw NoSuchArchiveQueue("In RootEntry::removeArchiveQueueAndCommit(): trying to remove non-existing archive queue");
  }
}

void RootEntry::removeMissingArchiveQueueReference(const std::string& tapePool, JobQueueType queueType) {
  serializers::removeOccurences(mutableArchiveQueuePointers(queueType), tapePool);
}

std::string RootEntry::getArchiveQueueAddress(const std::string& tapePool, JobQueueType queueType) {
  checkPayloadReadable();
  try {
    auto & tpp = serializers::findElement(archiveQueuePointers(queueType), tapePool);
    return tpp.address();
  } catch (serializers::NotFound &) {
    throw NoSuchArchiveQueue("In RootEntry::getArchiveQueueAddress: archive queue not allocated");
  }
}

auto RootEntry::dumpArchiveQueues(JobQueueType queueType) -> std::list<ArchiveQueueDump> {
  checkPayloadReadable();
  std::list<ArchiveQueueDump> ret;
  auto & tpl = archiveQueuePointers(queueType);
  for (auto i = tpl.begin(); i!=tpl.end(); i++) {
    ret.push_back(ArchiveQueueDump());
    ret.back().address = i->address();
    ret.back().tapePool = i->name();
  }
  return ret;
}

// =============================================================================
// ========== Retrieve queues manipulations ====================================
// =============================================================================

// This operator will be used in the following usage of the findElement
// removeOccurences
namespace {
  bool operator==(const std::string &vid,
    const serializers::RetrieveQueuePointer & tpp) {
    return tpp.vid() == vid;
  }
}

std::string RootEntry::addOrGetRetrieveQueueAndCommit(const std::string& vid, AgentReference& agentRef, JobQueueType queueType) {
  checkPayloadWritable();
  // Check the retrieve queue does not already exist
  try {
    return serializers::findElement(retrieveQueuePointers(queueType), vid).address();
  } catch (serializers::NotFound &) {}
  // Insert the retrieve queue, then its pointer, with agent intent log update
  // First generate the intent. We expect the agent to be passed locked.
  // The make of the vid in the object name will be handy.
  std::string retrieveQueueNameHeader = "RetrieveQueue";
  switch(queueType) {
  case JobQueueType::JobsToTransferForUser: retrieveQueueNameHeader+="ToTransferForUser"; break;
  case JobQueueType::JobsToReportToUser: retrieveQueueNameHeader+="ToReportForUser"; break;
  case JobQueueType::FailedJobs: retrieveQueueNameHeader+="Failed"; break;
  case JobQueueType::JobsToReportToRepackForSuccess: retrieveQueueNameHeader+="ToReportToRepackForSuccess"; break;
  case JobQueueType::JobsToReportToRepackForFailure: retrieveQueueNameHeader+="ToReportToRepackForFailure"; break;
  case JobQueueType::JobsToTransferForRepack: retrieveQueueNameHeader+="ToTransferForRepack"; break;
  default: break;
  }
  std::string retrieveQueueAddress = agentRef.nextId(retrieveQueueNameHeader+"-"+vid);
  // Reference the queue to the root entry before creation
  auto * rqp = mutableRetrieveQueuePointers(queueType)->Add();
  rqp->set_address(retrieveQueueAddress);
  rqp->set_vid(vid);
  // We must commit here to ensure the tape pool object is referenced.
  commit();
  // Then create the tape pool queue object
  RetrieveQueue rq(retrieveQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  rq.initialize(vid);
  rq.setOwner(getAddressIfSet());
  rq.setBackupOwner("root");
  rq.insert();
  return retrieveQueueAddress;
}

void RootEntry::removeMissingRetrieveQueueReference(const std::string& vid, JobQueueType queueType) {
  serializers::removeOccurences(mutableRetrieveQueuePointers(queueType), vid);
}

void RootEntry::removeRetrieveQueueAndCommit(const std::string& vid, JobQueueType queueType, log::LogContext & lc) {
  checkPayloadWritable();
  // find the address of the retrieve queue object
  try {
    auto rqp=serializers::findElement(retrieveQueuePointers(queueType), vid);
    // Open the retrieve queue object
    RetrieveQueue rq(rqp.address(), m_objectStore);
    ScopedExclusiveLock rql;
    try {
      // Give a slight grace period to avoid live locks (seen in CI: live lock between creation and deletion of empty queues).
      std::this_thread::sleep_for (std::chrono::milliseconds(100));
      rql.lock(rq);
      rq.fetch();
    } catch (cta::exception::Exception & ex) {
      // The retrieve queue seems to not be there. Make sure this is the case:
      if (rq.exists()) {
        // We failed to access the queue, yet it is present. This is an error.
        // Let the exception pass through.
        throw;
      } else {
        // The queue object is already gone. We can skip to removing the 
        // reference from the RootEntry
        goto deleteFromRootEntry;
      }
    }
    // Verify this is the retrieve queue we're looking for.
    if (rq.getVid() != vid) {
      std::stringstream err;
      err << "Unexpected vid found in retrieve queue pointed to for vid: "
          << vid << " found: " << rq.getVid();
      throw WrongRetrieveQueue(err.str());
    }
    // Check the retrieve queue is empty
    if (!rq.isEmpty()) {
      throw RetrieveQueueNotEmpty("In RootEntry::removeTapePoolQueueAndCommit: trying to "
          "remove a non-empty tape pool");
    }
    // We can now delete the queue
    rq.remove();
    {
      log::ScopedParamContainer params(lc);
      params.add("retrieveQueueObject", rq.getAddressIfSet());
      lc.log(log::INFO, "In RootEntry::removeRetrieveQueueAndCommit(): removed retrieve queue.");
    }
  deleteFromRootEntry:
    // ... and remove it from our entry
    serializers::removeOccurences(mutableRetrieveQueuePointers(queueType), vid);
    // We commit for safety and symmetry with the add operation
    commit();
    {
      log::ScopedParamContainer params(lc);
      params.add("tapeVid", vid)
            .add("queueType", toString(queueType));
      lc.log(log::INFO, "In RootEntry::removeRetrieveQueueAndCommit(): removed retrieve queue reference.");
    }
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
    throw NoSuchRetrieveQueue("In RootEntry::removeRetrieveQueueAndCommit: trying to remove non-existing retrieve queue");
  }
}


std::string RootEntry::getRetrieveQueueAddress(const std::string& vid, JobQueueType queueType) {
  checkPayloadReadable();
  try {
    auto & rqp = serializers::findElement(retrieveQueuePointers(queueType), vid);
    return rqp.address();
  } catch (serializers::NotFound &) {
    throw NoSuchRetrieveQueue(std::string("In RootEntry::getRetreveQueueAddress: retrieve queue not allocated ")+
        vid+"/"+toString(queueType));
  }
}

auto RootEntry::dumpRetrieveQueues(JobQueueType queueType) -> std::list<RetrieveQueueDump> {
  checkPayloadReadable();
  std::list<RetrieveQueueDump> ret;
  auto & tpl = retrieveQueuePointers(queueType);
  for (auto i = tpl.begin(); i!=tpl.end(); i++) {
    ret.push_back(RetrieveQueueDump());
    ret.back().address = i->address();
    ret.back().vid = i->vid();
  }
  return ret;
}

// =============================================================================
// ================ Drive register manipulation ================================
// =============================================================================

std::string RootEntry::addOrGetDriveRegisterPointerAndCommit(
  AgentReference& agentRef, const EntryLogSerDeser & log) {
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getDriveRegisterAddress();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent.
    std::string drAddress (agentRef.nextId("DriveRegister"));
    agentRef.addToOwnership(drAddress, m_objectStore);
    // Then create the drive register object
    DriveRegister dr(drAddress, m_objectStore);
    dr.initialize();
    dr.setOwner(agentRef.getAgentAddress());
    dr.setBackupOwner(getAddressIfSet());
    dr.insert();
    // Take a lock on drive registry
    ScopedExclusiveLock drLock(dr);
    // Move drive registry ownership to the root entry
    auto * mdrp = m_payload.mutable_driveregisterpointer();
    mdrp->set_address(drAddress);
    log.serialize(*mdrp->mutable_log());
    commit();
    // Record completion in drive registry
    dr.setOwner(getAddressIfSet());
    dr.setBackupOwner(getAddressIfSet());
    dr.commit();
    //... and clean up the agent
    agentRef.removeFromOwnership(drAddress, m_objectStore);
    return drAddress;
  }
}

void RootEntry::removeDriveRegisterAndCommit(log::LogContext & lc) {
  checkPayloadWritable();
  // Get the address of the drive register (nothing to do if there is none)
  if (!m_payload.has_driveregisterpointer() || 
      !m_payload.driveregisterpointer().address().size())
    return;
  std::string drAddr = m_payload.driveregisterpointer().address();
  DriveRegister dr(drAddr, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  ScopedExclusiveLock drl(dr);
  dr.fetch();
  // Check the drive register is empty
  if (!dr.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeDriveRegisterAndCommit: "
      "trying to remove a non-empty drive register");
  }
  // we can delete the drive register
  dr.remove();
  log::ScopedParamContainer params(lc);
  params.add("driveRegisterObject", dr.getAddressIfSet());
  lc.log(log::INFO, "In RootEntry::removeDriveRegisterAndCommit(): removed drive register.");
  // And update the root entry
  m_payload.mutable_driveregisterpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}

std::string RootEntry::getDriveRegisterAddress() {
  checkPayloadReadable();
  if (m_payload.has_driveregisterpointer() && 
      m_payload.driveregisterpointer().address().size()) {
    return m_payload.driveregisterpointer().address();
  }
  throw NotAllocated("In RootEntry::getDriveRegisterAddress: drive register not allocated");
}


// =============================================================================
// ================ Agent register manipulation ================================
// =============================================================================
// Get the name of the agent register (or exception if not available)
std::string RootEntry::getAgentRegisterAddress() {
  checkPayloadReadable();
  // If the registry is defined, return it, job done.
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size())
    return m_payload.agentregisterpointer().address();
  throw NotAllocated("In RootEntry::getAgentRegister: agentRegister not yet allocated");
}

// Get the name of a (possibly freshly created) agent register
std::string RootEntry::addOrGetAgentRegisterPointerAndCommit(AgentReference& agentRef,
  const EntryLogSerDeser & log, log::LogContext & lc) {
  // Check if the agent register exists
  try {
    return getAgentRegisterAddress();
  } catch (NotAllocated &) {
    // If we get here, the agent register is not created yet, so we have to do it:
    // lock the entry again, for writing. We take the lock ourselves if needed
    // This will make an autonomous transaction
    checkPayloadWritable();
    fetch();
    if (m_payload.has_agentregisterpointer() &&
        m_payload.agentregisterpointer().address().size()) {
      return m_payload.agentregisterpointer().address();
    }
    // decide on the object's name
    std::string arAddress (agentRef.nextId("AgentRegister"));
    // Record the agent registry in our own intent
    addIntendedAgentRegistry(arAddress, lc);
    commit();
    // Create the agent registry
    AgentRegister ar(arAddress, m_objectStore);
    ar.initialize();
    // There is no garbage collection for an agent registry: if it is not
    // plugged to the root entry, it does not exist.
    ar.setOwner("");
    ar.setBackupOwner("");
    ar.insert();
    // Take a lock on agent registry
    ScopedExclusiveLock arLock(ar);
    // Move agent registry from intent to official
    auto * marp = m_payload.mutable_agentregisterpointer();
    marp->set_address(arAddress);
    log.serialize(*marp->mutable_log());
    m_payload.set_agentregisterintent("");
    commit();
    // Record completion in agent registry
    ar.setOwner(getAddressIfSet());
    ar.setBackupOwner(getAddressIfSet());
    ar.commit();
    // And we are done. Release locks
    arLock.release();
    return arAddress;
  }
}

void RootEntry::removeAgentRegisterAndCommit(log::LogContext & lc) {
  checkPayloadWritable();
  // Check that we do have an agent register set. Cleanup a potential intent as
  // well
  if (m_payload.agentregisterintent().size()) {
    AgentRegister iar(m_payload.agentregisterintent(),
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
    ScopedExclusiveLock iarl(iar);
    // An agent register only referenced in the intent should not be used
    // and hence empty. We'll see that.
    iar.fetch();
    if (!iar.isEmpty()) {
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: found "
        "a non-empty intended agent register. Internal error.");
    }
    iar.remove();
    log::ScopedParamContainer params(lc);
    params.add("agentRegisterObject", iar.getAddressIfSet());
    lc.log(log::INFO, "In RootEntry::removeAgentRegisterAndCommit(): removed agent register");
    m_payload.set_agentregisterintent("");
    commit();
  }
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
    AgentRegister ar(m_payload.agentregisterpointer().address(),
      ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
    ScopedExclusiveLock arl(ar);
    ar.fetch();
    if (!ar.isEmpty()) {
      throw AgentRegisterNotEmpty("In RootEntry::removeAgentRegister: the agent "
        "register is not empty. Cannot remove.");
    }
    ar.remove();
    log::ScopedParamContainer params(lc);
    params.add("agentRegisterObject", ar.getAddressIfSet());
    lc.log(log::INFO, "In RootEntry::removeAgentRegisterAndCommit(): removed agent register.");
    m_payload.mutable_agentregisterpointer()->set_address("");
    commit();
  }
}

void RootEntry::addIntendedAgentRegistry(const std::string& address, log::LogContext & lc) {
  checkPayloadWritable();
  // We are supposed to have only one intended agent registry at a time.
  // If we got the lock and there is one entry, this means the previous
  // attempt to create one did not succeed.
  // When getting here, having a set pointer to the registry is an error.
  if (m_payload.has_agentregisterpointer() &&
      m_payload.agentregisterpointer().address().size()) {
    throw exception::Exception("In RootEntry::addIntendedAgentRegistry:"
        " pointer to registry already set");
  }
  if (m_payload.agentregisterintent().size()) {
    // The intended object might not have made it to the official pointer.
    // If it did, we just clean up the intent.
    // If it did not, we clean up the object if present, clean up the intent
    // and replace it with the new one.
    // We do not recycle the object, as the state is doubtful.
    if (ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore.exists(
      m_payload.agentregisterintent())) {
      AgentRegister iar(m_payload.agentregisterintent(),
        ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
      iar.fetch();
      if (!iar.isEmpty()) {
        throw AgentRegisterNotEmpty("In RootEntry::addIntendedAgentRegistry, "
          "found a non-empty intended agent register. Internal Error.");
      }
      iar.remove();
      log::ScopedParamContainer params (lc);
      params.add("agentRegisterObject", iar.getAddressIfSet());
      lc.log(log::INFO, "In RootEntry::addIntendedAgentRegistry(): removed agent register.");
    }
  }
  m_payload.set_agentregisterintent(address);
}

// =============================================================================
// ================ Scheduler global lock manipulation =========================
// =============================================================================

std::string RootEntry::getSchedulerGlobalLock() {
  checkPayloadReadable();
  // If the scheduler lock is defined, return it, job done.
  if (m_payload.has_schedulerlockpointer() &&
      m_payload.schedulerlockpointer().address().size())
    return m_payload.schedulerlockpointer().address();
  throw NotAllocated("In RootEntry::getAgentRegister: scheduler global lock not yet allocated");
}

// Get the name of a (possibly freshly created) scheduler global lock
std::string RootEntry::addOrGetSchedulerGlobalLockAndCommit(AgentReference& agentRef,
  const EntryLogSerDeser & log) {
  checkPayloadWritable();
  // Check if the drive register exists
  try {
    return getSchedulerGlobalLock();
  } catch (NotAllocated &) {
    // decide on the object's name and add to agent's intent.
    std::string sglAddress (agentRef.nextId("SchedulerGlobalLock"));
    agentRef.addToOwnership(sglAddress, m_objectStore);
    // Then create the drive register object
    SchedulerGlobalLock sgl(sglAddress, m_objectStore);
    sgl.initialize();
    sgl.setOwner(agentRef.getAgentAddress());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.insert();
    // Take a lock on scheduler global lock
    ScopedExclusiveLock sglLock(sgl);
    // Move drive registry ownership to the root entry
    auto * msgl = m_payload.mutable_schedulerlockpointer();
    msgl->set_address(sglAddress);
    log.serialize(*msgl->mutable_log());
    commit();
    // Record completion in scheduler global lock
    sgl.setOwner(getAddressIfSet());
    sgl.setBackupOwner(getAddressIfSet());
    sgl.commit();
    //... and clean up the agent
    agentRef.removeFromOwnership(sglAddress, m_objectStore);
    return sglAddress;
  }
}

void RootEntry::removeSchedulerGlobalLockAndCommit(log::LogContext & lc) {
  checkPayloadWritable();
  // Get the address of the scheduler lock (nothing to do if there is none)
  if (!m_payload.has_schedulerlockpointer() ||
      !m_payload.schedulerlockpointer().address().size())
    return;
  std::string sglAddress = m_payload.schedulerlockpointer().address();
  SchedulerGlobalLock sgl(sglAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  ScopedExclusiveLock sgll(sgl);
  sgl.fetch();
  // Check the drive register is empty
  if (!sgl.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeSchedulerGlobalLockAndCommit: "
      "trying to remove a non-empty scheduler global lock");
  }
  // we can delete the drive register
  sgl.remove();
  log::ScopedParamContainer params(lc);
  params.add("schedulerGlobalLockObject", sgl.getAddressIfSet());
  lc.log(log::INFO, "In RootEntry::removeSchedulerGlobalLockAndCommit(): removed scheduler global lock object.");
  // And update the root entry
  m_payload.mutable_schedulerlockpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}

// =============================================================================
// ================ Repack index manipulation ==================================
// =============================================================================

std::string RootEntry::getRepackIndexAddress() {
  checkPayloadReadable();
  if (m_payload.has_repackindexpointer() &&
      m_payload.repackindexpointer().address().size()) {
    return m_payload.repackindexpointer().address();
  }
  throw cta::exception::Exception("In RootEntry::getRepackIndexAddress: repack tape register not yet allocated");
}

std::string RootEntry::addOrGetRepackIndexAndCommit(AgentReference& agentRef) {
  checkPayloadWritable();
  // Check if the repack tape register exists
  try {
    return getRepackIndexAddress();
  } catch (cta::exception::Exception &) {
    // TODO: this insertion method is much simpler than the ones used for other objects.
    // It implies the only dangling pointer situation we can get is the one where
    // the object does not exist.
    // As the object never changes ownership, the garbage collection can be left
    // empty. There should never be garbage collection for this object type.
    //
    // decide on the object's name.
    std::string rtrAddress (agentRef.nextId("RepackIndex"));
    // Then prepare the repack tape register object
    RepackIndex ri(rtrAddress, m_objectStore);
    ri.initialize();
    ri.setOwner(getAddressIfSet());
    ri.setBackupOwner(getAddressIfSet());
    // Reference the registry in the root entry
    auto * rtrp = m_payload.mutable_repackindexpointer();
    rtrp->set_address(rtrAddress);
    commit();
    // Create the repack tape register
    ri.insert();
    // done.
    return rtrAddress;
  } 
}

void RootEntry::removeRepackIndexAndCommit(log::LogContext& lc) {
  checkPayloadWritable();
  // Get the address of the scheduler lock (nothing to do if there is none)
  if (!m_payload.has_repackindexpointer() ||
      !m_payload.repackindexpointer().address().size())
    return;
  std::string rtrAddress = m_payload.repackindexpointer().address();
  RepackIndex ri(rtrAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  ScopedExclusiveLock rtrl(ri);
  ri.fetch();
  // Check the drive register is empty
  if (!ri.isEmpty()) {
    throw DriveRegisterNotEmpty("In RootEntry::removeRepackIndexAndCommit(): "
      "trying to remove a non-empty repack tape register");
  }
  // we can delete the drive register
  ri.remove();
  log::ScopedParamContainer params(lc);
  params.add("repackIndex", ri.getAddressIfSet());
  lc.log(log::INFO, "In RootEntry::removeRepackIndexAndCommit(): removed repack tape register object.");
  // And update the root entry
  m_payload.mutable_schedulerlockpointer()->set_address("");
  // We commit for safety and symmetry with the add operation
  commit();
}

// =============================================================================
// ================ Repack index manipulation ==================================
// =============================================================================

std::string RootEntry::getRepackQueueAddress(RepackQueueType queueType) {
  checkPayloadReadable();
  switch (queueType) {
  case RepackQueueType::Pending:
    if (!m_payload.has_repackrequestspendingqueuepointer())
      throw NoSuchRepackQueue("In RootEntry::getRepackQueueAddress: pending queue no set.");
    return m_payload.repackrequestspendingqueuepointer().address();
  case RepackQueueType::ToExpand:
    if (!m_payload.has_repackrequeststoexpandqueuepointer())
      throw NoSuchRepackQueue("In RootEntry::getRepackQueueAddress: toExpand queue not set.");
    return m_payload.repackrequeststoexpandqueuepointer().address();
  }
  throw cta::exception::Exception("In RootEntry::getRepackQueueAddress(): unexptected queue type.");
}

void RootEntry::clearRepackQueueAddress(RepackQueueType queueType) {
  checkPayloadWritable();
  switch (queueType) {
  case RepackQueueType::Pending:
    if (!m_payload.has_repackrequestspendingqueuepointer())
      throw NoSuchRepackQueue("In RootEntry::clearRepackQueueAddress: pending queue no set.");
    return m_payload.mutable_repackrequestspendingqueuepointer()->Clear();
  case RepackQueueType::ToExpand:
    if (!m_payload.has_repackrequeststoexpandqueuepointer())
      throw NoSuchRepackQueue("In RootEntry::clearRepackQueueAddress: toExpand queue not set.");
    return m_payload.mutable_repackrequeststoexpandqueuepointer()->Clear();
  }
  throw cta::exception::Exception("In RootEntry::clearRepackQueueAddress(): unexptected queue type.");
}

void RootEntry::removeRepackQueueAndCommit(RepackQueueType queueType, log::LogContext& lc) {
  checkPayloadWritable();
  // find the address of the repack queue object
  try {
    bool hasQueue;
    switch (queueType) {
    case RepackQueueType::Pending:
      hasQueue = m_payload.has_repackrequestspendingqueuepointer();
      break;
    case RepackQueueType::ToExpand:
      hasQueue = m_payload.has_repackrequeststoexpandqueuepointer();
    }
    if (!hasQueue) {
      throw NoSuchRepackQueue("In RootEntry::removeRepackQueueAndCommit: trying to remove non-existing repack queue");
    }
    std::string queueAddress;
    switch (queueType) {
    case RepackQueueType::Pending:
      queueAddress = m_payload.repackrequestspendingqueuepointer().address();
      break;
    case RepackQueueType::ToExpand:
      queueAddress = m_payload.repackrequeststoexpandqueuepointer().address();
    }
    // Open the repack queue object
    RepackQueue rq(queueAddress, m_objectStore);
    ScopedExclusiveLock rql;
    try {
      rql.lock(rq);
      rq.fetch();
    } catch (cta::exception::Exception & ex) {
      // The repack queue seems to not be there. Make sure this is the case:
      if (rq.exists()) {
        // We failed to access the queue, yet it is present. This is an error.
        // Let the exception pass through.
        throw;
      } else {
        // The queue object is already gone. We can skip to removing the 
        // reference from the RootEntry
        goto deleteFromRootEntry;
      }
    }
    // Check the repack queue is empty
    if (!rq.isEmpty()) {
      throw RepackQueueNotEmpty("In RootEntry::removeRepackQueueAndCommit: trying to "
          "remove a non-empty tape pool");
    }
    // We can now delete the queue
    rq.remove();
    {
      log::ScopedParamContainer params(lc);
      params.add("repackQueueObject", rq.getAddressIfSet())
            .add("queueType", toString(queueType));
      lc.log(log::INFO, "In, RootEntry::removeRepackQueueAndCommit(): removed retrieve queue.");
    }
  deleteFromRootEntry:
    // ... and remove it from our entry
    switch (queueType) {
    case RepackQueueType::Pending:
      m_payload.clear_repackrequestspendingqueuepointer();
      break;
    case RepackQueueType::ToExpand:
      m_payload.clear_repackrequeststoexpandqueuepointer();
    }
    commit();
    {
      log::ScopedParamContainer params(lc);
      params.add("queueType", toString(queueType));
      lc.log(log::INFO, "In RootEntry::removeRetrieveQueueAndCommit(): removed retrieve queue reference.");
    }
  } catch (serializers::NotFound &) {
    // No such tape pool. Nothing to to.
    throw NoSuchRetrieveQueue("In RootEntry::addOrGetRetrieveQueueAndCommit: trying to remove non-existing retrieve queue");
  }
}
std::string RootEntry::addOrGetRepackQueueAndCommit(AgentReference& agentRef, RepackQueueType queueType) {
  checkPayloadWritable();
  // Check the repack queue does not already exist
  try {
    return getRepackQueueAddress(queueType);
  } catch (NoSuchRepackQueue &) {}
  // The queue is not there yet. Create it.
  // Insert the archive queue pointer in the root entry, then the queue.
  std::string repackQueueNameHeader = "RepackQueue";
  switch(queueType) {
  case RepackQueueType::Pending: repackQueueNameHeader+="Pending"; break;
  case RepackQueueType::ToExpand: repackQueueNameHeader+="ToExpand"; break;
  default: break;
  }
  std::string repackQueueAddress = agentRef.nextId(repackQueueNameHeader);
  // Now move create a reference in the root entry
  switch(queueType) {
  case RepackQueueType::Pending:
    m_payload.mutable_repackrequestspendingqueuepointer()->set_address(repackQueueAddress); 
    break;
  case RepackQueueType::ToExpand:
    m_payload.mutable_repackrequeststoexpandqueuepointer()->set_address(repackQueueAddress);
    break;
  }
  // We must commit here to ensure the repack queue is referenced.
  commit();
  // Then insert the queue object
  RepackQueue rq(repackQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
  rq.initialize();
  rq.setOwner(getAddressIfSet());
  rq.setBackupOwner(getAddressIfSet());
  rq.insert();
  return repackQueueAddress;
}



// =============================================================================
// ================ Dump =======================================================
// =============================================================================

// Dump the root entry
std::string RootEntry::dump () {
  checkPayloadReadable();
  google::protobuf::util::JsonPrintOptions options;
  options.add_whitespace = true;
  options.always_print_primitive_fields = true;
  std::string headerDump;
  google::protobuf::util::MessageToJsonString(m_payload, &headerDump, options);
  return headerDump;
}

}} // namespace cta::objectstore