Commit c07e8365 authored by Daniele Kruse's avatar Daniele Kruse
Browse files

Introduced the concept of mountpolicy and added code for it in the scheduler db (and objectstore)

parent 8f6999bd
......@@ -40,6 +40,7 @@
#include "common/dataStructures/ListStorageClassRequest.hpp"
#include "common/dataStructures/LogicalLibrary.hpp"
#include "common/dataStructures/MountType.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/dataStructures/ReadTestResult.hpp"
#include "common/dataStructures/RepackInfo.hpp"
#include "common/dataStructures/RepackType.hpp"
......@@ -184,6 +185,8 @@ public:
const cta::common::dataStructures::TapeFileLocation tapeFileLocation) = 0;
virtual std::map<uint64_t,std::string> getCopyNbToTapePoolMap(const std::string &storageClass) = 0;
virtual cta::common::dataStructures::MountPolicy getArchiveMountPolicy(const cta::common::dataStructures::Requester &requester) = 0;
virtual cta::common::dataStructures::MountPolicy getRetrieveMountPolicy(const cta::common::dataStructures::Requester &requester) = 0;
}; // class Catalogue
......
......@@ -414,4 +414,18 @@ void cta::catalogue::DummyCatalogue::fileWrittenToTape(
//------------------------------------------------------------------------------
std::map<uint64_t,std::string> cta::catalogue::DummyCatalogue::getCopyNbToTapePoolMap(const std::string &storageClass) {
return std::map<uint64_t,std::string>();
}
//------------------------------------------------------------------------------
// getArchiveMountPolicy
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy cta::catalogue::DummyCatalogue::getArchiveMountPolicy(const cta::common::dataStructures::Requester &requester) {
return cta::common::dataStructures::MountPolicy();
}
//------------------------------------------------------------------------------
// getRetrieveMountPolicy
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy cta::catalogue::DummyCatalogue::getRetrieveMountPolicy(const cta::common::dataStructures::Requester &requester) {
return cta::common::dataStructures::MountPolicy();
}
\ No newline at end of file
......@@ -143,6 +143,9 @@ public:
const cta::common::dataStructures::TapeFileLocation tapeFileLocation);
std::map<uint64_t,std::string> getCopyNbToTapePoolMap(const std::string &storageClass);
virtual cta::common::dataStructures::MountPolicy getArchiveMountPolicy(const cta::common::dataStructures::Requester &requester);
virtual cta::common::dataStructures::MountPolicy getRetrieveMountPolicy(const cta::common::dataStructures::Requester &requester);
}; // class DummyCatalogue
......
......@@ -18,6 +18,7 @@ set (COMMON_LIB_SRC_FILES
dataStructures/EntryLog.cpp
dataStructures/ListStorageClassRequest.cpp
dataStructures/MountType.cpp
dataStructures/MountPolicy.cpp
dataStructures/ReadTestResult.cpp
dataStructures/RepackInfo.cpp
dataStructures/RepackType.cpp
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common/dataStructures/MountPolicy.hpp"
#include "common/exception/Exception.hpp"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy::MountPolicy() {
m_maxDrivesSet = false;
m_minBytesQueuedSet = false;
m_minFilesQueuedSet = false;
m_minRequestAgeSet = false;
m_prioritySet = false;
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy::~MountPolicy() throw() {
}
//------------------------------------------------------------------------------
// allFieldsSet
//------------------------------------------------------------------------------
bool cta::common::dataStructures::MountPolicy::allFieldsSet() const {
return m_maxDrivesSet
&& m_minBytesQueuedSet
&& m_minFilesQueuedSet
&& m_minRequestAgeSet
&& m_prioritySet;
}
//------------------------------------------------------------------------------
// setMaxDrives
//------------------------------------------------------------------------------
void cta::common::dataStructures::MountPolicy::setMaxDrives(const uint64_t maxDrives) {
m_maxDrives = maxDrives;
m_maxDrivesSet = true;
}
//------------------------------------------------------------------------------
// getMaxDrives
//------------------------------------------------------------------------------
uint64_t cta::common::dataStructures::MountPolicy::getMaxDrives() const {
if(!allFieldsSet()) {
throw cta::exception::Exception(std::string(__FUNCTION__)+" Error: not all fields of the MountPolicy have been set!");
}
return m_maxDrives;
}
//------------------------------------------------------------------------------
// setMinBytesQueued
//------------------------------------------------------------------------------
void cta::common::dataStructures::MountPolicy::setMinBytesQueued(const uint64_t minBytesQueued) {
m_minBytesQueued = minBytesQueued;
m_minBytesQueuedSet = true;
}
//------------------------------------------------------------------------------
// getMinBytesQueued
//------------------------------------------------------------------------------
uint64_t cta::common::dataStructures::MountPolicy::getMinBytesQueued() const {
if(!allFieldsSet()) {
throw cta::exception::Exception(std::string(__FUNCTION__)+" Error: not all fields of the MountPolicy have been set!");
}
return m_minBytesQueued;
}
//------------------------------------------------------------------------------
// setMinFilesQueued
//------------------------------------------------------------------------------
void cta::common::dataStructures::MountPolicy::setMinFilesQueued(const uint64_t minFilesQueued) {
m_minFilesQueued = minFilesQueued;
m_minFilesQueuedSet = true;
}
//------------------------------------------------------------------------------
// getMinFilesQueued
//------------------------------------------------------------------------------
uint64_t cta::common::dataStructures::MountPolicy::getMinFilesQueued() const {
if(!allFieldsSet()) {
throw cta::exception::Exception(std::string(__FUNCTION__)+" Error: not all fields of the MountPolicy have been set!");
}
return m_minFilesQueued;
}
//------------------------------------------------------------------------------
// setMinRequestAge
//------------------------------------------------------------------------------
void cta::common::dataStructures::MountPolicy::setMinRequestAge(const uint64_t minRequestAge) {
m_minRequestAge = minRequestAge;
m_minRequestAgeSet = true;
}
//------------------------------------------------------------------------------
// getMinRequestAge
//------------------------------------------------------------------------------
uint64_t cta::common::dataStructures::MountPolicy::getMinRequestAge() const {
if(!allFieldsSet()) {
throw cta::exception::Exception(std::string(__FUNCTION__)+" Error: not all fields of the MountPolicy have been set!");
}
return m_minRequestAge;
}
//------------------------------------------------------------------------------
// setPriority
//------------------------------------------------------------------------------
void cta::common::dataStructures::MountPolicy::setPriority(const uint64_t priority) {
m_priority = priority;
m_prioritySet = true;
}
//------------------------------------------------------------------------------
// getPriority
//------------------------------------------------------------------------------
uint64_t cta::common::dataStructures::MountPolicy::getPriority() const {
if(!allFieldsSet()) {
throw cta::exception::Exception(std::string(__FUNCTION__)+" Error: not all fields of the MountPolicy have been set!");
}
return m_priority;
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <list>
#include <map>
#include <stdint.h>
#include <string>
namespace cta {
namespace common {
namespace dataStructures {
class MountPolicy {
public:
/**
* Constructor
*/
MountPolicy();
/**
* Destructor
*/
~MountPolicy() throw();
void setMaxDrives(const uint64_t maxDrives);
uint64_t getMaxDrives() const;
void setMinBytesQueued(const uint64_t minBytesQueued);
uint64_t getMinBytesQueued() const;
void setMinFilesQueued(const uint64_t minFilesQueued);
uint64_t getMinFilesQueued() const;
void setMinRequestAge(const uint64_t minRequestAge);
uint64_t getMinRequestAge() const;
void setPriority(const uint64_t priority);
uint64_t getPriority() const;
private:
/**
* @return true if all fields have been set, false otherwise
*/
bool allFieldsSet() const;
uint64_t m_maxDrives;
bool m_maxDrivesSet;
uint64_t m_minBytesQueued;
bool m_minBytesQueuedSet;
uint64_t m_minFilesQueued;
bool m_minFilesQueuedSet;
uint64_t m_minRequestAge;
bool m_minRequestAgeSet;
uint64_t m_priority;
bool m_prioritySet;
}; // class MountPolicy
} // namespace dataStructures
} // namespace common
} // namespace cta
......@@ -145,6 +145,22 @@ void cta::objectstore::ArchiveRequest::setAllJobsPendingNSdeletion() {
}
}
//------------------------------------------------------------------------------
// setArchiveFileID
//------------------------------------------------------------------------------
void cta::objectstore::ArchiveRequest::setArchiveFileID(const uint64_t archiveFileID) {
checkPayloadWritable();
m_payload.set_archivefileid(archiveFileID);
}
//------------------------------------------------------------------------------
// getArchiveFileID
//------------------------------------------------------------------------------
uint64_t cta::objectstore::ArchiveRequest::getArchiveFileID() {
checkPayloadReadable();
return m_payload.archivefileid();
}
//------------------------------------------------------------------------------
// setChecksumType
//------------------------------------------------------------------------------
......@@ -209,6 +225,34 @@ uint64_t cta::objectstore::ArchiveRequest::getDiskpoolThroughput() {
return m_payload.diskpoolthroughput();
}
//------------------------------------------------------------------------------
// setDiskpoolThroughput
//------------------------------------------------------------------------------
void cta::objectstore::ArchiveRequest::setMountPolicy(const cta::common::dataStructures::MountPolicy &mountPolicy) {
checkPayloadWritable();
auto payloadMountPolicy = m_payload.mutable_mountpolicy();
payloadMountPolicy->set_maxdrives(mountPolicy.getMaxDrives());
payloadMountPolicy->set_minbytesqueued(mountPolicy.getMinBytesQueued());
payloadMountPolicy->set_minfilesqueued(mountPolicy.getMinFilesQueued());
payloadMountPolicy->set_minrequestage(mountPolicy.getMinRequestAge());
payloadMountPolicy->set_priority(mountPolicy.getPriority());
}
//------------------------------------------------------------------------------
// setDiskpoolThroughput
//------------------------------------------------------------------------------
cta::common::dataStructures::MountPolicy cta::objectstore::ArchiveRequest::getMountPolicy() {
checkPayloadReadable();
cta::common::dataStructures::MountPolicy mountPolicy;
auto payloadMountPolicy = m_payload.mountpolicy();
mountPolicy.setMaxDrives(payloadMountPolicy.maxdrives());
mountPolicy.setMinBytesQueued(payloadMountPolicy.minbytesqueued());
mountPolicy.setMinFilesQueued(payloadMountPolicy.minfilesqueued());
mountPolicy.setMinRequestAge(payloadMountPolicy.minrequestage());
mountPolicy.setPriority(payloadMountPolicy.priority());
return mountPolicy;
}
//------------------------------------------------------------------------------
// setDrData
//------------------------------------------------------------------------------
......
......@@ -21,6 +21,7 @@
#include "common/dataStructures/DRData.hpp"
#include "common/dataStructures/EntryLog.hpp"
#include "common/dataStructures/MountPolicy.hpp"
#include "common/dataStructures/Requester.hpp"
#include "ObjectOps.hpp"
#include "objectstore/cta.pb.h"
......@@ -65,6 +66,9 @@ public:
void setSuccessful();
void setFailed();
// ===========================================================================
void setArchiveFileID(const uint64_t archiveFileID);
uint64_t getArchiveFileID();
void setChecksumType(const std::string &checksumType);
std::string getChecksumType();
......@@ -98,6 +102,9 @@ public:
void setCreationLog(const cta::common::dataStructures::EntryLog &creationLog);
cta::common::dataStructures::EntryLog getCreationLog();
void setMountPolicy(const cta::common::dataStructures::MountPolicy &mountPolicy);
cta::common::dataStructures::MountPolicy getMountPolicy();
class JobDump {
public:
uint16_t copyNb;
......
......@@ -428,7 +428,17 @@ message EntryLog {
required uint64 time = 8970;
}
message MountPolicy {
required uint64 minBytesQueued = 8981;
required uint64 minFilesQueued = 8983;
required uint64 minRequestAge = 8985;
required uint64 priority = 8987;
required uint64 maxDrives = 8989;
}
message ArchiveRequest {
required uint64 archiveFileID = 8990;
required MountPolicy mountPolicy = 8995;
required string checksumtype = 9000;
required string checksumValue = 9010;
required string diskpoolName = 9020;
......@@ -445,6 +455,7 @@ message ArchiveRequest {
message RetrieveRequest {
required uint64 archiveFileID = 9100;
required MountPolicy mountPolicy = 9105;
required string diskpoolName = 9110;
required uint64 diskpoolThroughput = 9120;
required DRData drData = 9130;
......
......@@ -745,13 +745,14 @@ std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation>
}
std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation> OStoreDB::queue(const cta::common::dataStructures::ArchiveRequest &request,
const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap) {
const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap, const cta::common::dataStructures::MountPolicy &mountPolicy) {
assertAgentSet();
// Construct the return value immediately
std::unique_ptr<cta::OStoreDB::ArchiveRequestCreation> internalRet(new cta::OStoreDB::ArchiveRequestCreation(m_agent, m_objectStore));
cta::objectstore::ArchiveRequest & ar = internalRet->m_request;
ar.setAddress(m_agent->nextId("ArchiveRequest"));
ar.initialize();
ar.setArchiveFileID(archiveFileId);
ar.setChecksumType(request.getChecksumType());
ar.setChecksumValue(request.getChecksumValue());
ar.setCreationLog(request.getCreationLog());
......@@ -760,6 +761,7 @@ std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation> OStoreDB::queue(
ar.setDrData(request.getDrData());
ar.setEosFileID(request.getEosFileID());
ar.setFileSize(request.getFileSize());
ar.setMountPolicy(mountPolicy);
ar.setRequester(request.getRequester());
ar.setSrcURL(request.getSrcURL());
ar.setStorageClass(request.getStorageClass());
......
......@@ -252,7 +252,8 @@ public:
virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveToFileRequestCreation> queue(const ArchiveToFileRequest& rqst);
virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap);
virtual std::unique_ptr<cta::SchedulerDatabase::ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request,
const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap, const cta::common::dataStructures::MountPolicy &mountPolicy);
CTA_GENERATE_EXCEPTION_CLASS(NoSuchArchiveRequest);
CTA_GENERATE_EXCEPTION_CLASS(ArchiveRequestAlreadyDeleted);
......
......@@ -165,8 +165,9 @@ public:
return m_OStoreDB.queue(rqst);
}
virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap) {
return m_OStoreDB.queue(request, archiveFileId, copyNbToPoolMap);
virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId,
const std::map<uint64_t, std::string> &copyNbToPoolMap, const cta::common::dataStructures::MountPolicy &mountPolicy) {
return m_OStoreDB.queue(request, archiveFileId, copyNbToPoolMap, mountPolicy);
}
virtual void queue(const RetrieveToFileRequest& rqst) {
......
......@@ -50,7 +50,8 @@ cta::Scheduler::~Scheduler() throw() {
uint64_t cta::Scheduler::queueArchiveRequest(const cta::common::dataStructures::SecurityIdentity &requestPusher, const cta::common::dataStructures::ArchiveRequest &request) {
const uint64_t archiveFileId = m_catalogue.getNextArchiveFileId();
const std::map<uint64_t, std::string> copyNbToPoolMap = m_catalogue.getCopyNbToTapePoolMap(request.getStorageClass());
std::unique_ptr<SchedulerDatabase::ArchiveRequestCreation> requestCreation(m_db.queue(request, archiveFileId, copyNbToPoolMap));
const cta::common::dataStructures::MountPolicy policy = m_catalogue.getArchiveMountPolicy(request.getRequester());
std::unique_ptr<SchedulerDatabase::ArchiveRequestCreation> requestCreation(m_db.queue(request, archiveFileId, copyNbToPoolMap, policy));
requestCreation->complete();
return 0;
......
......@@ -37,6 +37,7 @@
#include "scheduler/MountType.hpp"
#include "common/forwardDeclarations.hpp"
#include "common/dataStructures/ArchiveRequest.hpp"
#include "common/dataStructures/MountPolicy.hpp"
namespace cta {
/**
......@@ -92,7 +93,8 @@ public:
*
* @param rqst The request.
*/
virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId, const std::map<uint64_t, std::string> &copyNbToPoolMap) = 0;
virtual std::unique_ptr<ArchiveRequestCreation> queue(const cta::common::dataStructures::ArchiveRequest &request, const uint64_t archiveFileId,
const std::map<uint64_t, std::string> &copyNbToPoolMap, const cta::common::dataStructures::MountPolicy &mountPolicy) = 0;
/**
* Returns all of the queued archive requests. The returned requests are
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment