Commit f502d01c authored by Michael Davis's avatar Michael Davis
Browse files

[os-generic-queues] Adds three RetrieveQueue types

parent 6ec8935b
/*
* The CERN Tape Archive (CTA) project
* Copyright (C) 2015 CERN
......@@ -35,8 +34,8 @@
namespace unitTests {
void fill_retrieve_requests(
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueue>::InsertedElement::list &requests,
void fillRetrieveRequests(
typename cta::objectstore::ContainerAlgorithms<cta::objectstore::RetrieveQueue,cta::objectstore::RetrieveQueueToTransfer>::InsertedElement::list &requests,
cta::objectstore::BackendVFS &be,
cta::objectstore::AgentReference &agentRef)
{
......@@ -72,7 +71,7 @@ void fill_retrieve_requests(
rqc.mountPolicy.maxDrivesAllowed = 1;
rqc.mountPolicy.retrieveMinRequestAge = 1;
rqc.mountPolicy.retrievePriority = 1;
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement{
requests.emplace_back(ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement{
cta::make_unique<RetrieveRequest>(rrAddr, be), 1, i, 667, mp, serializers::RetrieveJobStatus::RJS_ToTransfer
});
auto &rr = *requests.back().retrieveRequest;
......@@ -186,8 +185,8 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel.release();
agent.initialize();
agent.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement::list requests;
fill_retrieve_requests(requests, be, agentRef);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests;
fillRetrieveRequests(requests, be, agentRef);
{
// Second agent to test referenceAndSwitchOwnershipIfNecessary
......@@ -205,17 +204,17 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
rel2.release();
agent2.initialize();
agent2.insertAndRegisterSelf(lc);
ContainerAlgorithms<RetrieveQueue,RetrieveQueue>::InsertedElement::list requests2;
fill_retrieve_requests(requests2, be2, agentRef2);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer>::InsertedElement::list requests2;
fillRetrieveRequests(requests2, be2, agentRef2);
auto a1 = agentRef2.getAgentAddress();
auto a2 = agentRef2.getAgentAddress();
ContainerAlgorithms<RetrieveQueue,RetrieveQueue> retrieveAlgos2(be2, agentRef2);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos2(be2, agentRef2);
retrieveAlgos2.referenceAndSwitchOwnershipIfNecessary("VID", QueueType::JobsToTransfer,
a2, a1, requests2, lc);
}
ContainerAlgorithms<RetrieveQueue,RetrieveQueue> retrieveAlgos(be, agentRef);
ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> retrieveAlgos(be, agentRef);
try {
ASSERT_EQ(requests.size(), 10);
......@@ -223,19 +222,19 @@ TEST(ObjectStore, RetrieveQueueAlgorithms) {
agentRef.getAgentAddress(), requests, lc);
// Now get the requests back
ContainerTraits<RetrieveQueue,RetrieveQueue>::PopCriteria popCriteria;
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PopCriteria popCriteria;
popCriteria.bytes = std::numeric_limits<decltype(popCriteria.bytes)>::max();
popCriteria.files = 100;
auto poppedJobs = retrieveAlgos.popNextBatch("VID", QueueType::JobsToTransfer, popCriteria, lc);
ASSERT_EQ(poppedJobs.summary.files, 10);
// Validate that the summary has the same information as the popped elements
ContainerTraits<RetrieveQueue,RetrieveQueue>::PoppedElementsSummary s;
ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::PoppedElementsSummary s;
for(auto &e: poppedJobs.elements) {
s += ContainerTraits<RetrieveQueue,RetrieveQueue>::getElementSummary(e);
s += ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::getElementSummary(e);
}
ASSERT_EQ(s, poppedJobs.summary);
} catch (ContainerTraits<RetrieveQueue,RetrieveQueue>::OwnershipSwitchFailure & ex) {
} catch (ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::OwnershipSwitchFailure & ex) {
for (auto & e: ex.failedElements) {
try {
throw e.failure;
......
......@@ -65,7 +65,9 @@ add_library (ctaobjectstore SHARED
ArchiveQueueFailedAlgorithms.cpp
RetrieveQueue.cpp
RetrieveQueueShard.cpp
RetrieveQueueAlgorithms.cpp
RetrieveQueueToTransferAlgorithms.cpp
RetrieveQueueToReportAlgorithms.cpp
RetrieveQueueFailedAlgorithms.cpp
QueueType.cpp
ArchiveRequest.cpp
RetrieveRequest.cpp
......
......@@ -40,6 +40,9 @@ struct ArchiveQueueToTransfer;
struct ArchiveQueueToReport;
struct ArchiveQueueFailed;
struct RetrieveQueue;
struct RetrieveQueueToTransfer;
struct RetrieveQueueToReport;
struct RetrieveQueueFailed;
class ObjectOpsBase {
friend class ScopedLock;
......@@ -50,7 +53,9 @@ class ObjectOpsBase {
friend ContainerTraits<ArchiveQueue,ArchiveQueueToTransfer>;
friend ContainerTraits<ArchiveQueue,ArchiveQueueToReport>;
friend ContainerTraits<ArchiveQueue,ArchiveQueueFailed>;
friend ContainerTraits<RetrieveQueue,RetrieveQueue>;
friend ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>;
friend ContainerTraits<RetrieveQueue,RetrieveQueueToReport>;
friend ContainerTraits<RetrieveQueue,RetrieveQueueFailed>;
protected:
ObjectOpsBase(Backend & os): m_nameSet(false), m_objectStore(os),
m_headerInterpreted(false), m_payloadInterpreted(false),
......
......@@ -145,4 +145,19 @@ private:
uint64_t m_maxShardSize = c_defaultMaxShardSize;
};
class RetrieveQueueToTransfer : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToTransfer(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueToReport : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueToReport(Ts&...args): RetrieveQueue(args...) {}
};
class RetrieveQueueFailed : public RetrieveQueue {
public:
template<typename...Ts> RetrieveQueueFailed(Ts&...args): RetrieveQueue(args...) {}
};
}}
......@@ -21,9 +21,9 @@
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueue>::c_containerTypeName = "RetrieveQueue";
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueFailed>::c_containerTypeName = "RetrieveQueueFailed";
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueue>::c_identifierType = "vid";
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueFailed>::c_identifierType = "vid";
}} // namespace cta::objectstore
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReport>::c_containerTypeName = "RetrieveQueueToReport";
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToReport>::c_identifierType = "vid";
}} // namespace cta::objectstore
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "RetrieveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::c_containerTypeName = "RetrieveQueueToTransfer";
template<>
const std::string ContainerTraits<RetrieveQueue,RetrieveQueueToTransfer>::c_identifierType = "vid";
}} // namespace cta::objectstore
......@@ -1822,7 +1822,7 @@ const OStoreDB::RetrieveMount::MountInfo& OStoreDB::RetrieveMount::getMountInfo(
std::list<std::unique_ptr<SchedulerDatabase::RetrieveJob>> OStoreDB::RetrieveMount::
getNextJobBatch(uint64_t filesRequested, uint64_t bytesRequested, log::LogContext &logContext)
{
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueue> RQAlgos;
typedef objectstore::ContainerAlgorithms<RetrieveQueue,RetrieveQueueToTransfer> RQAlgos;
RQAlgos rqAlgos(m_oStoreDB.m_objectStore, *m_oStoreDB.m_agentReference);
RQAlgos::PopCriteria popCriteria(filesRequested, bytesRequested);
auto jobs = rqAlgos.popNextBatch(mountInfo.vid, objectstore::QueueType::JobsToTransfer, popCriteria, logContext);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment