Commit 028d9ea1 authored by Eric Cano's avatar Eric Cano
Browse files

Added support for getting queued archive requests.

Added binned running counters (ValueCoundMap) for managing summaries of priorities in object store queues.
Simplified the interfaces to the getPendingArchiveJobs and getPendingRetrieveJobs functions in Scheduler and SchedulerDB.
Removed now unused ArchiveToTapeCopyRequest.
Removed long unused mockScheduler.
Removed _old_protoype files.
parent 786a0e48
......@@ -21,6 +21,7 @@
#include "ProtocolBuffersAlgorithms.hpp"
#include "EntryLog.hpp"
#include "RootEntry.hpp"
#include "ValueCountMap.hpp"
#include <json-c/json.h>
namespace cta { namespace objectstore {
......@@ -153,9 +154,15 @@ std::string ArchiveQueue::getName() {
void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job,
const std::string & archiveToFileAddress, uint64_t fileid,
uint64_t size, uint64_t priority, time_t startTime) {
uint64_t size, const cta::common::dataStructures::MountPolicy & policy, time_t startTime) {
checkPayloadWritable();
// The tape pool gets the highest priority of its jobs
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.incCount(policy.maxDrivesAllowed);
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.incCount(policy.archivePriority);
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.incCount(policy.archiveMinRequestAge);
if (m_payload.pendingarchivejobs_size()) {
if ((uint64_t)startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(startTime);
......@@ -177,6 +184,18 @@ auto ArchiveQueue::getJobsSummary() -> JobsSummary {
ret.files = m_payload.pendingarchivejobs_size();
ret.bytes = m_payload.archivejobstotalsize();
ret.oldestJobStartTime = m_payload.oldestjobcreationtime();
if (ret.files) {
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ret.maxDrivesAllowed = maxDriveAllowedMap.maxValue();
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ret.priority = priorityMap.maxValue();
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
ret.minArchiveRequestAge = minArchiveRequestAgeMap.minValue();
} else {
ret.maxDrivesAllowed = 0;
ret.priority = 0;
ret.minArchiveRequestAge = 0;
}
return ret;
}
......@@ -221,7 +240,7 @@ void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) {
} while (found);
}
auto ArchiveQueue::dumpJobs() -> std::list<JobDump> {
auto ArchiveQueue::dumpJobs() -> std::list<ArchiveQueue::JobDump> {
checkPayloadReadable();
std::list<JobDump> ret;
auto & jl=m_payload.pendingarchivejobs();
......@@ -270,8 +289,4 @@ bool ArchiveQueue::addOrphanedJobPendingNsDeletion(
return true;
}
cta::common::dataStructures::MountPolicy ArchiveQueue::getMountPolicy() {
throw cta::exception::Exception(std::string("Not implemented: ") + __PRETTY_FUNCTION__);
}
}} // namespace cta::objectstore
......@@ -52,7 +52,7 @@ public:
// Archive jobs management ===================================================
void addJob(const ArchiveRequest::JobDump & job,
const std::string & archiveToFileAddress, uint64_t fileid,
uint64_t size, uint64_t priority, time_t startTime);
uint64_t size, const cta::common::dataStructures::MountPolicy & priority, time_t startTime);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addJobIfNecessary(const ArchiveRequest::JobDump & job,
......@@ -74,6 +74,8 @@ public:
uint64_t bytes;
time_t oldestJobStartTime;
uint64_t priority;
uint64_t minArchiveRequestAge;
uint64_t maxDrivesAllowed;
};
JobsSummary getJobsSummary();
......@@ -86,10 +88,6 @@ public:
};
std::list<JobDump> dumpJobs();
// Mount criteria jobs management ============================================
void setMountPolicy(const common::dataStructures::MountPolicy& mountPolicy);
common::dataStructures::MountPolicy getMountPolicy();
// Check that the tape pool is empty (of both tapes and jobs)
bool isEmpty();
......
......@@ -233,6 +233,7 @@ uint64_t cta::objectstore::ArchiveRequest::getDiskpoolThroughput() {
void cta::objectstore::ArchiveRequest::setMountPolicy(const cta::common::dataStructures::MountPolicy &mountPolicy) {
checkPayloadWritable();
auto payloadMountPolicy = m_payload.mutable_mountpolicy();
payloadMountPolicy->set_name(mountPolicy.name);
payloadMountPolicy->set_maxdrives(mountPolicy.maxDrivesAllowed);
payloadMountPolicy->set_minrequestage(mountPolicy.archiveMinRequestAge);
payloadMountPolicy->set_priority(mountPolicy.archivePriority);
......@@ -404,7 +405,7 @@ cta::common::dataStructures::EntryLog ArchiveRequest::getCreationLog() {
return creationLog;
}
auto ArchiveRequest::dumpJobs() -> std::list<JobDump> {
auto ArchiveRequest::dumpJobs() -> std::list<ArchiveRequest::JobDump> {
checkPayloadReadable();
std::list<JobDump> ret;
auto & jl = m_payload.jobs();
......
......@@ -42,7 +42,8 @@ set (CTAProtoDependants objectstore/Agent.hpp
objectstore/SchedulerGlobalLock.hpp
objectstore/TapeQueue.hpp
objectstore/ArchiveQueue.hpp
objectstore/UserIdentity.hpp)
objectstore/UserIdentity.hpp
objectstore/ValueCountMap.hpp)
SET_SOURCE_FILES_PROPERTIES(${CTAProtoHeaders} PROPERTIES HEADER_FILE_ONLY TRUE)
SET_SOURCE_FILES_PROPERTIES(${CTAProtoDependants}
......@@ -69,7 +70,8 @@ add_library (ctaobjectstore SHARED
ProtocolBuffersAlgorithms.cpp
GenericObject.cpp
GarbageCollector.cpp
SchedulerGlobalLock.cpp)
SchedulerGlobalLock.cpp
ValueCountMap.cpp)
target_link_libraries(ctaobjectstore Utils rados json-c ctautils)
set_source_files_properties(BackendRados.cpp PROPERTIES COMPILE_FLAGS -Wno-deprecated-declarations)
install (TARGETS ctaobjectstore DESTINATION usr/${CMAKE_INSTALL_LIBDIR})
......
......@@ -347,7 +347,11 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
jd.copyNb = 1;
jd.tapePool = "TapePool0";
jd.ArchiveQueueAddress = tpAddr[0];
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, 0, time(NULL));
cta::common::dataStructures::MountPolicy policy;
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
policy.maxDrivesAllowed = 1;
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, policy, time(NULL));
aq.commit();
}
if (pass < 4) { pass++; continue; }
......@@ -361,7 +365,11 @@ TEST(ObjectStore, GarbageCollectorArchiveRequest) {
jd.copyNb = 2;
jd.tapePool = "TapePool1";
jd.ArchiveQueueAddress = tpAddr[1];
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, 0, time(NULL));
cta::common::dataStructures::MountPolicy policy;
policy.archiveMinRequestAge = 0;
policy.archivePriority = 1;
policy.maxDrivesAllowed = 1;
aq.addJob(jd, ar.getAddressIfSet(), ar.getArchiveFileID(), 1000+pass, policy, time(NULL));
aq.commit();
}
if (pass < 5) { pass++; continue; }
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ValueCountMap.hpp"
#include "common/exception/Exception.hpp"
namespace cta { namespace objectstore {
ValueCountMap::ValueCountMap(google::protobuf::RepeatedPtrField<serializers::ValueCountPair>* valueCountMap):
m_valueCountMap(*valueCountMap) { }
void ValueCountMap::decCount(uint64_t value) {
// Find the entry for this value. Failing is an error.
auto counter = std::find_if(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) {
return pair.value() == value;
});
if (counter == m_valueCountMap.end()) {
std::stringstream err;
err << "In ValueCountMap::decCount: no entry found for value=" << value;
throw cta::exception::Exception(err.str());
}
// Decrement the value and remove the entry if needed.
if (counter->count() < 1) {
std::stringstream err;
err << "In ValueCountMap::decCount: entry with wrong count value=" << value << " count=" << counter->count();
throw cta::exception::Exception(err.str());
}
counter->set_count(counter->count()-1);
if (!counter->count()) {
auto size=m_valueCountMap.size();
counter->Swap(&(*(m_valueCountMap.end()-1)));
m_valueCountMap.RemoveLast();
// Cross check that the size has decreased.
if (size -1 != m_valueCountMap.size()) {
std::stringstream err;
err << "In ValueCountMap::decCount: unexpected size after trimming empty entry. expectedSize=" << size -1 << " newSize=" << m_valueCountMap.size();
throw cta::exception::Exception(err.str());
}
// Cross check we cannot find the value.
auto counter2 = std::find_if(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) { return pair.value() == value; });
if (m_valueCountMap.end() != counter2) {
std::stringstream err;
err << "In ValueCountMap::decCount: still found the value after trimming empty entry. value=" << counter2->value() << " count=" << counter2->count();
throw cta::exception::Exception(err.str());
}
}
}
void ValueCountMap::incCount(uint64_t value) {
// Find the entry for this value (might fail)
auto counter = std::find_if(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) {
return pair.value() == value;
});
if (counter != m_valueCountMap.end()) {
if (counter->count() < 1) {
std::stringstream err;
err << "In ValueCountMap::incCount: unexpected count value=" << counter->value() << " count=" << counter->count();
throw cta::exception::Exception(err.str());
} else {
counter->set_value(counter->value()+1);
}
} else {
// Create the new entry if necessary.
auto newCounter = m_valueCountMap.Add();
newCounter->set_value(value);
newCounter->set_count(1);
}
}
uint64_t ValueCountMap::maxValue() {
if (!m_valueCountMap.size()) throw cta::exception::Exception("In ValueCountMap::maxValue: empty map");
uint64_t ret = std::numeric_limits<uint64_t>::min();
std::for_each(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) {
if (ret<pair.value()) ret=pair.value();
});
return ret;
}
uint64_t ValueCountMap::minValue() {
if (!m_valueCountMap.size()) throw cta::exception::Exception("In ValueCountMap::minValue: empty map");
uint64_t ret = std::numeric_limits<uint64_t>::max();
std::for_each(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) {
if (ret>pair.value()) ret=pair.value();
});
return ret;
}
uint64_t ValueCountMap::total() {
uint64_t ret = 0;
std::for_each(m_valueCountMap.begin(), m_valueCountMap.end(),
[&](decltype(*m_valueCountMap.begin()) pair) {
if (pair.count()<1) {
std::stringstream err;
err << "In ValueCountMap::total: unexpected count value=" << pair.value() << " count=" << pair.count();
throw cta::exception::Exception(err.str());
}
ret += pair.count();
});
return ret;
}
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -18,29 +18,22 @@
#pragma once
#include "SchedulerDatabaseFactory.hpp"
#include "objectstore/cta.pb.h"
namespace cta {
namespace cta { namespace objectstore {
/**
* A conncret implementation of a scheduler database factory that creates mock
* objects.
*/
class MockSchedulerDatabaseFactory: public SchedulerDatabaseFactory {
/** A helper class allowing manipulation of arrays of ValueCountPairs, used as containers for running
* counters for properties with multiple possible values. */
class ValueCountMap {
public:
/**
* Destructor.
*/
~MockSchedulerDatabaseFactory() throw();
/**
* Returns a newly created scheduler database object.
*
* @return A newly created scheduler database object.
*/
std::unique_ptr<SchedulerDatabase> create() const;
}; // class MockSchedulerDatabaseFactory
} // namespace cta
ValueCountMap (google::protobuf::RepeatedPtrField<serializers::ValueCountPair>* valueCountMap);
void incCount(uint64_t value);
void decCount(uint64_t value);
uint64_t total();
uint64_t minValue();
uint64_t maxValue();
private:
google::protobuf::RepeatedPtrField<serializers::ValueCountPair>& m_valueCountMap;
};
}} // namespace cta::objectstore
\ No newline at end of file
......@@ -23,13 +23,13 @@ enum ObjectType {
AgentRegister_t = 1;
Agent_t = 2;
DriveRegister_t = 3;
ArchiveToFileRequest_t = 6;
RetrieveToFileRequest_t = 7;
SchedulerGlobalLock_t = 8;
ArchiveRequest_t = 9;
RetrieveRequest_t = 10;
ArchiveQueue_t = 11;
RetrieveQueue_t = 12;
ArchiveToFileRequest_t = 4;
RetrieveToFileRequest_t = 5;
SchedulerGlobalLock_t = 6;
ArchiveRequest_t = 7;
RetrieveRequest_t = 8;
ArchiveQueue_t = 9;
RetrieveQueue_t = 10;
GenericObject_t = 1000;
}
......@@ -348,9 +348,10 @@ message EntryLog {
}
message MountPolicy {
required uint64 minRequestAge = 8985;
required uint64 priority = 8987;
required uint64 maxDrives = 8989;
required string name = 8980;
required uint64 minRequestAge = 8981;
required uint64 priority = 8982;
required uint64 maxDrives = 8983;
}
message ArchiveRequest {
......@@ -383,13 +384,21 @@ message RetrieveRequest {
repeated RetrieveJobEntry jobs = 9152;
}
message ValueCountPair {
required uint64 value = 9201;
required uint64 count = 9202;
}
message ArchiveQueue {
required string tapepoolname = 10000;
repeated ArchiveJobPointer pendingarchivejobs = 10010;
repeated ArchiveJobPointer orphanedarchivejobsnscreation = 10020;
repeated ArchiveJobPointer orphanedarchivejobsnsdeletion = 10030;
required uint64 ArchiveJobsTotalSize = 10040;
required uint64 oldestJobCreationTime = 10050;
repeated ValueCountPair prioritymap = 10031;
repeated ValueCountPair minarchiverequestagemap = 10032;
repeated ValueCountPair maxdrivesallowedmap = 10033;
required uint64 archivejobstotalsize = 10040;
required uint64 oldestjobcreationtime = 10050;
}
message RetrieveQueue {
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "scheduler/ArchiveToTapeCopyRequest.hpp"
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveToTapeCopyRequest::ArchiveToTapeCopyRequest() {
}
//------------------------------------------------------------------------------
// destructor
//------------------------------------------------------------------------------
cta::ArchiveToTapeCopyRequest::~ArchiveToTapeCopyRequest() throw() {
}
//------------------------------------------------------------------------------
// constructor
//------------------------------------------------------------------------------
cta::ArchiveToTapeCopyRequest::ArchiveToTapeCopyRequest(
const std::string & diskFileID,
const uint64_t archiveFileID,
const uint16_t copyNb,
const std::string tapePoolName,
const uint64_t priority,
const common::dataStructures::EntryLog &entryLog):
UserArchiveRequest(priority, entryLog),
diskFileID(diskFileID),
archiveFileID(archiveFileID),
copyNb(copyNb),
tapePoolName(tapePoolName) {
}
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/remoteFS/RemotePathAndStatus.hpp"
#include "scheduler/UserArchiveRequest.hpp"
#include <stdint.h>
#include <string>
namespace cta {
/**
* A user request to archive a remote file to a copy on tape.
*/
struct ArchiveToTapeCopyRequest: public UserArchiveRequest {
/**
* Constructor.
*/
ArchiveToTapeCopyRequest();
/**
* Destructor.
*/
~ArchiveToTapeCopyRequest() throw();
/**
* Constructor.
*
* @param diskFileID The ID of the remote file to be archived.
* @param archiveFileID The ID of the archive file
* @param copyNb The tape copy number.
* @param tapePoolName The name of the destination tape pool.
* @param priority The priority of the request.
* @param requester The identity of the user who made the request.
* @param entryLog log for the creation of the request.
*/
ArchiveToTapeCopyRequest(
const std::string & diskFileID,
const uint64_t archiveFileID,
const uint16_t copyNb,
const std::string tapePoolName,
const uint64_t priority,
const common::dataStructures::EntryLog & entryLog);
/**
* The ID of the remote file to be archived.
*/
std::string diskFileID;
/**
* The full path of the source archive file.
*/
uint64_t archiveFileID;
/**
* The tape copy number.
*/
uint16_t copyNb;
/**
* The name of the destination tape pool.
*/
std::string tapePoolName;
}; // class ArchiveToTapeCopyRequest
} // namespace cta
......@@ -7,11 +7,8 @@ set (CTA_SCHEDULER_SRC_FILES
ArchiveJob.cpp
ArchiveMount.cpp
UserArchiveRequest.cpp
ArchiveToTapeCopyRequest.cpp
TapeMount.cpp
LogicalLibrary.cpp
# mockDB/MockSchedulerDatabase.cpp
# mockDB/MockSchedulerDatabaseFactory.cpp
MountType.cpp
PositioningMethod.cpp
RetrieveJob.cpp
......@@ -36,8 +33,6 @@ target_link_libraries (ctascheduler ctacommon ctaobjectstore protobuf Utils ctau
# _old_prototype_DummyScheduler.cpp)
add_library (ctaschedulerunittests SHARED
# mockDB/MockSchedulerDatabase.cpp
# mockDB/MockSchedulerDatabaseFactory.cpp
SchedulerDatabaseFactory.cpp
SchedulerDatabaseTest.cpp
SchedulerTest.cpp)
......
......@@ -37,7 +37,6 @@
#include "common/archiveNS/Tape.hpp"
#include "RetrieveToFileRequest.hpp"
#include "common/archiveNS/TapeFileLocation.hpp"
#include "ArchiveToTapeCopyRequest.hpp"
#include "common/archiveNS/ArchiveFile.hpp"
#include "objectstore/ArchiveRequest.hpp"
#include "common/dataStructures/MountPolicy.hpp"
......@@ -103,8 +102,8 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
m.filesQueued = aqueue.getJobsSummary().files;
m.oldestJobStartTime = aqueue.getJobsSummary().oldestJobStartTime;
m.priority = aqueue.getJobsSummary().priority;
m.mountPolicy = aqueue.getMountPolicy();
m.maxDrivesAllowed = aqueue.getJobsSummary().maxDrivesAllowed;
m.minArchiveRequestAge = aqueue.getJobsSummary().minArchiveRequestAge;
m.logicalLibrary = "";
}
......@@ -271,116 +270,11 @@ std::unique_ptr<SchedulerDatabase::TapeMountDecisionInfo>
}
*/
void OStoreDB::ArchiveRequestCreation::complete() {
// We inherited all the objects from the creation.
// Lock is still here at that point.
// First, record that we are fine for next step.
m_request.setAllJobsLinkingToArchiveQueue();
m_request.commit();
objectstore::RootEntry re(m_objectStore);
// We can now plug the request onto its tape pools.
// We can discover at that point that a tape pool is actually not
// really owned by the root entry, and hence a dangling pointer
// We should then unlink the jobs from that already connected
// tape pools and abort the job creation.
// The list of done tape pools is held here for this purpose
// Reconstruct the job list
auto jl = m_request.dumpJobs();
std::list<std::string> linkedTapePools;
try {
for (auto j=jl.begin(); j!=jl.end(); j++) {
objectstore::ArchiveQueue aq(j->ArchiveQueueAddress, m_objectStore);
ScopedExclusiveLock aql(aq);
aq.fetch();
if (aq.getOwner() != re.getAddressIfSet())
throw NoSuchArchiveQueue("In OStoreDB::queue: non-existing archive queue found "
"(dangling pointer): canceling request creation.");
aq.addJob(*j, m_request.getAddressIfSet(), m_request.getArchiveFileID(),
m_request.getFileSize(), m_request.getMountPolicy().archivePriority,
m_request.getCreationLog().time);
// Now that we have the tape pool handy, get the retry limits from it and
// assign them to the job
m_request.setJobFailureLimits(j->copyNb, aq.getMountPolicy().maxRetriesWithinMount,
aq.getMountPolicy().maxTotalRetries);
aq.commit();
linkedTapePools.push_back(j->ArchiveQueueAddress);
}
} catch (NoSuchArchiveQueue &) {
// Unlink the request from already connected tape pools
for (auto tpa=linkedTapePools.begin(); tpa!=linkedTapePools.end(); tpa++) {
objectstore::ArchiveQueue aq(*tpa, m_objectStore);
ScopedExclusiveLock aql(aq);
aq.fetch();
aq.removeJob(m_request.getAddressIfSet());
aq.commit();
m_request.remove();
}