From cadd81dfbe00abe68a8cee212467038e3d9ee859 Mon Sep 17 00:00:00 2001
From: Daniele Kruse <dkruse@cern.ch>
Date: Mon, 4 Apr 2016 15:21:21 +0200
Subject: [PATCH] adding new objectstore queues for tapes and tapepools

---
 objectstore/CMakeLists.txt    |   4 +
 objectstore/RootEntry.cpp     | 107 ++++++++++++++
 objectstore/RootEntry.hpp     |  18 +++
 objectstore/TapePoolQueue.cpp | 270 ++++++++++++++++++++++++++++++++++
 objectstore/TapePoolQueue.hpp |  99 +++++++++++++
 objectstore/TapeQueue.cpp     | 197 +++++++++++++++++++++++++
 objectstore/TapeQueue.hpp     |  69 +++++++++
 objectstore/cta.proto         |  48 +++++-
 8 files changed, 804 insertions(+), 8 deletions(-)
 create mode 100644 objectstore/TapePoolQueue.cpp
 create mode 100644 objectstore/TapePoolQueue.hpp
 create mode 100644 objectstore/TapeQueue.cpp
 create mode 100644 objectstore/TapeQueue.hpp

diff --git a/objectstore/CMakeLists.txt b/objectstore/CMakeLists.txt
index 15cd0ee1af..9972349cba 100644
--- a/objectstore/CMakeLists.txt
+++ b/objectstore/CMakeLists.txt
@@ -42,6 +42,8 @@ set (CTAProtoDependants objectstore/Agent.hpp
   objectstore/SchedulerGlobalLock.hpp
   objectstore/Tape.hpp
   objectstore/TapePool.hpp
+  objectstore/TapeQueue.hpp
+  objectstore/TapePoolQueue.hpp
   objectstore/UserIdentity.hpp)
 
 SET_SOURCE_FILES_PROPERTIES(${CTAProtoHeaders} PROPERTIES HEADER_FILE_ONLY TRUE)
@@ -57,6 +59,8 @@ add_library (ctaobjectstore SHARED
   AgentWatchdog.cpp
   TapePool.cpp
   Tape.cpp
+  TapePoolQueue.cpp
+  TapeQueue.cpp
   ArchiveToFileRequest.cpp
   ArchiveRequest.cpp
   RetrieveToFileRequest.cpp
diff --git a/objectstore/RootEntry.cpp b/objectstore/RootEntry.cpp
index 2d9e4629b6..9b671390cb 100644
--- a/objectstore/RootEntry.cpp
+++ b/objectstore/RootEntry.cpp
@@ -20,6 +20,8 @@
 #include "AgentRegister.hpp"
 #include "Agent.hpp"
 #include "TapePool.hpp"
+#include "TapePoolQueue.hpp"
+#include "TapeQueue.hpp"
 #include "DriveRegister.hpp"
 #include "GenericObject.hpp"
 #include "SchedulerGlobalLock.hpp"
@@ -572,6 +574,111 @@ auto cta::objectstore::RootEntry::dumpTapePools() -> std::list<TapePoolDump> {
   return ret;
 }
 
+// =============================================================================
+// ========== Tape pool queues manipulations =========================================
+// =============================================================================
+
+
+// This operator will be used in the following usage of the findElement
+// removeOccurences
+namespace {
+  bool operator==(const std::string &tp,
+    const cta::objectstore::serializers::TapePoolQueuePointer & tpp) {
+    return tpp.name() == tp;
+  }
+}
+
+std::string cta::objectstore::RootEntry::addOrGetTapePoolQueueAndCommit(const std::string& tapePool, Agent& agent) {
+  checkPayloadWritable();
+  // Check the tape pool does not already exist
+  try {
+    return serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool).address();
+  } catch (serializers::NotFound &) {}
+  // Insert the tape pool, then its pointer, with agent intent log update
+  // First generate the intent. We expect the agent to be passed locked.
+  std::string tapePoolQueueAddress = agent.nextId("tapePoolQueue");
+  // TODO Do we expect the agent to be passed locked or not: to be clarified.
+  ScopedExclusiveLock agl(agent);
+  agent.fetch();
+  agent.addToOwnership(tapePoolQueueAddress);
+  agent.commit();
+  // Then create the tape pool object
+  TapePool tp(tapePoolQueueAddress, ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
+  tp.initialize(tapePool);
+  tp.setOwner(agent.getAddressIfSet());
+  tp.setBackupOwner("root");
+  tp.insert();
+  ScopedExclusiveLock tpl(tp);
+  // Now move the tape pool's ownership to the root entry
+  auto * tpp = m_payload.mutable_tapepoolqueuepointers()->Add();
+  tpp->set_address(tapePoolQueueAddress);
+  // We must commit here to ensure the tape pool object is referenced.
+  commit();
+  // Now update the tape pool's ownership.
+  tp.setOwner(getAddressIfSet());
+  tp.setBackupOwner(getAddressIfSet());
+  tp.commit();
+  // ... and clean up the agent
+  agent.removeFromOwnership(tapePoolQueueAddress);
+  agent.commit();
+  return tapePoolQueueAddress;
+}
+
+void cta::objectstore::RootEntry::removeTapePoolQueueAndCommit(const std::string& tapePool) {
+  checkPayloadWritable();
+  // find the address of the tape pool object
+  try {
+    auto tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
+    // Open the tape pool object
+    TapePoolQueue tp (tpp.address(), ObjectOps<serializers::RootEntry, serializers::RootEntry_t>::m_objectStore);
+    ScopedExclusiveLock tpl(tp);
+    tp.fetch();
+    // Verify this is the tapepool we're looking for.
+    if (tp.getName() != tapePool) {
+      std::stringstream err;
+      err << "Unexpected tape pool name found in object pointed to for tape pool: "
+          << tapePool << " found: " << tp.getName();
+      throw WrongTapePoolQueue(err.str());
+    }
+    // Check the tape pool is empty
+    if (!tp.isEmpty()) {
+      throw TapePoolQueueNotEmpty ("In RootEntry::removeTapePoolQueueAndCommit: trying to "
+          "remove a non-empty tape pool");
+    }
+    // We can delete the pool
+    tp.remove();
+    // ... and remove it from our entry
+    serializers::removeOccurences(m_payload.mutable_tapepoolqueuepointers(), tapePool);
+    // We commit for safety and symmetry with the add operation
+    commit();
+  } catch (serializers::NotFound &) {
+    // No such tape pool. Nothing to to.
+    throw NoSuchTapePoolQueue("In RootEntry::removeTapePoolQueueAndCommit: trying to remove non-existing tape pool");
+  }
+}
+
+std::string cta::objectstore::RootEntry::getTapePoolQueueAddress(const std::string& tapePool) {
+  checkPayloadReadable();
+  try {
+    auto & tpp = serializers::findElement(m_payload.tapepoolqueuepointers(), tapePool);
+    return tpp.address();
+  } catch (serializers::NotFound &) {
+    throw NotAllocated("In RootEntry::getTapePoolQueueAddress: tape pool not allocated");
+  }
+}
+
+auto cta::objectstore::RootEntry::dumpTapePoolQueues() -> std::list<TapePoolQueueDump> {
+  checkPayloadReadable();
+  std::list<TapePoolQueueDump> ret;
+  auto & tpl = m_payload.tapepoolqueuepointers();
+  for (auto i = tpl.begin(); i!=tpl.end(); i++) {
+    ret.push_back(TapePoolQueueDump());
+    ret.back().address = i->address();
+    ret.back().tapePool = i->name();
+  }
+  return ret;
+}
+
 // =============================================================================
 // ================ Drive register manipulation ================================
 // =============================================================================
diff --git a/objectstore/RootEntry.hpp b/objectstore/RootEntry.hpp
index c6ed6a87b9..518b84b77e 100644
--- a/objectstore/RootEntry.hpp
+++ b/objectstore/RootEntry.hpp
@@ -155,6 +155,24 @@ public:
   };
   std::list<TapePoolDump> dumpTapePools();
   
+  // TapePoolQueue Manipulations =====================================================
+  CTA_GENERATE_EXCEPTION_CLASS(TapePoolQueueNotEmpty);
+  CTA_GENERATE_EXCEPTION_CLASS(WrongTapePoolQueue);
+  /** This function implicitly creates the tape pool structure and updates 
+   * the pointer to it. It needs to implicitly commit the object to the store. */
+  std::string addOrGetTapePoolQueueAndCommit(const std::string & tapePool, Agent & agent);
+  /** This function implicitly deletes the tape pool structure. 
+   * Fails if it not empty*/
+  CTA_GENERATE_EXCEPTION_CLASS(NoSuchTapePoolQueue);
+  void removeTapePoolQueueAndCommit(const std::string & tapePool);
+  std::string getTapePoolQueueAddress(const std::string & tapePool);
+  class TapePoolQueueDump {
+  public:
+    std::string tapePool;
+    std::string address;
+  };
+  std::list<TapePoolQueueDump> dumpTapePoolQueues();
+  
   // Drive register manipulations ==============================================
   CTA_GENERATE_EXCEPTION_CLASS(DriveRegisterNotEmpty);
   std::string getDriveRegisterAddress();  
diff --git a/objectstore/TapePoolQueue.cpp b/objectstore/TapePoolQueue.cpp
new file mode 100644
index 0000000000..8b26ef3d29
--- /dev/null
+++ b/objectstore/TapePoolQueue.cpp
@@ -0,0 +1,270 @@
+/*
+ * The CERN Tape Archive (CTA) project
+ * Copyright (C) 2015  CERN
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "TapePoolQueue.hpp"
+#include "GenericObject.hpp"
+#include "ProtocolBuffersAlgorithms.hpp"
+#include "CreationLog.hpp"
+#include "Tape.hpp"
+#include "RootEntry.hpp"
+#include <json-c/json.h>
+
+cta::objectstore::TapePoolQueue::TapePoolQueue(const std::string& address, Backend& os):
+  ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>(os, address) { }
+
+cta::objectstore::TapePoolQueue::TapePoolQueue(GenericObject& go):
+  ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>(go.objectStore()) {
+  // Here we transplant the generic object into the new object
+  go.transplantHeader(*this);
+  // And interpret the header.
+  getPayloadFromHeader();
+}
+
+std::string cta::objectstore::TapePoolQueue::dump() {  
+  checkPayloadReadable();
+  std::stringstream ret;
+  ret << "TapePoolQueue" << std::endl;
+  struct json_object * jo = json_object_new_object();
+  
+  json_object_object_add(jo, "name", json_object_new_string(m_payload.tapepoolname().c_str()));
+  json_object_object_add(jo, "ArchiveJobsTotalSize", json_object_new_int64(m_payload.archivejobstotalsize()));
+  json_object_object_add(jo, "oldestJobCreationTime", json_object_new_int64(m_payload.oldestjobcreationtime()));
+  {
+    json_object * array = json_object_new_array();
+    for (auto i = m_payload.pendingarchivejobs().begin(); i!=m_payload.pendingarchivejobs().end(); i++) {
+      json_object * jot = json_object_new_object();
+      json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
+      json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
+      json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
+      json_object_object_add(jot, "size", json_object_new_int64(i->size()));
+      json_object_array_add(array, jot);
+    }
+    json_object_object_add(jo, "pendingarchivejobs", array);
+  }
+  {
+    json_object * array = json_object_new_array();
+    for (auto i = m_payload.orphanedarchivejobsnscreation().begin(); i!=m_payload.orphanedarchivejobsnscreation().end(); i++) {
+      json_object * jot = json_object_new_object();
+      json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
+      json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
+      json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
+      json_object_object_add(jot, "size", json_object_new_int64(i->size()));
+      json_object_array_add(array, jot);
+    }
+    json_object_object_add(jo, "orphanedarchivejobsnscreation", array);
+  }
+  {
+    json_object * array = json_object_new_array();
+    for (auto i = m_payload.orphanedarchivejobsnsdeletion().begin(); i!=m_payload.orphanedarchivejobsnsdeletion().end(); i++) {
+      json_object * jot = json_object_new_object();
+      json_object_object_add(jot, "address", json_object_new_string(i->address().c_str()));
+      json_object_object_add(jot, "copynb", json_object_new_int(i->copynb()));
+      json_object_object_add(jot, "path", json_object_new_string(i->path().c_str()));
+      json_object_object_add(jot, "size", json_object_new_int64(i->size()));
+      json_object_array_add(array, jot);
+    }
+    json_object_object_add(jo, "orphanedarchivejobsnsdeletion", array);
+  }
+  ret << json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY) << std::endl;
+  json_object_put(jo);
+  return ret.str();
+}
+
+void cta::objectstore::TapePoolQueue::initialize(const std::string& name) {
+  // Setup underlying object
+  ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t>::initialize();
+  // Setup the object so it's valid
+  m_payload.set_tapepoolname(name);
+  // set the archive jobs counter to zero
+  m_payload.set_archivejobstotalsize(0);
+  m_payload.set_oldestjobcreationtime(0);
+  // This object is good to go (to storage)
+  m_payloadInterpreted = true;
+}
+
+bool cta::objectstore::TapePoolQueue::isEmpty() {
+  checkPayloadReadable();
+  // Check we have no archive jobs pending
+  if (m_payload.pendingarchivejobs_size() 
+      || m_payload.orphanedarchivejobsnscreation_size()
+      || m_payload.orphanedarchivejobsnsdeletion_size())
+    return false;
+  // If we made it to here, it seems the pool is indeed empty.
+  return true;
+}
+
+void cta::objectstore::TapePoolQueue::garbageCollect(const std::string &presumedOwner) {
+  checkPayloadWritable();
+  // If the agent is not anymore the owner of the object, then only the very
+  // last operation of the tape pool creation failed. We have nothing to do.
+  if (presumedOwner != m_header.owner())
+    return;
+  // If the owner is still the agent, there are 2 possibilities
+  // 1) The tape pool is referenced in the root entry, and then nothing is needed
+  // besides setting the tape pool's owner to the root entry's address in 
+  // order to enable its usage. Before that, it was considered as a dangling
+  // pointer.
+  {
+    RootEntry re(m_objectStore);
+    ScopedSharedLock rel (re);
+    re.fetch();
+    auto tpd=re.dumpTapePoolQueues();
+    for (auto tp=tpd.begin(); tp!=tpd.end(); tp++) {
+      if (tp->address == getAddressIfSet()) {
+        setOwner(re.getAddressIfSet());
+        commit();
+        return;
+      }
+    }
+  }
+  // 2) The tape pool is not referenced by the root entry. It is then effectively
+  // not accessible and should be discarded.
+  if (!isEmpty()) {
+    throw (NotEmpty("Trying to garbage collect a non-empty TapePoolQueue: internal error"));
+  }
+  remove();
+}
+
+void cta::objectstore::TapePoolQueue::setName(const std::string& name) {
+  checkPayloadWritable();
+  m_payload.set_tapepoolname(name);
+}
+
+std::string cta::objectstore::TapePoolQueue::getName() {
+  checkPayloadReadable();
+  return m_payload.tapepoolname();
+}
+
+void cta::objectstore::TapePoolQueue::addJob(const ArchiveRequest::JobDump& job,
+  const std::string & archiveToFileAddress, const std::string & path,
+  uint64_t size, uint64_t priority, time_t startTime) {
+  checkPayloadWritable();
+  // The tape pool gets the highest priority of its jobs
+  if (m_payload.pendingarchivejobs_size()) {
+    if ((uint64_t)startTime < m_payload.oldestjobcreationtime())
+      m_payload.set_oldestjobcreationtime(startTime);
+    m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + size);
+  } else {
+    m_payload.set_archivejobstotalsize(size);
+    m_payload.set_oldestjobcreationtime(startTime);
+  }
+  auto * j = m_payload.add_pendingarchivejobs();
+  j->set_address(archiveToFileAddress);
+  j->set_size(size);
+  j->set_path(path);
+  j->set_copynb(job.copyNb);
+}
+
+auto cta::objectstore::TapePoolQueue::getJobsSummary() -> JobsSummary {
+  checkPayloadReadable();
+  JobsSummary ret;
+  ret.files = m_payload.pendingarchivejobs_size();
+  ret.bytes = m_payload.archivejobstotalsize();
+  ret.oldestJobStartTime = m_payload.oldestjobcreationtime();
+  return ret;
+}
+
+bool cta::objectstore::TapePoolQueue::addJobIfNecessary(
+  const ArchiveRequest::JobDump& job, 
+  const std::string& archiveToFileAddress,
+  const std::string & path, uint64_t size) {
+  checkPayloadWritable();
+  auto & jl=m_payload.pendingarchivejobs();
+  for (auto j=jl.begin(); j!= jl.end(); j++) {
+    if (j->address() == archiveToFileAddress)
+      return false;
+  }
+  auto * j = m_payload.add_pendingarchivejobs();
+  j->set_address(archiveToFileAddress);
+  j->set_size(size);
+  j->set_path(path);
+  j->set_copynb(job.copyNb);
+  return true;
+}
+
+void cta::objectstore::TapePoolQueue::removeJob(const std::string& archiveToFileAddress) {
+  checkPayloadWritable();
+  auto * jl=m_payload.mutable_pendingarchivejobs();
+  bool found = false;
+  do {
+    found = false;
+    // Push the found entry all the way to the end.
+    for (size_t i=0; i<(size_t)jl->size(); i++) {
+      if (jl->Get(i).address() == archiveToFileAddress) {
+        found = true;
+        while (i+1 < (size_t)jl->size()) {
+          jl->SwapElements(i, i+1);
+          i++;
+        }
+        break;
+      }
+    }
+    // and remove it
+    if (found)
+      jl->RemoveLast();
+  } while (found);
+}
+
+auto cta::objectstore::TapePoolQueue::dumpJobs() -> std::list<JobDump> {
+  checkPayloadReadable();
+  std::list<JobDump> ret;
+  auto & jl=m_payload.pendingarchivejobs();
+  for (auto j=jl.begin(); j!=jl.end(); j++) {
+    ret.push_back(JobDump());
+    ret.back().address = j->address();
+    ret.back().size = j->size();
+    ret.back().copyNb = j->copynb();
+  }
+  return ret;
+}
+
+bool cta::objectstore::TapePoolQueue::addOrphanedJobPendingNsCreation(
+  const ArchiveRequest::JobDump& job, 
+  const std::string& archiveToFileAddress, 
+  const std::string & path,
+  uint64_t size) {
+  checkPayloadWritable();
+  auto & jl=m_payload.orphanedarchivejobsnscreation();
+  for (auto j=jl.begin(); j!= jl.end(); j++) {
+    if (j->address() == archiveToFileAddress)
+      return false;
+  }
+  auto * j = m_payload.add_orphanedarchivejobsnscreation();
+  j->set_address(archiveToFileAddress);
+  j->set_size(size);
+  j->set_path(path);
+  j->set_copynb(job.copyNb);
+  return true;
+}
+
+bool cta::objectstore::TapePoolQueue::addOrphanedJobPendingNsDeletion(
+  const ArchiveRequest::JobDump& job, 
+  const std::string& archiveToFileAddress, 
+  const std::string & path, uint64_t size) {
+  checkPayloadWritable();
+  auto & jl=m_payload.orphanedarchivejobsnsdeletion();
+  for (auto j=jl.begin(); j!= jl.end(); j++) {
+    if (j->address() == archiveToFileAddress)
+      return false;
+  }
+  auto * j = m_payload.add_orphanedarchivejobsnsdeletion();
+  j->set_address(archiveToFileAddress);
+  j->set_size(size);
+  j->set_path(path);
+  return true;
+}
\ No newline at end of file
diff --git a/objectstore/TapePoolQueue.hpp b/objectstore/TapePoolQueue.hpp
new file mode 100644
index 0000000000..2a73ea4b56
--- /dev/null
+++ b/objectstore/TapePoolQueue.hpp
@@ -0,0 +1,99 @@
+/*
+ * The CERN Tape Archive (CTA) project
+ * Copyright (C) 2015  CERN
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "Backend.hpp"
+#include "ObjectOps.hpp"
+#include <string>
+#include "objectstore/cta.pb.h"
+#include "common/CreationLog.hpp"
+#include "common/MountControl.hpp"
+#include "ArchiveToFileRequest.hpp"
+#include "ArchiveRequest.hpp"
+#include "CreationLog.hpp"
+#include "Agent.hpp"
+#include "common/archiveNS/Tape.hpp"
+
+namespace cta { namespace objectstore {
+  
+class GenericObject;
+
+class TapePoolQueue: public ObjectOps<serializers::TapePoolQueue, serializers::TapePoolQueue_t> {
+public:
+  // Constructor
+  TapePoolQueue(const std::string & address, Backend & os);
+  
+  // Upgrader form generic object
+  TapePoolQueue(GenericObject & go);
+
+  // In memory initialiser
+  void initialize(const std::string & name);
+  
+  // Set/get name
+  void setName(const std::string & name);
+  std::string getName();
+  
+  // Archive jobs management ===================================================  
+  void addJob(const ArchiveRequest::JobDump & job,
+    const std::string & archiveToFileAddress, const std::string & path,
+    uint64_t size, uint64_t priority, time_t startTime);
+  /// This version will check for existence of the job in the queue before
+  // returns true if a new job was actually inserted.
+  bool addJobIfNecessary(const ArchiveRequest::JobDump & job,
+    const std::string & archiveToFileAddress, 
+    const std::string & path, uint64_t size);
+  /// This version will check for existence of the job in the queue before
+  // returns true if a new job was actually inserted.
+  bool addOrphanedJobPendingNsCreation(const ArchiveRequest::JobDump& job,
+    const std::string& archiveToFileAddress, const std::string & path,
+    uint64_t size);
+  /// This version will check for existence of the job in the queue before
+  // returns true if a new job was actually inserted.
+  bool addOrphanedJobPendingNsDeletion(const ArchiveRequest::JobDump& job,
+    const std::string& archiveToFileAddress,
+    const std::string & path, uint64_t size);
+  
+  struct JobsSummary {
+    uint64_t files;
+    uint64_t bytes;
+    time_t oldestJobStartTime;
+    uint64_t priority;
+  };
+  JobsSummary getJobsSummary();
+  
+  void removeJob(const std::string &archiveToFileAddress);
+  class JobDump {
+  public:
+    uint64_t size;
+    std::string address;
+    uint16_t copyNb;
+  };
+  std::list<JobDump> dumpJobs();
+  
+  // Check that the tape pool is empty (of both tapes and jobs)
+  bool isEmpty();
+ 
+  CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
+  // Garbage collection
+  void garbageCollect(const std::string &presumedOwner);
+  
+  std::string dump();
+};
+  
+}}
diff --git a/objectstore/TapeQueue.cpp b/objectstore/TapeQueue.cpp
new file mode 100644
index 0000000000..5f4b3d767e
--- /dev/null
+++ b/objectstore/TapeQueue.cpp
@@ -0,0 +1,197 @@
+/*
+ * The CERN Tape Archive (CTA) project
+ * Copyright (C) 2015  CERN
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "TapeQueue.hpp"
+#include "GenericObject.hpp"
+#include "CreationLog.hpp"
+#include <json-c/json.h>
+
+cta::objectstore::TapeQueue::TapeQueue(const std::string& address, Backend& os):
+  ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>(os, address) { }
+
+cta::objectstore::TapeQueue::TapeQueue(GenericObject& go):
+  ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>(go.objectStore()){
+  // Here we transplant the generic object into the new object
+  go.transplantHeader(*this);
+  // And interpret the header.
+  getPayloadFromHeader();
+}
+
+void cta::objectstore::TapeQueue::initialize(const std::string &name, 
+    const std::string &logicallibrary, const cta::CreationLog & creationLog) {
+  ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t>::initialize();
+  // Set the reguired fields
+  m_payload.set_oldestjobtime(0);
+  m_payload.set_retrievejobstotalsize(0);
+  m_payloadInterpreted = true;
+}
+
+
+bool cta::objectstore::TapeQueue::isEmpty() {
+  checkPayloadReadable();
+  return !m_payload.retrievejobs_size();
+}
+
+void cta::objectstore::TapeQueue::removeIfEmpty() {
+  checkPayloadWritable();
+  if (!isEmpty()) {
+    throw NotEmpty("In TapeQueue::removeIfEmpty: trying to remove an tape with retrieves queued");
+  }
+  remove();
+}
+
+std::string cta::objectstore::TapeQueue::getVid() {
+  checkPayloadReadable();
+  return m_payload.vid();
+}
+
+std::string cta::objectstore::TapeQueue::dump() {  
+  checkPayloadReadable();
+  std::stringstream ret;
+  ret << "TapePool" << std::endl;
+  struct json_object * jo = json_object_new_object();
+  
+  json_object_object_add(jo, "vid", json_object_new_string(m_payload.vid().c_str()));
+  json_object_object_add(jo, "retrievejobstotalsize", json_object_new_int64(m_payload.retrievejobstotalsize()));
+  json_object_object_add(jo, "oldestjobtime", json_object_new_int64(m_payload.oldestjobtime()));
+  
+  {
+    json_object * array = json_object_new_array();
+    for (auto i = m_payload.retrievejobs().begin(); i!=m_payload.retrievejobs().end(); i++) {
+      json_object * rjobs = json_object_new_object();
+      json_object_object_add(rjobs, "size", json_object_new_int64(i->size()));
+      json_object_object_add(rjobs, "address", json_object_new_string(i->address().c_str()));
+      json_object_object_add(rjobs, "copynb", json_object_new_int(i->copynb()));
+      json_object_array_add(array, rjobs);
+    }
+    json_object_object_add(jo, "retrievejobs", array);
+  }
+  
+  ret << json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY) << std::endl;
+  json_object_put(jo);
+  return ret.str();
+}
+
+void cta::objectstore::TapeQueue::addJob(const RetrieveToFileRequest::JobDump& job,
+  const std::string & retrieveToFileAddress, uint64_t size, uint64_t priority,
+  time_t startTime) {
+  checkPayloadWritable();
+  // Manage the cumulative properties
+  if (m_payload.retrievejobs_size()) {
+    if (m_payload.oldestjobtime() > (uint64_t)startTime) {
+      m_payload.set_oldestjobtime(startTime);
+    }
+    m_payload.set_retrievejobstotalsize(m_payload.retrievejobstotalsize() + size);
+  } else {
+    m_payload.set_oldestjobtime(startTime);
+    m_payload.set_retrievejobstotalsize(size);
+  }
+  auto * j = m_payload.add_retrievejobs();
+  j->set_address(retrieveToFileAddress);
+  j->set_size(size);
+  j->set_copynb(job.copyNb);
+}
+
+cta::objectstore::TapeQueue::JobsSummary cta::objectstore::TapeQueue::getJobsSummary() {
+  checkPayloadReadable();
+  JobsSummary ret;
+  ret.bytes = m_payload.retrievejobstotalsize();
+  ret.files = m_payload.retrievejobs_size();
+  ret.oldestJobStartTime = m_payload.oldestjobtime();
+  return ret;
+}
+
+auto cta::objectstore::TapeQueue::dumpAndFetchRetrieveRequests() 
+  -> std::list<RetrieveRequestDump> {
+  checkPayloadReadable();
+  std::list<RetrieveRequestDump> ret;
+  auto & rjl = m_payload.retrievejobs();
+  for (auto rj=rjl.begin(); rj!=rjl.end(); rj++) {
+    try {
+      cta::objectstore::RetrieveToFileRequest rtfr(rj->address(),m_objectStore);
+      objectstore::ScopedSharedLock rtfrl(rtfr);
+      rtfr.fetch();
+      ret.push_back(RetrieveRequestDump());
+      auto & retReq = ret.back();
+      retReq.archiveFile = rtfr.getArchiveFile();
+      retReq.remoteFile = rtfr.getRemoteFile();
+      retReq.creationLog = rtfr.getCreationLog();
+      // Find the copy number from the list of jobs
+      retReq.activeCopyNb = rj->copynb();
+      auto jl = rtfr.dumpJobs();
+      for (auto j=jl.begin(); j!= jl.end(); j++) {
+        retReq.tapeCopies.push_back(TapeFileLocation());
+        auto & retJob = retReq.tapeCopies.back();
+        retJob.blockId = j->blockid;
+        retJob.copyNb = j->copyNb;
+        retJob.fSeq = j->fseq;
+        retJob.vid = j->tape;
+      }
+    } catch (cta::exception::Exception &) {}
+  }
+  return ret;
+}
+
+auto cta::objectstore::TapeQueue::dumpJobs() -> std::list<JobDump> {
+  checkPayloadReadable();
+  std::list<JobDump> ret;
+  auto & rjl = m_payload.retrievejobs();
+  for (auto rj=rjl.begin(); rj!=rjl.end(); rj++) {
+    ret.push_back(JobDump());
+    auto & b=ret.back();
+    b.copyNb = rj->copynb();
+    b.address = rj->address();
+    b.size = rj->size();
+  }
+  return ret;
+}
+
+void cta::objectstore::TapeQueue::removeJob(const std::string& retriveToFileAddress) {
+  checkPayloadWritable();
+  auto * jl = m_payload.mutable_retrievejobs();
+  bool found=false;
+  do {
+    found=false;
+    found = false;
+    // Push the found entry all the way to the end.
+    for (size_t i=0; i<(size_t)jl->size(); i++) {
+      if (jl->Get(i).address() == retriveToFileAddress) {
+        found = true;
+        while (i+1 < (size_t)jl->size()) {
+          jl->SwapElements(i, i+1);
+          i++;
+        }
+        break;
+      }
+    }
+    // and remove it
+    if (found)
+      jl->RemoveLast();
+  } while (found);
+}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/objectstore/TapeQueue.hpp b/objectstore/TapeQueue.hpp
new file mode 100644
index 0000000000..098f9b16e2
--- /dev/null
+++ b/objectstore/TapeQueue.hpp
@@ -0,0 +1,69 @@
+/*
+ * The CERN Tape Archive (CTA) project
+ * Copyright (C) 2015  CERN
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "ObjectOps.hpp"
+#include "objectstore/cta.pb.h"
+#include "RetrieveToFileRequest.hpp"
+#include "common/archiveNS/TapeFileLocation.hpp"
+#include "scheduler/RetrieveRequestDump.hpp"
+
+namespace cta { namespace objectstore {
+  
+class Backend;
+class Agent;
+class GenericObject;
+
+class TapeQueue: public ObjectOps<serializers::TapeQueue, serializers::TapeQueue_t> {
+public:
+  TapeQueue(const std::string & address, Backend & os);
+  TapeQueue(GenericObject & go);
+  void initialize(const std::string & vid, const std::string &logicalLibrary, 
+    const cta::CreationLog & creationLog);
+  void garbageCollect();
+  bool isEmpty();
+  CTA_GENERATE_EXCEPTION_CLASS(NotEmpty);
+  void removeIfEmpty();
+  std::string dump();
+  
+  // Retrieve jobs management ==================================================
+  void addJob(const RetrieveToFileRequest::JobDump & job,
+    const std::string & retrieveToFileAddress, uint64_t size, uint64_t priority,
+    time_t startTime);
+  struct JobsSummary {
+    uint64_t files;
+    uint64_t bytes;
+    time_t oldestJobStartTime;
+    uint64_t priority;
+  };
+  JobsSummary getJobsSummary();
+  std::list<RetrieveRequestDump> dumpAndFetchRetrieveRequests();
+  struct JobDump {
+    std::string address;
+    uint16_t copyNb;
+    uint64_t size;
+  };
+  std::list<JobDump> dumpJobs();
+  
+  void removeJob(const std::string & retriveToFileAddress);
+  // -- Generic parameters
+  std::string getVid();
+};
+
+}}
diff --git a/objectstore/cta.proto b/objectstore/cta.proto
index b5333a3a13..9fc78ce5a1 100644
--- a/objectstore/cta.proto
+++ b/objectstore/cta.proto
@@ -14,6 +14,8 @@ enum ObjectType {
   SchedulerGlobalLock_t = 8;
   ArchiveRequest_t = 9;
   RetrieveRequest_t = 10;
+  TapePoolQueue_t = 11;
+  TapeQueue_t = 12;
   GenericObject_t = 1000;
 }
 
@@ -136,18 +138,32 @@ message SchedulerGlobalLockPointer {
   required CreationLog log = 111;
 }
 
+// Pointer to the tapepool queue
+message TapePoolQueuePointer {
+  required string address = 120;
+  required string name = 121;
+}
+
+// Pointer to the tape queue
+message TapeQueuePointer {
+  required string address = 130;
+  required string vid = 131;
+}
+
 // The root entry. This entry contains all the most static information, i.e.
 // the admin handled configuration information
 message RootEntry {
   repeated AdminHost adminhosts = 1000;
-  repeated AdminUser adminusers = 1001;
-  repeated StorageClass storageclasses = 1002;
-  repeated TapePoolPointer tapepoolpointers = 1003;
-  repeated Library libraries = 1004;
-  optional DriveRegisterPointer driveregisterpointer = 1005;
-  optional AgentRegisterPointer agentregisterpointer = 1006;
-  optional string agentregisterintent = 1007;
-  optional SchedulerGlobalLockPointer schedulerlockpointer = 1008;
+  repeated AdminUser adminusers = 1010;
+  repeated StorageClass storageclasses = 1020;
+  repeated TapePoolPointer tapepoolpointers = 1030;
+  repeated Library libraries = 1040;
+  repeated TapePoolQueuePointer tapepoolqueuepointers = 1050; //new queues by tapepool name that will replace the old archive queues (these are pointers because queues change often and are volatile and we shouldn't lock th root entry for each change)
+  repeated TapeQueuePointer tapequeuepointers = 1060; //new queues by tape vid that will replace the old retrieve queues (these are pointers because queues change often and are volatile and we shouldn't lock th root entry for each change)
+  optional DriveRegisterPointer driveregisterpointer = 1070;
+  optional AgentRegisterPointer agentregisterpointer = 1080;
+  optional string agentregisterintent = 1090;
+  optional SchedulerGlobalLockPointer schedulerlockpointer = 1100;
 }
 
 //=========== Sub-objects ======================================================
@@ -462,3 +478,19 @@ message RetrieveRequest {
   required EntryLog creationLog = 9151;
   repeated RetrieveJobEntry jobs = 9152;
 }
+
+message TapePoolQueue {
+  required string tapepoolname = 10000;
+  repeated ArchiveJobPointer pendingarchivejobs = 10010;
+  repeated ArchiveJobPointer orphanedarchivejobsnscreation = 10020;
+  repeated ArchiveJobPointer orphanedarchivejobsnsdeletion = 10030;
+  required uint64 ArchiveJobsTotalSize = 10040;
+  required uint64 oldestJobCreationTime = 10050;
+}
+
+message TapeQueue {
+  required string vid = 10100;
+  repeated RetrieveJobPointer retrievejobs = 10110;
+  required uint64 retrievejobstotalsize = 10120;
+  required uint64 oldestjobtime = 10130;
+}
\ No newline at end of file
-- 
GitLab