From f3f1f81669e05ee32445ba5cd8e4cd301f6008c0 Mon Sep 17 00:00:00 2001
From: Eric Cano <Eric.Cano@cern.ch>
Date: Wed, 22 May 2019 18:04:49 +0200
Subject: [PATCH] #471 Implemented support for activities in the catalogue.

---
 catalogue/Catalogue.hpp                       |  13 +
 catalogue/CatalogueRetryWrapper.hpp           |  21 +-
 catalogue/CatalogueTest.cpp                   | 207 ++++++++++++-
 catalogue/DummyCatalogue.hpp                  |   6 +-
 catalogue/RdbmsCatalogue.cpp                  | 274 +++++++++++++++++-
 catalogue/RdbmsCatalogue.hpp                  |  35 +++
 catalogue/common_catalogue_schema.sql         |  12 +
 common/CMakeLists.txt                         |   1 +
 .../RetrieveFileQueueCriteria.hpp             |   6 +
 common/dataStructures/RetrieveRequest.hpp     |   3 +
 common/exception/Exception.hpp                |   6 +-
 common/exception/UserError.hpp                |   2 +
 .../orchestration/tests/client_ar.sh          |   8 +-
 scheduler/Scheduler.cpp                       |   2 +-
 xroot_plugins/XrdSsiCtaRequestMessage.cpp     |   5 +
 15 files changed, 581 insertions(+), 20 deletions(-)

diff --git a/catalogue/Catalogue.hpp b/catalogue/Catalogue.hpp
index e3caf2ed1c..872da4f7b5 100644
--- a/catalogue/Catalogue.hpp
+++ b/catalogue/Catalogue.hpp
@@ -175,6 +175,8 @@ public:
    * @param user The user for whom the file is to be retrieved.  This will be
    * used by the Catalogue to determine the mount policy to be used when
    * retrieving the file.
+   * @param activity The activity under which the user wants to start the retrieve
+   * The call will fail if the activity is set and unknown. 
    * @param lc The log context.
    *
    * @return The information required to queue the associated retrieve request(s).
@@ -183,6 +185,7 @@ public:
     const std::string &diskInstanceName,
     const uint64_t archiveFileId,
     const common::dataStructures::UserIdentity &user,
+    const optional<std::string> & activity,
     log::LogContext &lc) = 0;
 
   /**
@@ -481,6 +484,13 @@ public:
   virtual void modifyMountPolicyMaxDrivesAllowed(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const uint64_t maxDrivesAllowed) = 0;
   virtual void modifyMountPolicyComment(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const std::string &comment) = 0;
 
+  virtual void createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & acttivity,
+    double weight, const std::string & comment) = 0;
+  virtual void modifyActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & acttivity,
+    double weight, const std::string & comment) = 0;
+  virtual void deleteActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & acttivity) = 0;
+  virtual std::list<common::dataStructures::ActivitiesFairShareWeights> getActivitiesFairShareWeights() const = 0;
+  
   /**
    * Returns the specified archive files.  Please note that the list of files
    * is ordered by archive file ID.
@@ -594,5 +604,8 @@ public:
 
 }; // class Catalogue
 
+CTA_GENERATE_USER_EXCEPTION_CLASS(UserSpecifiedAnEmptyStringActivity);
+CTA_GENERATE_USER_EXCEPTION_CLASS(UserSpecifiedAnOutOfRangeActivityWeight);
+
 } // namespace catalogue
 } // namespace cta
diff --git a/catalogue/CatalogueRetryWrapper.hpp b/catalogue/CatalogueRetryWrapper.hpp
index 65d98eaf85..e627748b1c 100644
--- a/catalogue/CatalogueRetryWrapper.hpp
+++ b/catalogue/CatalogueRetryWrapper.hpp
@@ -89,8 +89,8 @@ public:
     return retryOnLostConnection(m_log, [&]{return m_catalogue->tapeMountedForArchive(vid, drive);}, m_maxTriesToConnect);
   }
 
-  common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string &diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity &user, log::LogContext &lc) override {
-    return retryOnLostConnection(m_log, [&]{return m_catalogue->prepareToRetrieveFile(diskInstanceName, archiveFileId, user, lc);}, m_maxTriesToConnect);
+  common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity& user, const optional<std::string>& activity, log::LogContext& lc) override {
+    return retryOnLostConnection(m_log, [&]{return m_catalogue->prepareToRetrieveFile(diskInstanceName, archiveFileId, user, activity, lc);}, m_maxTriesToConnect);
   }
 
   void tapeMountedForRetrieve(const std::string &vid, const std::string &drive) override {
@@ -344,6 +344,23 @@ public:
   void modifyMountPolicyComment(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const std::string &comment) override {
     return retryOnLostConnection(m_log, [&]{return m_catalogue->modifyMountPolicyComment(admin, name, comment);}, m_maxTriesToConnect);
   }
+  
+  void createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity, double weight, const std::string & comment) override {
+    return retryOnLostConnection(m_log, [&]{return m_catalogue->createActivitiesFairShareWeight(admin, diskInstanceName, acttivity, weight, comment);}, m_maxTriesToConnect);
+  }
+  
+  void modifyActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity, double weight, const std::string & comment) override {
+    return retryOnLostConnection(m_log, [&]{return m_catalogue->modifyActivitiesFairShareWeight(admin, diskInstanceName, acttivity, weight, comment);}, m_maxTriesToConnect);
+  }
+  
+  void deleteActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity) override {
+    return retryOnLostConnection(m_log, [&]{return m_catalogue->deleteActivitiesFairShareWeight(admin, diskInstanceName, acttivity);}, m_maxTriesToConnect);
+  }
+  
+  std::list<common::dataStructures::ActivitiesFairShareWeights> getActivitiesFairShareWeights() const override {
+    return retryOnLostConnection(m_log, [&]{return m_catalogue->getActivitiesFairShareWeights();}, m_maxTriesToConnect);
+  }
+
 
   ArchiveFileItor getArchiveFilesItor(const TapeFileSearchCriteria &searchCriteria = TapeFileSearchCriteria()) const override {
     return retryOnLostConnection(m_log, [&]{return m_catalogue->getArchiveFilesItor(searchCriteria);}, m_maxTriesToConnect);
diff --git a/catalogue/CatalogueTest.cpp b/catalogue/CatalogueTest.cpp
index 7e98967bea..1b01310b6c 100644
--- a/catalogue/CatalogueTest.cpp
+++ b/catalogue/CatalogueTest.cpp
@@ -7138,7 +7138,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId) {
   userIdentity.name = requesterName;
   userIdentity.group = "group";
   const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
-    m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, dummyLc);
+    m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, cta::nullopt, dummyLc);
 
   ASSERT_EQ(2, queueCriteria.archiveFile.tapeFiles.size());
   ASSERT_EQ(archivePriority, queueCriteria.mountPolicy.archivePriority);
@@ -7146,7 +7146,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId) {
   ASSERT_EQ(maxDrivesAllowed, queueCriteria.mountPolicy.maxDrivesAllowed);
 
   // Check that the diskInstanceName mismatch detection works
-  ASSERT_THROW(m_catalogue->prepareToRetrieveFile(diskInstanceName2, archiveFileId, userIdentity, dummyLc),
+  ASSERT_THROW(m_catalogue->prepareToRetrieveFile(diskInstanceName2, archiveFileId, userIdentity, cta::nullopt, dummyLc),
     exception::UserError);
 }
 
@@ -7401,7 +7401,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId_disa
 
   {
     const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
-      m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, dummyLc);
+      m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, cta::nullopt, dummyLc);
 
     ASSERT_EQ(archivePriority, queueCriteria.mountPolicy.archivePriority);
     ASSERT_EQ(minArchiveRequestAge, queueCriteria.mountPolicy.archiveMinRequestAge);
@@ -7435,7 +7435,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId_disa
 
   {
     const common::dataStructures::RetrieveFileQueueCriteria queueCriteria =
-      m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, dummyLc);
+      m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, cta::nullopt, dummyLc);
 
     ASSERT_EQ(archivePriority, queueCriteria.mountPolicy.archivePriority);
     ASSERT_EQ(minArchiveRequestAge, queueCriteria.mountPolicy.archiveMinRequestAge);
@@ -7456,7 +7456,7 @@ TEST_P(cta_catalogue_CatalogueTest, prepareToRetrieveFileUsingArchiveFileId_disa
 
   m_catalogue->setTapeDisabled(m_admin, vid2, true);
 
-  ASSERT_THROW(m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, dummyLc),
+  ASSERT_THROW(m_catalogue->prepareToRetrieveFile(diskInstanceName1, archiveFileId, userIdentity, cta::nullopt, dummyLc),
     exception::UserError);
 }
 
@@ -12151,6 +12151,203 @@ TEST_P(cta_catalogue_CatalogueTest, reclaimTape_full_lastFSeq_1_one_tape_file_su
   }
 }
 
+TEST_P(cta_catalogue_CatalogueTest, exist_non_superseded_files_after_fseq) {
+  using namespace cta;
+
+  const std::string diskInstanceName1 = "disk_instance_1";
+
+  ASSERT_TRUE(m_catalogue->getTapes().empty());
+
+  const std::string vid1 = "VID123";
+  const std::string vid2 = "VID234";
+  const std::string mediaType = "media_type";
+  const std::string vendor = "vendor";
+  const std::string logicalLibraryName = "logical_library_name";
+  const bool logicalLibraryIsDisabled= false;
+  const std::string tapePoolName = "tape_pool_name";
+  const std::string vo = "vo";
+  const uint64_t nbPartialTapes = 2;
+  const bool isEncrypted = true;
+  const cta::optional<std::string> supply("value for the supply pool mechanism");
+  const uint64_t capacityInBytes = (uint64_t)10 * 1000 * 1000 * 1000 * 1000;
+  const bool disabledValue = true;
+  const bool fullValue = false;
+  const std::string createTapeComment = "Create tape";
+  
+  m_catalogue->createLogicalLibrary(m_admin, logicalLibraryName, logicalLibraryIsDisabled, "Create logical library");
+  m_catalogue->createTapePool(m_admin, tapePoolName, vo, nbPartialTapes, isEncrypted, supply, "Create tape pool");
+  m_catalogue->createTape(m_admin, vid1, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes,
+    disabledValue, fullValue, createTapeComment);
+  
+  //A tape with no tape file have no files after FSeq 0
+  ASSERT_FALSE(m_catalogue->existNonSupersededFilesAfterFSeqAndDeleteTapeFilesForWriting(vid1,0));
+  
+  const uint64_t archiveFileId = 1234;
+
+  ASSERT_FALSE(m_catalogue->getArchiveFilesItor().hasMore());
+  ASSERT_THROW(m_catalogue->getArchiveFileById(archiveFileId), exception::Exception);
+
+  common::dataStructures::StorageClass storageClass;
+  storageClass.diskInstance = diskInstanceName1;
+  storageClass.name = "storage_class";
+  storageClass.nbCopies = 1;
+  storageClass.comment = "Create storage class";
+  m_catalogue->createStorageClass(m_admin, storageClass);
+
+  /*
+   * Insert a file in the tape vid1
+   */
+  {
+    const uint64_t archiveFileSize = 1;
+    const std::string tapeDrive = "tape_drive";
+    const std::string checksumType = "checksum_type";
+    const std::string checksumValue = "checksum_value";
+
+    auto file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
+    auto & file1Written = *file1WrittenUP;
+    std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenSet;
+    file1WrittenSet.insert(file1WrittenUP.release());
+    file1Written.archiveFileId        = archiveFileId;
+    file1Written.diskInstance         = storageClass.diskInstance;
+    file1Written.diskFileId           = "5678";
+    file1Written.diskFilePath         = "/public_dir/public_file";
+    file1Written.diskFileUser         = "public_disk_user";
+    file1Written.diskFileGroup        = "public_disk_group";
+    file1Written.size                 = archiveFileSize;
+    file1Written.checksumType         = checksumType;
+    file1Written.checksumValue        = checksumValue;
+    file1Written.storageClassName     = storageClass.name;
+    file1Written.vid                  = vid1;
+    file1Written.fSeq                 = 1;
+    file1Written.blockId              = 4321;
+    file1Written.compressedSize       = 1;
+    file1Written.copyNb               = 1;
+    file1Written.tapeDrive            = tapeDrive;
+    m_catalogue->filesWrittenToTape(file1WrittenSet);
+  }
+  //One file written : this file is not superseded by another one, existNonSupersededFilesAfterFSeq = true
+  ASSERT_TRUE(m_catalogue->existNonSupersededFilesAfterFSeqAndDeleteTapeFilesForWriting(vid1,0));
+  //No file after the only file inserted, existNonSupersededFilesAfterFseq = false
+  ASSERT_FALSE(m_catalogue->existNonSupersededFilesAfterFSeqAndDeleteTapeFilesForWriting(vid1,1));
+  
+  //Insert another file in another tape that will supersed the first one in vid1
+  {
+    m_catalogue->createTape(m_admin, vid2, mediaType, vendor, logicalLibraryName, tapePoolName, capacityInBytes,
+    disabledValue, fullValue, createTapeComment);
+    const uint64_t archiveFileSize = 1;
+    const std::string tapeDrive = "tape_drive";
+    const std::string checksumType = "checksum_type";
+    const std::string checksumValue = "checksum_value";
+
+    auto file1WrittenUP=cta::make_unique<cta::catalogue::TapeFileWritten>();
+    auto & file1Written = *file1WrittenUP;
+    std::set<cta::catalogue::TapeItemWrittenPointer> file1WrittenSet;
+    file1WrittenSet.insert(file1WrittenUP.release());
+    file1Written.archiveFileId        = archiveFileId;
+    file1Written.diskInstance         = storageClass.diskInstance;
+    file1Written.diskFileId           = "5678";
+    file1Written.diskFilePath         = "/public_dir/public_file";
+    file1Written.diskFileUser         = "public_disk_user";
+    file1Written.diskFileGroup        = "public_disk_group";
+    file1Written.size                 = archiveFileSize;
+    file1Written.checksumType         = checksumType;
+    file1Written.checksumValue        = checksumValue;
+    file1Written.storageClassName     = storageClass.name;
+    file1Written.vid                  = vid2;
+    file1Written.fSeq                 = 1;
+    file1Written.blockId              = 4321;
+    file1Written.compressedSize       = 1;
+    file1Written.copyNb               = 1;
+    file1Written.tapeDrive            = tapeDrive;
+    m_catalogue->filesWrittenToTape(file1WrittenSet);
+  }
+  //The tape files written to tape vid2 are not superseded by any file, but the tape files in vid1 
+  //are superseded by the tape files in vid2
+  ASSERT_FALSE(m_catalogue->existNonSupersededFilesAfterFSeqAndDeleteTapeFilesForWriting(vid1,0));
+  ASSERT_TRUE(m_catalogue->existNonSupersededFilesAfterFSeqAndDeleteTapeFilesForWriting(vid2,0));
+}
+
+TEST_P(cta_catalogue_CatalogueTest, createModifyDeleteActivityWeight) {
+  using namespace cta;
+
+  const std::string diskInstanceName = "ExperimentEOS";
+  const std::string activity1 = "Reco";  
+  const std::string activity2 = "Grid";
+  const double weight1 = 0.654;
+  const double weight2 = 0.456;
+  const std::string comment = "No comment.";
+
+  m_catalogue->createActivitiesFairShareWeight(m_admin, diskInstanceName, activity1, weight1, comment);
+      
+  const auto activitiesList = m_catalogue->getActivitiesFairShareWeights();
+      
+  ASSERT_EQ(1, activitiesList.size());
+  ASSERT_EQ(1, activitiesList.front().activitiesWeights.size());
+  ASSERT_NO_THROW(activitiesList.front().activitiesWeights.at(activity1));
+  ASSERT_EQ(weight1, activitiesList.front().activitiesWeights.at(activity1));
+
+  m_catalogue->createActivitiesFairShareWeight(m_admin, diskInstanceName, activity2, weight2, comment);
+  
+  const auto activitiesList2 = m_catalogue->getActivitiesFairShareWeights();
+  
+  ASSERT_EQ(1, activitiesList2.size());
+  ASSERT_EQ(2, activitiesList2.front().activitiesWeights.size());
+  ASSERT_NO_THROW(activitiesList2.front().activitiesWeights.at(activity1));
+  ASSERT_EQ(weight1, activitiesList2.front().activitiesWeights.at(activity1));
+  ASSERT_NO_THROW(activitiesList2.front().activitiesWeights.at(activity2));
+  ASSERT_EQ(weight2, activitiesList2.front().activitiesWeights.at(activity2));
+  
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "NoSuchInstance", activity2, weight2, comment), cta::exception::UserError);
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, diskInstanceName, "NoSuchActivity", weight2, comment), cta::exception::UserError);
+  
+  ASSERT_NO_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, diskInstanceName, activity1, weight2, comment));
+  ASSERT_NO_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, diskInstanceName, activity2, weight1, comment));
+  
+  
+  const auto activitiesList3 = m_catalogue->getActivitiesFairShareWeights();
+  
+  ASSERT_EQ(1, activitiesList3.size());
+  ASSERT_EQ(2, activitiesList3.front().activitiesWeights.size());
+  ASSERT_NO_THROW(activitiesList3.front().activitiesWeights.at(activity1));
+  ASSERT_EQ(weight2, activitiesList3.front().activitiesWeights.at(activity1));
+  ASSERT_NO_THROW(activitiesList3.front().activitiesWeights.at(activity2));
+  ASSERT_EQ(weight1, activitiesList3.front().activitiesWeights.at(activity2));
+  
+  ASSERT_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, "NoSuchInstance", activity2), cta::exception::UserError);
+  ASSERT_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, diskInstanceName, "NoSuchActivity"), cta::exception::UserError);
+  
+  ASSERT_NO_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, diskInstanceName, activity1));
+  
+  const auto activitiesList4 = m_catalogue->getActivitiesFairShareWeights();
+      
+  ASSERT_EQ(1, activitiesList4.size());
+  ASSERT_EQ(1, activitiesList4.front().activitiesWeights.size());
+  ASSERT_NO_THROW(activitiesList4.front().activitiesWeights.at(activity2));
+  ASSERT_EQ(weight1, activitiesList4.front().activitiesWeights.at(activity2));
+  
+  ASSERT_NO_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, diskInstanceName, activity2));
+  
+  ASSERT_EQ(0, m_catalogue->getActivitiesFairShareWeights().size());
+}
+
+TEST_P(cta_catalogue_CatalogueTest, activitiesDataValidation) {
+  using namespace cta;
+  ASSERT_THROW(m_catalogue->createActivitiesFairShareWeight(m_admin, "", "Activity", 0.1, "No comment."), catalogue::UserSpecifiedAnEmptyStringDiskInstanceName);
+  ASSERT_THROW(m_catalogue->createActivitiesFairShareWeight(m_admin, "DiskInstance", "", 0.1, "No comment."), catalogue::UserSpecifiedAnEmptyStringActivity);
+  ASSERT_THROW(m_catalogue->createActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 0.0, "No comment."), catalogue::UserSpecifiedAnOutOfRangeActivityWeight);
+  ASSERT_THROW(m_catalogue->createActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 1.1, "No comment."), catalogue::UserSpecifiedAnOutOfRangeActivityWeight);
+  ASSERT_THROW(m_catalogue->createActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 0.1, ""), catalogue::UserSpecifiedAnEmptyStringComment);
+  
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "", "Activity", 0.1, "No comment."), catalogue::UserSpecifiedAnEmptyStringDiskInstanceName);
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "DiskInstance", "", 0.1, "No comment."), catalogue::UserSpecifiedAnEmptyStringActivity);
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 0.0, "No comment."), catalogue::UserSpecifiedAnOutOfRangeActivityWeight);
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 1.1, "No comment."), catalogue::UserSpecifiedAnOutOfRangeActivityWeight);
+  ASSERT_THROW(m_catalogue->modifyActivitiesFairShareWeight(m_admin, "DiskInstance", "Activity", 0.1, ""), catalogue::UserSpecifiedAnEmptyStringComment);
+  
+  ASSERT_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, "", "Activity"), catalogue::UserSpecifiedAnEmptyStringDiskInstanceName);
+  ASSERT_THROW(m_catalogue->deleteActivitiesFairShareWeight(m_admin, "DiskInstance", ""), catalogue::UserSpecifiedAnEmptyStringActivity);
+}
+
 TEST_P(cta_catalogue_CatalogueTest, ping) {
   using namespace cta;
 
diff --git a/catalogue/DummyCatalogue.hpp b/catalogue/DummyCatalogue.hpp
index 5e83c60901..a6980d3ca1 100644
--- a/catalogue/DummyCatalogue.hpp
+++ b/catalogue/DummyCatalogue.hpp
@@ -34,6 +34,7 @@ public:
   DummyCatalogue() {}
   virtual ~DummyCatalogue() { }
 
+  void createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity, double weight, const std::string & comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void createAdminUser(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void createArchiveRoute(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& tapePoolName, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void createLogicalLibrary(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const bool isDisabled, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
@@ -43,6 +44,7 @@ public:
   void createStorageClass(const common::dataStructures::SecurityIdentity& admin, const common::dataStructures::StorageClass& storageClass) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void createTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const std::string &mediaType, const std::string &vendor, const std::string& logicalLibraryName, const std::string& tapePoolName, const uint64_t capacityInBytes, const bool disabled, const bool full, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void createTapePool(const common::dataStructures::SecurityIdentity& admin, const std::string& name, const std::string & vo, const uint64_t nbPartialTapes, const bool encryptionValue, const cta::optional<std::string> &supply, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
+  void deleteActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void deleteAdminUser(const std::string& username) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void deleteArchiveFile(const std::string& instanceName, const uint64_t archiveFileId, log::LogContext &lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void deleteArchiveRoute(const std::string& diskInstanceName, const std::string& storageClassName, const uint32_t copyNb) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
@@ -54,6 +56,7 @@ public:
   void deleteTape(const std::string& vid) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void deleteTapePool(const std::string& name) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void filesWrittenToTape(const std::set<TapeItemWrittenPointer>& event) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
+  std::list<common::dataStructures::ActivitiesFairShareWeights> getActivitiesFairShareWeights() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   std::list<common::dataStructures::AdminUser> getAdminUsers() const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   common::dataStructures::ArchiveFile getArchiveFileById(const uint64_t id) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   ArchiveFileItor getArchiveFilesItor(const TapeFileSearchCriteria& searchCriteria) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
@@ -72,6 +75,7 @@ public:
   common::dataStructures::VidToTapeMap getAllTapes() const override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   std::list<TapeForWriting> getTapesForWriting(const std::string& logicalLibraryName) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   bool isAdmin(const common::dataStructures::SecurityIdentity& admin) const { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
+  void modifyActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& acttivity, double weight, const std::string & comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void modifyAdminUserComment(const common::dataStructures::SecurityIdentity& admin, const std::string& username, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void modifyArchiveRouteComment(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& comment) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void modifyArchiveRouteTapePoolName(const common::dataStructures::SecurityIdentity& admin, const std::string& instanceName, const std::string& storageClassName, const uint32_t copyNb, const std::string& tapePoolName) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
@@ -105,7 +109,7 @@ public:
   uint64_t checkAndGetNextArchiveFileId(const std::string &diskInstanceName, const std::string &storageClassName, const common::dataStructures::UserIdentity &user) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   common::dataStructures::ArchiveFileQueueCriteria getArchiveFileQueueCriteria(const std::string &diskInstanceName,
     const std::string &storageClassName, const common::dataStructures::UserIdentity &user) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
-  common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& instanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity& user, log::LogContext &lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
+  common::dataStructures::RetrieveFileQueueCriteria prepareToRetrieveFile(const std::string& diskInstanceName, const uint64_t archiveFileId, const common::dataStructures::UserIdentity& user, const optional<std::string>& activity, log::LogContext& lc) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void reclaimTape(const common::dataStructures::SecurityIdentity& admin, const std::string& vid) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void setTapeDisabled(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool disabledValue) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
   void setTapeFull(const common::dataStructures::SecurityIdentity& admin, const std::string& vid, const bool fullValue) override { throw exception::Exception(std::string("In ")+__PRETTY_FUNCTION__+": not implemented"); }
diff --git a/catalogue/RdbmsCatalogue.cpp b/catalogue/RdbmsCatalogue.cpp
index fc5424be8c..47f98fdeb3 100644
--- a/catalogue/RdbmsCatalogue.cpp
+++ b/catalogue/RdbmsCatalogue.cpp
@@ -69,8 +69,8 @@ RdbmsCatalogue::RdbmsCatalogue(
   m_groupMountPolicyCache(10),
   m_userMountPolicyCache(10),
   m_expectedNbArchiveRoutesCache(10),
-  m_isAdminCache(10) {
-}
+  m_isAdminCache(10),
+  m_activitiesFairShareWeights(10) {}
 
 //------------------------------------------------------------------------------
 // destructor
@@ -3974,6 +3974,223 @@ void RdbmsCatalogue::modifyMountPolicyComment(const common::dataStructures::Secu
   }
 }
 
+//------------------------------------------------------------------------------
+// createActivitiesFairShareWeight
+//------------------------------------------------------------------------------
+void RdbmsCatalogue::createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, 
+    const std::string& diskInstanceName, const std::string& activity, double weight, const std::string & comment) {
+  try {
+    if (diskInstanceName.empty()) {
+      throw UserSpecifiedAnEmptyStringDiskInstanceName("Cannot create activity weight because the disk instance name is"
+        " an empty string");
+    }
+    
+    if (activity.empty()) {
+      throw UserSpecifiedAnEmptyStringActivity("Cannot create activity weight because the activity name is"
+        " an empty string");
+    }
+    
+    if (weight <= 0 || weight > 1) {
+      throw UserSpecifiedAnOutOfRangeActivityWeight("Cannot create activity because the activity weight is out of ]0, 1] range.");
+    }
+    
+    if (comment.empty()) {
+      throw UserSpecifiedAnEmptyStringComment("Cannot create activity weight because the comment is"
+        " an empty string");
+    }
+    
+    const time_t now = time(nullptr);
+    const char *const sql =
+      "INSERT INTO ACTIVITIES_WEIGHTS ("
+        "DISK_INSTANCE_NAME,"
+        "ACTIVITY,"
+        "WEIGHT,"
+    
+        "USER_COMMENT,"
+
+        "CREATION_LOG_USER_NAME,"
+        "CREATION_LOG_HOST_NAME,"
+        "CREATION_LOG_TIME,"
+
+        "LAST_UPDATE_USER_NAME,"
+        "LAST_UPDATE_HOST_NAME,"
+        "LAST_UPDATE_TIME)"
+    
+      "VALUES ("
+        ":DISK_INSTANCE_NAME,"
+        ":ACTIVITY,"
+        ":WEIGHT,"
+
+        ":USER_COMMENT,"
+
+        ":CREATION_LOG_USER_NAME,"
+        ":CREATION_LOG_HOST_NAME,"
+        ":CREATION_LOG_TIME,"
+
+        ":LAST_UPDATE_USER_NAME,"
+        ":LAST_UPDATE_HOST_NAME,"
+        ":LAST_UPDATE_TIME)";
+    auto conn = m_connPool.getConn();
+    auto stmt = conn.createStmt(sql);
+    stmt.bindString(":DISK_INSTANCE_NAME", diskInstanceName);
+    stmt.bindString(":ACTIVITY", activity);
+    stmt.bindString(":WEIGHT", std::to_string(weight));
+    
+    stmt.bindString(":USER_COMMENT", comment);
+
+    stmt.bindString(":CREATION_LOG_USER_NAME", admin.username);
+    stmt.bindString(":CREATION_LOG_HOST_NAME", admin.host);
+    stmt.bindUint64(":CREATION_LOG_TIME", now);
+
+    stmt.bindString(":LAST_UPDATE_USER_NAME", admin.username);
+    stmt.bindString(":LAST_UPDATE_HOST_NAME", admin.host);
+    stmt.bindUint64(":LAST_UPDATE_TIME", now);
+    
+    stmt.executeNonQuery();
+
+    conn.commit();
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
+//------------------------------------------------------------------------------
+// modifyActivitiesFairShareWeight
+//------------------------------------------------------------------------------
+void RdbmsCatalogue::modifyActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& activity, double weight, const std::string& comment) {
+  try {
+    if (diskInstanceName.empty()) {
+      throw UserSpecifiedAnEmptyStringDiskInstanceName("Cannot create activity weight because the disk instance name is"
+        " an empty string");
+    }
+    
+    if (activity.empty()) {
+      throw UserSpecifiedAnEmptyStringActivity("Cannot create activity weight because the activity name is"
+        " an empty string");
+    }
+    
+    if (weight <= 0 || weight > 1) {
+      throw UserSpecifiedAnOutOfRangeActivityWeight("Cannot create activity because the activity weight is out of ]0, 1] range.");
+    }
+    
+    if (comment.empty()) {
+      throw UserSpecifiedAnEmptyStringComment("Cannot modify activity weight because the comment is"
+        " an empty string");
+    }
+    
+    const time_t now = time(nullptr);
+    const char *const sql =
+      "UPDATE ACTIVITIES_WEIGHTS SET "
+        "WEIGHT = :WEIGHT,"
+        "LAST_UPDATE_USER_NAME = :LAST_UPDATE_USER_NAME,"
+        "LAST_UPDATE_HOST_NAME = :LAST_UPDATE_HOST_NAME,"
+        "LAST_UPDATE_TIME = :LAST_UPDATE_TIME,"
+        "USER_COMMENT = :USER_COMMENT "
+      "WHERE "
+        "DISK_INSTANCE_NAME = :DISK_INSTANCE_NAME AND "
+        "ACTIVITY = :ACTIVITY";
+    auto conn = m_connPool.getConn();
+    auto stmt = conn.createStmt(sql);
+    stmt.bindString(":DISK_INSTANCE_NAME", diskInstanceName);
+    stmt.bindString(":ACTIVITY", activity);
+    stmt.bindString(":WEIGHT", std::to_string(weight));
+    
+    stmt.bindString(":USER_COMMENT", comment);
+    stmt.bindString(":LAST_UPDATE_USER_NAME", admin.username);
+    stmt.bindString(":LAST_UPDATE_HOST_NAME", admin.host);
+    stmt.bindUint64(":LAST_UPDATE_TIME", now);
+    stmt.executeNonQuery();
+
+    if(0 == stmt.getNbAffectedRows()) {
+      throw exception::UserError(std::string("Cannot modify activity fair share weight ") + activity + " because it does not exist");
+    }
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
+
+//------------------------------------------------------------------------------
+// deleteActivitiesFairShareWeight
+//------------------------------------------------------------------------------
+void RdbmsCatalogue::deleteActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity& admin, const std::string& diskInstanceName, const std::string& activity) {
+  try {
+    if (diskInstanceName.empty()) {
+      throw UserSpecifiedAnEmptyStringDiskInstanceName("Cannot create activity weight because the disk instance name is"
+        " an empty string");
+    }
+    
+    if (activity.empty()) {
+      throw UserSpecifiedAnEmptyStringActivity("Cannot create activity weight because the activity name is"
+        " an empty string");
+    }
+    
+    const char *const sql = "DELETE FROM ACTIVITIES_WEIGHTS WHERE DISK_INSTANCE_NAME = :DISK_INSTANCE_NAME AND ACTIVITY = :ACTIVITY";
+    auto conn = m_connPool.getConn();
+    auto stmt = conn.createStmt(sql);
+    stmt.bindString(":DISK_INSTANCE_NAME", diskInstanceName);
+    stmt.bindString(":ACTIVITY", activity);
+    stmt.executeNonQuery();
+
+    if(0 == stmt.getNbAffectedRows()) {
+      throw exception::UserError(std::string("Cannot delete activity weight ") + activity + " because it does not exist");
+    }
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
+//------------------------------------------------------------------------------
+// getActivitiesFairShareWeights
+//------------------------------------------------------------------------------
+std::list<common::dataStructures::ActivitiesFairShareWeights> RdbmsCatalogue::getActivitiesFairShareWeights() const {
+  try {
+    std::string sql =
+      "SELECT "
+        "ACTIVITIES_WEIGHTS.DISK_INSTANCE_NAME AS DISK_INSTANCE_NAME,"
+        "ACTIVITIES_WEIGHTS.ACTIVITY AS ACTIVITY,"
+        "ACTIVITIES_WEIGHTS.WEIGHT AS WEIGHT "
+      "FROM "
+        "ACTIVITIES_WEIGHTS";
+
+    auto conn = m_connPool.getConn();
+    auto stmt = conn.createStmt(sql);
+    auto rset = stmt.executeQuery();
+
+    std::map<std::string, common::dataStructures::ActivitiesFairShareWeights> activitiesMap;
+    while(rset.next()) {
+      common::dataStructures::ActivitiesFairShareWeights * activity;
+      auto diskInstanceName = rset.columnString("DISK_INSTANCE_NAME");
+      try {
+        activity = & activitiesMap.at(diskInstanceName);
+      } catch (std::out_of_range) {
+        activity = & activitiesMap[diskInstanceName];
+        activity->diskInstance = diskInstanceName;
+      }
+      activity->setWeightFromString(rset.columnString("ACTIVITY"), rset.columnString("WEIGHT"));
+    }
+    std::list<common::dataStructures::ActivitiesFairShareWeights> ret;
+    for (auto & dia: activitiesMap) {
+      ret.push_back(dia.second);
+    }
+    return ret;
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
 //------------------------------------------------------------------------------
 // insertArchiveFile
 //------------------------------------------------------------------------------
@@ -4694,6 +4911,7 @@ common::dataStructures::RetrieveFileQueueCriteria RdbmsCatalogue::prepareToRetri
   const std::string &diskInstanceName,
   const uint64_t archiveFileId,
   const common::dataStructures::UserIdentity &user,
+  const optional<std::string>& activity,
   log::LogContext &lc) {
   try {
     cta::utils::Timer t;
@@ -5277,6 +5495,58 @@ std::unique_ptr<common::dataStructures::ArchiveFile> RdbmsCatalogue::getArchiveF
   }
 }
 
+//------------------------------------------------------------------------------
+// getCachedActivitiesWeights
+//------------------------------------------------------------------------------
+common::dataStructures::ActivitiesFairShareWeights 
+RdbmsCatalogue::getCachedActivitiesWeights(const std::string& diskInstance) const {
+  try {
+    auto getNonCachedValue = [&] {
+      auto conn = m_connPool.getConn();
+      return getActivitiesWeights(conn, diskInstance);
+    };
+    return m_activitiesFairShareWeights.getCachedValue(diskInstance, getNonCachedValue);
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
+//------------------------------------------------------------------------------
+// getActivitiesWeights
+//------------------------------------------------------------------------------
+common::dataStructures::ActivitiesFairShareWeights 
+RdbmsCatalogue::getActivitiesWeights(rdbms::Conn& conn, const std::string& diskInstanceName) const {
+  try {
+    const char *const sql =
+      "SELECT "
+        "ACTIVITIES_WEIGHTS.ACTIVITY AS ACTIVITY,"
+        "ACTIVITIES_WEIGHTS.WEIGHT AS WEIGHT "
+      "FROM "
+        "ACTIVITIES_WEIGHTS "
+      "WHERE "
+        "ACTIVITIES_WEIGHTS.DISK_INSTANCE_NAME = :DISK_INSTANCE_NAME";
+    auto stmt = conn.createStmt(sql);
+    stmt.bindString(":DISK_INSTANCE_NAME", diskInstanceName);
+    auto rset = stmt.executeQuery();
+    common::dataStructures::ActivitiesFairShareWeights afsw;
+    afsw.diskInstance = diskInstanceName;
+    while (rset.next()) {
+      // The weight is a string encoded double with values in [0, 1], like in FTS.
+      // All the checks are performed in setWeightFromString().
+      afsw.setWeightFromString(rset.columnString("ACTIVITY"), rset.columnString("WEIGHT"));
+    }
+    return afsw;
+  } catch(exception::UserError &) {
+    throw;
+  } catch(exception::Exception &ex) {
+    ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str());
+    throw;
+  }
+}
+
 //------------------------------------------------------------------------------
 // getArchiveFileByDiskFileId
 //------------------------------------------------------------------------------
diff --git a/catalogue/RdbmsCatalogue.hpp b/catalogue/RdbmsCatalogue.hpp
index 584074885c..0609fd0e4d 100644
--- a/catalogue/RdbmsCatalogue.hpp
+++ b/catalogue/RdbmsCatalogue.hpp
@@ -170,6 +170,8 @@ public:
    * @param user The user for whom the file is to be retrieved.  This will be
    * used by the Catalogue to determine the mount policy to be used when
    * retrieving the file.
+   * @param activity The activity under which the user wants to start the retrieve
+   * The call will fail if the activity is set and unknown. 
    * @param lc The log context.
    *
    * @return The information required to queue the associated retrieve request(s).
@@ -178,6 +180,7 @@ public:
     const std::string &diskInstanceName,
     const uint64_t archiveFileId,
     const common::dataStructures::UserIdentity &user,
+    const optional<std::string> & activity,
     log::LogContext &lc) override;
 
   /**
@@ -494,7 +497,15 @@ public:
   void modifyMountPolicyRetrieveMinRequestAge(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const uint64_t minRetrieveRequestAge) override;
   void modifyMountPolicyMaxDrivesAllowed(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const uint64_t maxDrivesAllowed) override;
   void modifyMountPolicyComment(const common::dataStructures::SecurityIdentity &admin, const std::string &name, const std::string &comment) override;
+  
+  void createActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & activity,
+    double weight, const std::string & comment) override;
+  void modifyActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & activity,
+    double weight, const std::string & comment) override;
+  void deleteActivitiesFairShareWeight(const common::dataStructures::SecurityIdentity &admin, const std::string & diskInstanceName, const std::string & activity) override;
+  std::list<common::dataStructures::ActivitiesFairShareWeights> getActivitiesFairShareWeights() const override;
 
+  
   /**
    * Throws a UserError exception if the specified searchCriteria is not valid
    * due to a user error.
@@ -1080,6 +1091,25 @@ protected:
     rdbms::Conn &conn,
     const uint64_t archiveFileId) const;
 
+  /**
+   * Returns a cached version of the (possibly empty) activities to weight map
+   * for the given dsk instance.
+   * @param diskInstance
+   * @return activities to weight map (ActivitiesFairShareWeights)
+   */
+  common::dataStructures::ActivitiesFairShareWeights getCachedActivitiesWeights(
+    const std::string &diskInstanceName) const;
+  
+  /**
+   * Returns a the (possibly empty) activities to weight map for the given dsk instance.
+   * @param conn The database connection.
+   * @param diskInstance
+   * @return activities to weight map (ActivitiesFairShareWeights)
+   */
+  common::dataStructures::ActivitiesFairShareWeights getActivitiesWeights(
+    rdbms::Conn &conn,
+    const std::string &diskInstanceName) const;  
+  
   /**
    * Returns the specified archive file.   A nullptr pointer is returned if
    * there is no corresponding row in the ARCHIVE_FILE table.  Please note that
@@ -1270,6 +1300,11 @@ protected:
    * Cached version of isAdmin() results.
    */
   mutable TimeBasedCache<common::dataStructures::SecurityIdentity, bool> m_isAdminCache;
+  
+  /**
+   * Cached version of the activities to weight maps.
+   */
+  mutable TimeBasedCache<std::string, common::dataStructures::ActivitiesFairShareWeights> m_activitiesFairShareWeights;
 
 }; // class RdbmsCatalogue
 
diff --git a/catalogue/common_catalogue_schema.sql b/catalogue/common_catalogue_schema.sql
index a5f2a3b801..3e7b1d0dd4 100644
--- a/catalogue/common_catalogue_schema.sql
+++ b/catalogue/common_catalogue_schema.sql
@@ -197,6 +197,18 @@ CREATE TABLE TAPE_FILE(
 CREATE INDEX TAPE_FILE_VID_IDX ON TAPE_FILE(VID);
 CREATE INDEX TAPE_FILE_ARCHIVE_FILE_ID_IDX ON TAPE_FILE(ARCHIVE_FILE_ID);
 CREATE INDEX TAPE_FILE_SBV_SBF_IDX ON TAPE_FILE(SUPERSEDED_BY_VID, SUPERSEDED_BY_FSEQ);
+CREATE TABLE ACTIVITIES_WEIGHTS (
+  DISK_INSTANCE_NAME       VARCHAR(100),
+  ACTIVITY                 VARCHAR(100),
+  WEIGHT                   VARCHAR(100),
+  USER_COMMENT             VARCHAR(1000)   CONSTRAINT ACTIV_WEIGHTS_UC_NN   NOT NULL,
+  CREATION_LOG_USER_NAME   VARCHAR(100)    CONSTRAINT ACTIV_WEIGHTS_CLUN_NN NOT NULL,
+  CREATION_LOG_HOST_NAME   VARCHAR(100)    CONSTRAINT ACTIV_WEIGHTS_CLHN_NN NOT NULL,
+  CREATION_LOG_TIME        NUMERIC(20, 0)  CONSTRAINT ACTIV_WEIGHTS_CLT_NN  NOT NULL,
+  LAST_UPDATE_USER_NAME    VARCHAR(100)    CONSTRAINT ACTIV_WEIGHTS_LUUN_NN NOT NULL,
+  LAST_UPDATE_HOST_NAME    VARCHAR(100)    CONSTRAINT ACTIV_WEIGHTS_LUHN_NN NOT NULL,
+  LAST_UPDATE_TIME         NUMERIC(20, 0)  CONSTRAINT ACTIV_WEIGHTS_LUT_NN  NOT NULL
+);
 INSERT INTO CTA_CATALOGUE(
   SCHEMA_VERSION_MAJOR,
   SCHEMA_VERSION_MINOR)
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index cac4940c7d..5c0965ce33 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -28,6 +28,7 @@ include_directories (${XROOTD_INCLUDE_DIR})
 set_source_files_properties(CRC.cpp PROPERTIES COMPILE_FLAGS -O2)
 
 set (COMMON_LIB_SRC_FILES
+  dataStructures/ActivitiesFairShareWeights.cpp
   dataStructures/AdminUser.cpp
   dataStructures/ArchiveFile.cpp
   dataStructures/ArchiveFileQueueCriteria.cpp
diff --git a/common/dataStructures/RetrieveFileQueueCriteria.hpp b/common/dataStructures/RetrieveFileQueueCriteria.hpp
index 9e1dc809bc..ba5965acd0 100644
--- a/common/dataStructures/RetrieveFileQueueCriteria.hpp
+++ b/common/dataStructures/RetrieveFileQueueCriteria.hpp
@@ -20,6 +20,7 @@
 
 #include "common/dataStructures/ArchiveFile.hpp"
 #include "common/dataStructures/MountPolicy.hpp"
+#include "common/dataStructures/ActivitiesFairShareWeights.hpp"
 
 #include <map>
 #include <stdint.h>
@@ -43,6 +44,11 @@ struct RetrieveFileQueueCriteria {
    */
   MountPolicy mountPolicy;
   
+  /**
+   * The fair shares for the disk instance of the file (if any).
+   */
+  ActivitiesFairShareWeights activitiesFairShareWeight;
+  
   RetrieveFileQueueCriteria &operator=(const RetrieveFileQueueCriteria& other);
 
 }; // struct RetrieveFileQueueCriteria
diff --git a/common/dataStructures/RetrieveRequest.hpp b/common/dataStructures/RetrieveRequest.hpp
index ecab33381a..5a82075162 100644
--- a/common/dataStructures/RetrieveRequest.hpp
+++ b/common/dataStructures/RetrieveRequest.hpp
@@ -28,6 +28,7 @@
 #include "common/dataStructures/UserIdentity.hpp"
 #include "common/dataStructures/ArchiveRoute.hpp"
 #include "LifecycleTimings.hpp"
+#include "common/optional.hpp"
 
 namespace cta {
 namespace common {
@@ -51,6 +52,8 @@ struct RetrieveRequest {
   DiskFileInfo diskFileInfo;
   EntryLog creationLog;
   LifecycleTimings lifecycleTimings;
+  optional<std::string> activity;
+
 }; // struct RetrieveRequest
 
 std::ostream &operator<<(std::ostream &os, const RetrieveRequest &obj);
diff --git a/common/exception/Exception.hpp b/common/exception/Exception.hpp
index e62940f360..69f6d413da 100644
--- a/common/exception/Exception.hpp
+++ b/common/exception/Exception.hpp
@@ -121,8 +121,4 @@ protected:
 
 }} // namespace cta::exception
 
-#define CTA_GENERATE_EXCEPTION_CLASS(A)                          \
-class A: public cta::exception::Exception {                      \
-public:                                                          \
-  A(const std::string & w = ""): cta::exception::Exception(w) {} \
-}
+#define CTA_GENERATE_EXCEPTION_CLASS(A) class A: public cta::exception::Exception { using Exception::Exception; }
diff --git a/common/exception/UserError.hpp b/common/exception/UserError.hpp
index 4cbfb57dc6..731d141951 100644
--- a/common/exception/UserError.hpp
+++ b/common/exception/UserError.hpp
@@ -43,3 +43,5 @@ public:
 
 } // namespace exception
 } // namespace cta
+
+#define CTA_GENERATE_USER_EXCEPTION_CLASS(A) class A: public cta::exception::UserError { using UserError::UserError; }
diff --git a/continuousintegration/orchestration/tests/client_ar.sh b/continuousintegration/orchestration/tests/client_ar.sh
index 83f1c8618e..f0c463ada5 100644
--- a/continuousintegration/orchestration/tests/client_ar.sh
+++ b/continuousintegration/orchestration/tests/client_ar.sh
@@ -252,9 +252,9 @@ fi
 echo "###"
 echo "${TAPEONLY}/${ARCHIVED} on tape only"
 echo "###"
-echo "Sleeping 400 seconds to allow MGM-FST communication to settle after disk copy deletion."
-sleep 400
-echo "###"
+#echo "Sleeping 400 seconds to allow MGM-FST communication to settle after disk copy deletion."
+#sleep 400
+#echo "###"
 
 
 if [[ $TAPEAWAREGC == 1 ]]; then
@@ -279,7 +279,7 @@ done
 # CAREFULL HERE: ${STATUS_FILE} contains lines like: 99/test9900001
 for ((subdir=0; subdir < ${NB_DIRS}; subdir++)); do
   echo -n "Recalling files to ${EOS_DIR}/${subdir} using ${NB_PROCS} processes..."
-  cat ${STATUS_FILE} | grep ^${subdir}/ | cut -d/ -f2 | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} prepare -s ${EOS_DIR}/${subdir}/TEST_FILE_NAME 2>${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME && rm ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME || echo ERROR with xrootd transfer for file TEST_FILE_NAME, full logs in ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME"
+  cat ${STATUS_FILE} | grep ^${subdir}/ | cut -d/ -f2 | xargs --max-procs=${NB_PROCS} -iTEST_FILE_NAME bash -c "XRD_LOGLEVEL=Dump KRB5CCNAME=/tmp/${EOSPOWER_USER}/krb5cc_0 XrdSecPROTOCOL=krb5 xrdfs ${EOSINSTANCE} prepare -s ${EOS_DIR}/${subdir}/TEST_FILE_NAME?activity=T0Reprocess 2>${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME && rm ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME || echo ERROR with xrootd transfer for file TEST_FILE_NAME, full logs in ${ERROR_DIR}/RETRIEVE_TEST_FILE_NAME"
   echo Done.
 done
 
diff --git a/scheduler/Scheduler.cpp b/scheduler/Scheduler.cpp
index 16f0d76829..8c9ba58557 100644
--- a/scheduler/Scheduler.cpp
+++ b/scheduler/Scheduler.cpp
@@ -210,7 +210,7 @@ void Scheduler::queueRetrieve(
   utils::Timer t;
   // Get the queue criteria
   common::dataStructures::RetrieveFileQueueCriteria queueCriteria;
-  queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, lc);
+  queueCriteria = m_catalogue.prepareToRetrieveFile(instanceName, request.archiveFileID, request.requester, request.activity, lc);
   auto catalogueTime = t.secs(cta::utils::Timer::resetCounter);
   std::string selectedVid = m_db.queueRetrieve(request, queueCriteria, lc);
   auto schedulerDbTime = t.secs();
diff --git a/xroot_plugins/XrdSsiCtaRequestMessage.cpp b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
index 0f5b19a2b3..a4176e7247 100644
--- a/xroot_plugins/XrdSsiCtaRequestMessage.cpp
+++ b/xroot_plugins/XrdSsiCtaRequestMessage.cpp
@@ -494,6 +494,11 @@ void RequestMessage::processPREPARE(const cta::eos::Notification &notification,
    {
       throw PbException("Invalid archiveFileID " + archiveFileIdStr);
    }
+   
+   // Activity value is a string. The parameter might be present or not.
+   try {
+     request.activity = notification.file().xattr().at("activity");
+   } catch (...) {}
 
    cta::utils::Timer t;
 
-- 
GitLab