diff --git a/objectstore/ArchiveQueue.cpp b/objectstore/ArchiveQueue.cpp index 66ff225b27369d7a832dd625306b932e404cbf21..93f12e675194ba2a62bf31fe4d7df41278ed7adb 100644 --- a/objectstore/ArchiveQueue.cpp +++ b/objectstore/ArchiveQueue.cpp @@ -62,6 +62,28 @@ void ArchiveQueue::initialize(const std::string& name) { m_payloadInterpreted = true; } +void ArchiveQueue::commit() { + // Before calling ObjectOps::commit, check that we have coherent queue summaries + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + if (maxDriveAllowedMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() || + priorityMap.total() != (uint64_t)m_payload.pendingarchivejobs_size() || + minArchiveRequestAgeMap.total() != (uint64_t)m_payload.pendingarchivejobs_size()) { + // The maps counts are off: recompute them. + maxDriveAllowedMap.clear(); + priorityMap.clear(); + minArchiveRequestAgeMap.clear(); + for (size_t i=0; i<(size_t)m_payload.pendingarchivejobs_size(); i++) { + maxDriveAllowedMap.incCount(m_payload.pendingarchivejobs(i).maxdrivesallowed()); + priorityMap.incCount(m_payload.pendingarchivejobs(i).priority()); + minArchiveRequestAgeMap.incCount(m_payload.pendingarchivejobs(i).priority()); + } + } + ObjectOps<serializers::ArchiveQueue, serializers::ArchiveQueue_t>::commit(); +} + + bool ArchiveQueue::isEmpty() { checkPayloadReadable(); // Check we have no archive jobs pending @@ -140,6 +162,9 @@ void ArchiveQueue::addJob(const ArchiveRequest::JobDump& job, j->set_size(fileSize); j->set_fileid(archiveFileId); j->set_copynb(job.copyNb); + j->set_maxdrivesallowed(policy.maxDrivesAllowed); + j->set_priority(policy.archivePriority); + j->set_minarchiverequestage(policy.archiveMinRequestAge); } auto ArchiveQueue::getJobsSummary() -> JobsSummary { @@ -174,16 +199,33 @@ bool ArchiveQueue::addJobIfNecessary( if (j->address() == archiveRequestAddress) return false; } + // Keep track of the mounting criteria + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.incCount(policy.maxDrivesAllowed); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.incCount(policy.archivePriority); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + minArchiveRequestAgeMap.incCount(policy.archiveMinRequestAge); + if (m_payload.pendingarchivejobs_size()) { + if ((uint64_t)startTime < m_payload.oldestjobcreationtime()) + m_payload.set_oldestjobcreationtime(startTime); + m_payload.set_archivejobstotalsize(m_payload.archivejobstotalsize() + fileSize); + } else { + m_payload.set_archivejobstotalsize(fileSize); + m_payload.set_oldestjobcreationtime(startTime); + } auto * j = m_payload.add_pendingarchivejobs(); j->set_address(archiveRequestAddress); j->set_size(fileSize); j->set_fileid(archiveFileId); j->set_copynb(job.copyNb); + j->set_maxdrivesallowed(policy.maxDrivesAllowed); + j->set_priority(policy.archivePriority); + j->set_minarchiverequestage(policy.archiveMinRequestAge); return true; } void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) { - // TODO: remove the summary of the job from the queue's counts. checkPayloadWritable(); auto * jl=m_payload.mutable_pendingarchivejobs(); bool found = false; @@ -193,6 +235,12 @@ void ArchiveQueue::removeJob(const std::string& archiveToFileAddress) { for (size_t i=0; i<(size_t)jl->size(); i++) { if (jl->Get(i).address() == archiveToFileAddress) { found = true; + ValueCountMap maxDriveAllowedMap(m_payload.mutable_maxdrivesallowedmap()); + maxDriveAllowedMap.decCount(jl->Get(i).maxdrivesallowed()); + ValueCountMap priorityMap(m_payload.mutable_prioritymap()); + priorityMap.decCount(jl->Get(i).priority()); + ValueCountMap minArchiveRequestAgeMap(m_payload.mutable_minarchiverequestagemap()); + minArchiveRequestAgeMap.decCount(jl->Get(i).minarchiverequestage()); while (i+1 < (size_t)jl->size()) { jl->SwapElements(i, i+1); i++; @@ -224,7 +272,7 @@ bool ArchiveQueue::addOrphanedJobPendingNsCreation( const ArchiveRequest::JobDump& job, const std::string& archiveToFileAddress, uint64_t fileid, - uint64_t size) { + uint64_t size, const cta::common::dataStructures::MountPolicy & policy) { checkPayloadWritable(); auto & jl=m_payload.orphanedarchivejobsnscreation(); for (auto j=jl.begin(); j!= jl.end(); j++) { @@ -236,6 +284,9 @@ bool ArchiveQueue::addOrphanedJobPendingNsCreation( j->set_size(size); j->set_fileid(fileid); j->set_copynb(job.copyNb); + j->set_maxdrivesallowed(policy.maxDrivesAllowed); + j->set_priority(policy.archivePriority); + j->set_minarchiverequestage(policy.archiveMinRequestAge); return true; } diff --git a/objectstore/ArchiveQueue.hpp b/objectstore/ArchiveQueue.hpp index 71181e6dcf83ea36ac1b38776d6bb5701bc3b97e..5ffd34b5a3e2a7a6c37a8d75c7488fa84bd55978 100644 --- a/objectstore/ArchiveQueue.hpp +++ b/objectstore/ArchiveQueue.hpp @@ -49,6 +49,9 @@ public: // In memory initialiser void initialize(const std::string & name); + // Commit with sanity checks (override from ObjectOps + void commit(); + // Set/get tape pool void setTapePool(const std::string & name); std::string getTapePool(); @@ -56,17 +59,17 @@ public: // Archive jobs management =================================================== void addJob(const ArchiveRequest::JobDump & job, const std::string & archiveRequestAddress, uint64_t archiveFileId, - uint64_t fileSize, const cta::common::dataStructures::MountPolicy & priority, time_t startTime); + uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addJobIfNecessary(const ArchiveRequest::JobDump & job, const std::string & archiveRequestAddress, uint64_t archiveFileId, - uint64_t fileSize, const cta::common::dataStructures::MountPolicy & priority, time_t startTime); + uint64_t fileSize, const cta::common::dataStructures::MountPolicy & policy, time_t startTime); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addOrphanedJobPendingNsCreation(const ArchiveRequest::JobDump& job, const std::string& archiveToFileAddress, uint64_t fileid, - uint64_t size); + uint64_t size, const cta::common::dataStructures::MountPolicy & policy); /// This version will check for existence of the job in the queue before // returns true if a new job was actually inserted. bool addOrphanedJobPendingNsDeletion(const ArchiveRequest::JobDump& job, diff --git a/objectstore/ArchiveRequest.cpp b/objectstore/ArchiveRequest.cpp index 16b79b9cd5715d9e3940ebb39f55ec4ea88136c8..09ddd2db22d0c9c736e47fdbc5bb54cbbf79598a 100644 --- a/objectstore/ArchiveRequest.cpp +++ b/objectstore/ArchiveRequest.cpp @@ -337,7 +337,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer jd.tapePool = j->tapepool(); jd.owner = j->owner(); if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(), - m_payload.archivefileid(), m_payload.filesize())) + m_payload.archivefileid(), m_payload.filesize(), getMountPolicy())) aq.commit(); j->set_owner(aq.getAddressIfSet()); commit(); @@ -361,7 +361,7 @@ void ArchiveRequest::garbageCollect(const std::string &presumedOwner, AgentRefer jd.tapePool = j->tapepool(); jd.owner = j->owner(); if (aq.addOrphanedJobPendingNsCreation(jd, getAddressIfSet(), - m_payload.archivefileid(), m_payload.filesize())) + m_payload.archivefileid(), m_payload.filesize(), getMountPolicy())) aq.commit(); j->set_owner(aq.getAddressIfSet()); j->set_status(serializers::AJS_PendingMount); diff --git a/objectstore/ValueCountMap.cpp b/objectstore/ValueCountMap.cpp index 712639a7a380849a10de90f6489fbd9264325fd9..010fda28d4b0c0fdda483def614fe1db9b39ea45 100644 --- a/objectstore/ValueCountMap.cpp +++ b/objectstore/ValueCountMap.cpp @@ -87,6 +87,11 @@ void ValueCountMap::incCount(uint64_t value) { } } +void ValueCountMap::clear() { + m_valueCountMap.Clear(); +} + + uint64_t ValueCountMap::maxValue() { if (!m_valueCountMap.size()) throw cta::exception::Exception("In ValueCountMap::maxValue: empty map"); diff --git a/objectstore/ValueCountMap.hpp b/objectstore/ValueCountMap.hpp index c2137490e0136961256efb66225a627625a53422..a46bd5af9f4534d6738fda7151c9a2f09462a91e 100644 --- a/objectstore/ValueCountMap.hpp +++ b/objectstore/ValueCountMap.hpp @@ -29,6 +29,7 @@ public: ValueCountMap (google::protobuf::RepeatedPtrField<serializers::ValueCountPair>* valueCountMap); void incCount(uint64_t value); void decCount(uint64_t value); + void clear(); uint64_t total(); uint64_t minValue(); uint64_t maxValue(); diff --git a/objectstore/cta.proto b/objectstore/cta.proto index 2a9de213f04e34fae395281ed56850e46d42665a..f15999b92b82f6e4e7b5d37093913fed07921ed9 100644 --- a/objectstore/cta.proto +++ b/objectstore/cta.proto @@ -143,6 +143,9 @@ message ArchiveJobPointer { required uint64 size = 3001; required string address = 3002; required uint32 copynb = 3003; + required uint64 priority = 3004; + required uint64 minarchiverequestage = 3005; + required uint64 maxdrivesallowed = 3006; } message RetrieveJobPointer { diff --git a/scheduler/TapeMountDummy.hpp b/scheduler/TapeMountDummy.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5602d9406f3ce9de58651219bca01455db1558ca --- /dev/null +++ b/scheduler/TapeMountDummy.hpp @@ -0,0 +1,47 @@ +/* + * The CERN Tape Archive (CTA) project + * Copyright (C) 2015 CERN + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "TapeMount.hpp" + +namespace cta { + +/** + * A dummy tape mount used on unit tests. + * Information reporting functions should not be needed and will throw exceptions. + * Null returning functions do nothing. + */ +class TapeMountDummy: public TapeMount { + void abort() override {}; + std::string getMountTransactionId() const override { + throw exception::Exception("In DummyTapeMount::getMountTransactionId() : not implemented"); + } + cta::common::dataStructures::MountType getMountType() const override { + throw exception::Exception("In DummyTapeMount::getMountType() : not implemented"); + } + uint32_t getNbFiles() const override { + throw exception::Exception("In DummyTapeMount::getNbFiles() : not implemented"); + } + std::string getVid() const override { + throw exception::Exception("In DummyTapeMount::getNbFiles() : not implemented"); + } + void setDriveStatus(cta::common::dataStructures::DriveStatus status) override {} +}; + +} \ No newline at end of file