Commit b16d8231 authored by Eric Cano's avatar Eric Cano
Browse files

Fixed missing updates to ArchiveQueue summary maps.

Changed ArchiveQueue's schema in object store so the per-job values are cached locally in the queue.
Added a sanity check on commit() so the summary are rebuilt if incoherent.
parent a6540da7
......@@ -62,6 +62,28 @@ void ArchiveQueue::initialize(const std::string& name) {
m_payloadInterpreted = true;
}
void ArchiveQueue::commit() {
// Before calling ObjectOps::commit, check that we have coherent queue summaries
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
if (maxDriveAllowedMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() ||
priorityMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() ||
minArchiveRequestAgeMap.total() != (uint64_t)m_payload.pendingarchivejobs_size()) {
// The maps counts are off: recompute them.
maxDriveAllowedMap.clear();
priorityMap.clear();
minArchiveRequestAgeMap.clear();
for (size_t i=0; i<(size_t)m_payload.pendingarchivejobs_size(); i++) {
maxDriveAllowedMap.incCount(m_payload.pendingarchivejobs(i).maxdrivesallowed());
priorityMap.incCount(m_payload.pendingarchivejobs(i).priority());
minArchiveRequestAgeMap.incCount(m_payload.pendingarchivejobs(i).priority());
}
}
ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit();
}
bool ArchiveQueue::isEmpty() {
checkPayloadReadable();
// Check we have no archive jobs pending
......@@ -140,6 +162,9 @@ void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job,
j->set_size(fileSize);
j->set_fileid(archiveFileId);
j->set_copynb(job.copyNb);
j->set_maxdrivesallowed(policy.maxDrivesAllowed);
j->set_priority(policy.archivePriority);
j->set_minarchiverequestage(policy.archiveMinRequestAge);
}
auto ArchiveQueue::getJobsSummary() -> JobsSummary {
......@@ -174,16 +199,33 @@ bool ArchiveQueue::addJobIfNecessary(
if (j->address() == archiveRequestAddress)
return false;
}
// Keep track of the mounting criteria
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.incCount(policy.maxDrivesAllowed);
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.incCount(policy.archivePriority);
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.incCount(policy.archiveMinRequestAge);
if (m_payload.pendingarchivejobs_size()) {
if ((uint64_t)startTime < m_payload.oldestjobcreationtime())
m_payload.set_oldestjobcreationtime(startTime);
m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + fileSize);
} else {
m_payload.set_archivejobstotalsize(fileSize);
m_payload.set_oldestjobcreationtime(startTime);
}
auto * j = m_payload.add_pendingarchivejobs();
j->set_address(archiveRequestAddress);
j->set_size(fileSize);
j->set_fileid(archiveFileId);
j->set_copynb(job.copyNb);
j->set_maxdrivesallowed(policy.maxDrivesAllowed);
j->set_priority(policy.archivePriority);
j->set_minarchiverequestage(policy.archiveMinRequestAge);
return true;
}
void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) {
// TODO: remove the summary of the job from the queue's counts.
checkPayloadWritable();
auto * jl=m_payload.mutable_pendingarchivejobs();
bool found = false;
......@@ -193,6 +235,12 @@ void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) {
for (size_t i=0; i<(size_t)jl->size(); i++) {
if (jl->Get(i).address() == archiveToFileAddress) {
found = true;
ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap());
maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed());
ValueCountMap priorityMap(m_payload.mutable_prioritymap());
priorityMap.decCount(jl->Get(i).priority());
ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap());
minArchiveRequestAgeMap.decCount(jl->Get(i).minarchiverequestage());
while (i+1 < (size_t)jl->size()) {
jl->SwapElements(i, i+1);
i++;
......@@ -224,7 +272,7 @@ bool ArchiveQueue::addOrphanedJobPendingNsCreation(
const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress,
uint64_t fileid,
uint64_t size) {
uint64_t size, const cta::common::dataStructures::MountPolicy & policy) {
checkPayloadWritable();
auto & jl=m_payload.orphanedarchivejobsnscreation();
for (auto j=jl.begin(); j!= jl.end(); j++) {
......@@ -236,6 +284,9 @@ bool ArchiveQueue::addOrphanedJobPendingNsCreation(
j->set_size(size);
j->set_fileid(fileid);
j->set_copynb(job.copyNb);
j->set_maxdrivesallowed(policy.maxDrivesAllowed);
j->set_priority(policy.archivePriority);
j->set_minarchiverequestage(policy.archiveMinRequestAge);
return true;
}
......
......@@ -49,6 +49,9 @@ public:
// In memory initialiser
void initialize(const std::string & name);
// Commit with sanity checks (override from ObjectOps
void commit();
// Set/get tape pool
void setTapePool(const std::string & name);
std::string getTapePool();
......@@ -56,17 +59,17 @@ public:
// Archive jobs management ===================================================
void addJob(const ArchiveRequest::JobDump & job,
const std::string & archiveRequestAddress, uint64_t archiveFileId,
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & priority, time_t startTime);
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addJobIfNecessary(const ArchiveRequest::JobDump & job,
const std::string & archiveRequestAddress, uint64_t archiveFileId,
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & priority, time_t startTime);
uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addOrphanedJobPendingNsCreation(const ArchiveRequest::JobDump& job,
const std::string& archiveToFileAddress, uint64_t fileid,
uint64_t size);
uint64_t size, const cta::common::dataStructures::MountPolicy & policy);
/// This version will check for existence of the job in the queue before
// returns true if a new job was actually inserted.
bool addOrphanedJobPendingNsDeletion(const ArchiveRequest::JobDump& job,
......
......@@ -337,7 +337,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
jd.tapePool = j->tapepool();
jd.owner = j->owner();
if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(),
m_payload.archivefileid(), m_payload.filesize()))
m_payload.archivefileid(), m_payload.filesize(), getMountPolicy()))
aq.commit();
j->set_owner(aq.getAddressIfSet());
commit();
......@@ -361,7 +361,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer
jd.tapePool = j->tapepool();
jd.owner = j->owner();
if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(),
m_payload.archivefileid(), m_payload.filesize()))
m_payload.archivefileid(), m_payload.filesize(), getMountPolicy()))
aq.commit();
j->set_owner(aq.getAddressIfSet());
j->set_status(serializers::AJS_PendingMount);
......
......@@ -87,6 +87,11 @@ void ValueCountMap::incCount(uint64_t value) {
}
}
void ValueCountMap::clear() {
m_valueCountMap.Clear();
}
uint64_t ValueCountMap::maxValue() {
if (!m_valueCountMap.size())
throw cta::exception::Exception("In ValueCountMap::maxValue: empty map");
......
......@@ -29,6 +29,7 @@ public:
ValueCountMap (google::protobuf::RepeatedPtrField<serializers::ValueCountPair>* valueCountMap);
void incCount(uint64_t value);
void decCount(uint64_t value);
void clear();
uint64_t total();
uint64_t minValue();
uint64_t maxValue();
......
......@@ -143,6 +143,9 @@ message ArchiveJobPointer {
required uint64 size = 3001;
required string address = 3002;
required uint32 copynb = 3003;
required uint64 priority = 3004;
required uint64 minarchiverequestage = 3005;
required uint64 maxdrivesallowed = 3006;
}
message RetrieveJobPointer {
......
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "TapeMount.hpp"
namespace cta {
/**
* A dummy tape mount used on unit tests.
* Information reporting functions should not be needed and will throw exceptions.
* Null returning functions do nothing.
*/
class TapeMountDummy: public TapeMount {
void abort() override {};
std::string getMountTransactionId() const override {
throw exception::Exception("In DummyTapeMount::getMountTransactionId() : not implemented");
}
cta::common::dataStructures::MountType getMountType() const override {
throw exception::Exception("In DummyTapeMount::getMountType() : not implemented");
}
uint32_t getNbFiles() const override {
throw exception::Exception("In DummyTapeMount::getNbFiles() : not implemented");
}
std::string getVid() const override {
throw exception::Exception("In DummyTapeMount::getNbFiles() : not implemented");
}
void setDriveStatus(cta::common::dataStructures::DriveStatus status) override {}
};
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment