Commit 7039f5f6 authored by Cedric CAFFY's avatar Cedric CAFFY
Browse files

Queueing of Archive jobs now use the ContainerAlgorithms instead of the...

Queueing of Archive jobs now use the ContainerAlgorithms instead of the methods in the ArchiveQueue object
Created a base class (interface) that allows to add and remove object(s) from the ownership of an Agent (implementation : AgentWrapper) or from an AgentReference (implementation : AgentReference)
parent 1c4b90dc
......@@ -43,6 +43,7 @@ class Sorter;
class Agent: public ObjectOps<serializers::Agent, serializers::Agent_t> {
friend class AgentReference;
friend class GarbageCollector;
friend class AgentWrapper;
friend class Sorter;
public:
CTA_GENERATE_EXCEPTION_CLASS(AgentStillOwnsObjects);
......
......@@ -24,6 +24,7 @@
#include "common/threading/MutexLocker.hpp"
#include "common/log/Logger.hpp"
#include "common/log/LogContext.hpp"
#include "AgentReferenceInterface.hpp"
#include <atomic>
#include <string>
#include <future>
......@@ -41,7 +42,7 @@ class Agent;
* Agent object in the object store).
* A process
*/
class AgentReference {
class AgentReference: public AgentReferenceInterface{
public:
/**
* Constructor will implicitly generate the address of the Agent object.
......@@ -64,14 +65,14 @@ public:
* @param objectAddress
* @param backend reference to the backend to use.
*/
void addToOwnership(const std::string &objectAddress, objectstore::Backend& backend);
void addToOwnership(const std::string &objectAddress, objectstore::Backend& backend) override;
/**
* Adds a list of object addresses to the referenced agent. The addition is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void addBatchToOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend);
void addBatchToOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) override;
/**
* Removes an object address from the referenced agent. The additions and removals
......@@ -79,14 +80,14 @@ public:
* The execution order is guaranteed.
* @param objectAddress
*/
void removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend);
void removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend) override;
/**
* Removes a list of object addresses to the referenced agent. The removal is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void removeBatchFromOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend);
void removeBatchFromOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) override;
/**
* Bumps up the heart beat of the agent. This action is queued in memory like the
......@@ -98,7 +99,7 @@ public:
* Gets the address of the Agent object generated on construction.
* @return the agent object address.
*/
std::string getAgentAddress();
std::string getAgentAddress() override;
private:
static std::atomic<uint64_t> g_nextAgentId;
std::atomic<uint64_t> m_nextId;
......
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "AgentReferenceInterface.hpp"
namespace cta { namespace objectstore {
AgentReferenceInterface::AgentReferenceInterface() {
}
AgentReferenceInterface::~AgentReferenceInterface() {
}
}}
\ No newline at end of file
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef AGENTREFERENCEDECORATOR_HPP
#define AGENTREFERENCEDECORATOR_HPP
#include "Backend.hpp"
#include <string>
namespace cta { namespace objectstore {
class AgentReferenceInterface {
public:
AgentReferenceInterface();
virtual ~AgentReferenceInterface();
/**
* Adds an object address to the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
* @param backend reference to the backend to use.
*/
virtual void addToOwnership(const std::string &objectAddress, objectstore::Backend& backend) = 0;
/**
* Adds a list of object addresses to the referenced agent. The addition is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
virtual void addBatchToOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) = 0;
/**
* Removes an object address from the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
*/
virtual void removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend) = 0;
/**
* Removes a list of object addresses to the referenced agent. The removal is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
virtual void removeBatchFromOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) = 0;
/**
* Returns the agent address
* @return the agent address
*/
virtual std::string getAgentAddress() = 0;
};
}}
#endif /* AGENTREFERENCEDECORATOR_HPP */
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "AgentWrapper.hpp"
namespace cta { namespace objectstore {
AgentWrapper::AgentWrapper(Agent& agent):m_agent(agent) {
}
AgentWrapper::~AgentWrapper() {
}
/**
* Adds an object address to the current agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
* @param backend reference to the backend to use.
*/
void AgentWrapper::addToOwnership(const std::string &objectAddress, objectstore::Backend& backend){
ScopedExclusiveLock sel(m_agent);
m_agent.fetch();
m_agent.addToOwnership(objectAddress);
m_agent.commit();
sel.release();
}
/**
* Adds a list of object addresses to the current agent. The addition is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void AgentWrapper::addBatchToOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend){
ScopedExclusiveLock sel(m_agent);
m_agent.fetch();
for(auto& address: objectAdresses){
m_agent.addToOwnership(address);
}
m_agent.commit();
sel.release();
}
/**
* Removes an object address from the current agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
*/
void AgentWrapper::removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend){
ScopedExclusiveLock sel(m_agent);
m_agent.fetch();
m_agent.removeFromOwnership(objectAddress);
m_agent.commit();
sel.release();
}
/**
* Removes a list of object addresses to the current agent. The removal is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void AgentWrapper::removeBatchFromOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend){
ScopedExclusiveLock sel(m_agent);
m_agent.fetch();
for(auto& address: objectAdresses){
m_agent.removeFromOwnership(address);
}
m_agent.commit();
sel.release();
}
std::string AgentWrapper::getAgentAddress(){
return m_agent.getAddressIfSet();
}
}}
\ No newline at end of file
/**
* The CERN Tape Archive (CTA) project
* Copyright © 2018 CERN
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef AGENTWRAPPER_HPP
#define AGENTWRAPPER_HPP
#include "AgentReferenceInterface.hpp"
#include "Agent.hpp"
namespace cta { namespace objectstore {
class AgentWrapper: public AgentReferenceInterface {
public:
AgentWrapper(Agent& agent);
virtual ~AgentWrapper();
/**
* Adds an object address to the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
* @param backend reference to the backend to use.
*/
void addToOwnership(const std::string &objectAddress, objectstore::Backend& backend) override;
/**
* Adds a list of object addresses to the referenced agent. The addition is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void addBatchToOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) override;
/**
* Removes an object address from the referenced agent. The additions and removals
* are queued in memory so that several threads can share the same access.
* The execution order is guaranteed.
* @param objectAddress
*/
void removeFromOwnership(const std::string &objectAddress, objectstore::Backend& backend) override;
/**
* Removes a list of object addresses to the referenced agent. The removal is immediate.
* @param objectAdresses
* @param backend reference to the backend to use.
*/
void removeBatchFromOwnership(const std::list<std::string> &objectAdresses, objectstore::Backend& backend) override;
std::string getAgentAddress() override;
private:
Agent& m_agent;
};
}}
#endif /* AGENTWRAPPER_HPP */
......@@ -59,6 +59,8 @@ add_library (ctaobjectstore SHARED
Agent.cpp
AgentHeartbeatThread.cpp
AgentReference.cpp
AgentReferenceInterface.cpp
AgentWrapper.cpp
AgentRegister.cpp
AgentWatchdog.cpp
ArchiveQueue.cpp
......
......@@ -27,16 +27,60 @@ Sorter::Sorter(AgentReference& agentReference, Backend& objectstore, catalogue::
Sorter::~Sorter() {
}
void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){
std::shared_ptr<ArchiveJobQueueInfo> ajqi = std::make_shared<ArchiveJobQueueInfo>(ArchiveJobQueueInfo());
template <typename SpecificQueue>
void Sorter::executeArchiveAlgorithm(const std::string tapePool, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext& lc){
typedef ContainerAlgorithms<ArchiveQueue,SpecificQueue> Algo;
Algo algo(m_objectstore,m_agentReference);
typename Algo::InsertedElement::list jobsToAdd;
std::map<uint64_t,std::shared_ptr<ArchiveJobQueueInfo>> succeededJobs;
std::string previousOwner;
for(auto& jobToAdd: jobs){
Sorter::ArchiveJob job = std::get<0>(jobToAdd->jobToQueue);
succeededJobs[job.jobDump.copyNb] = jobToAdd;
//TODO, change the ownership by passing the previousOwner to this sorter
previousOwner = job.previousOwner->getAgentAddress();
jobsToAdd.push_back({ jobToAdd->archiveRequest.get(),job.jobDump.copyNb,job.archiveFile, job.mountPolicy,cta::nullopt });
}
try{
algo.referenceAndSwitchOwnershipIfNecessary(tapePool,previousOwner,queueAddress,jobsToAdd,lc);
} catch (typename Algo::OwnershipSwitchFailure &failure){
for(auto &failedAR: failure.failedElements){
try{
std::rethrow_exception(failedAR.failure);
} catch(cta::exception::Exception &e){
uint16_t copyNb = failedAR.element->copyNb;
std::get<1>(succeededJobs[copyNb]->jobToQueue).set_exception(std::current_exception());
succeededJobs.erase(copyNb);
}
}
}
for(auto& succeededJob: succeededJobs){
std::get<1>(succeededJob.second->jobToQueue).set_value();
}
}
void Sorter::dispatchArchiveAlgorithm(const std::string tapePool, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext &lc){
switch(jobQueueType){
case JobQueueType::JobsToReportToUser:
executeArchiveAlgorithm<ArchiveQueueToReport>(tapePool,queueAddress,jobs,lc);
break;
default:
executeArchiveAlgorithm<ArchiveQueueToTransfer>(tapePool,queueAddress,jobs,lc);
break;
}
}
void Sorter::insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert, log::LogContext & lc){
auto ajqi = std::make_shared<ArchiveJobQueueInfo>();
ajqi->archiveRequest = archiveRequest;
Sorter::ArchiveJob jobToAdd;
jobToAdd.archiveRequest = archiveRequest;
jobToAdd.archiveFile = archiveRequest->getArchiveFile();
jobToAdd.archiveFileId = archiveRequest->getArchiveFile().archiveFileID;
jobToAdd.jobDump.copyNb = jobToInsert.copyNb;
jobToAdd.fileSize = archiveRequest->getArchiveFile().fileSize;
jobToAdd.mountPolicy = archiveRequest->getMountPolicy();
jobToAdd.jobDump.owner = jobToInsert.owner;//TODO : Maybe should be passed in parameter of this method
jobToAdd.previousOwner = &previousOwner;
jobToAdd.startTime = archiveRequest->getEntryLog().time;
jobToAdd.jobDump.tapePool = jobToInsert.tapePool;
ajqi->jobToQueue = std::make_tuple(jobToAdd,std::promise<void>());
......@@ -48,7 +92,7 @@ void Sorter::insertRetrieveRequest(std::shared_ptr<RetrieveRequest> retrieveRequ
std::set<std::string> candidateVids = getCandidateVids(*retrieveRequest);
//We need to select the best VID to queue the RetrieveJob in the best queue
if(candidateVids.empty()){
std::shared_ptr<RetrieveJobQueueInfo> rjqi = std::make_shared<RetrieveJobQueueInfo>(RetrieveJobQueueInfo());
auto rjqi = std::make_shared<RetrieveJobQueueInfo>();
rjqi->retrieveRequest = retrieveRequest;
//The first copy of the ArchiveFile will be queued
cta::common::dataStructures::TapeFile jobTapeFile = retrieveRequest->getArchiveFile().tapeFiles.begin()->second;
......@@ -158,16 +202,16 @@ Sorter::MapRetrieve Sorter::getAllRetrieve(){
}
void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc){
for(auto& archiveJobInfo: archiveJobInfos){
double queueLockFetchTime=0;
/* double queueLockFetchTime=0;
double queueProcessAndCommitTime=0;
double requestsUpdatePreparationTime=0;
double requestsUpdatingTime = 0;
utils::Timer t;
uint64_t filesBefore=0;
uint64_t bytesBefore=0;
utils::Timer t;*/
/*uint64_t filesBefore=0;
uint64_t bytesBefore=0;*/
ArchiveQueue aq(m_objectstore);
/*ArchiveQueue aq(m_objectstore);
ScopedExclusiveLock rql;
Helpers::getLockedAndFetchedJobQueue<ArchiveQueue>(aq,rql, m_agentReference, tapePool, jobQueueType, lc);
queueLockFetchTime = t.secs(utils::Timer::resetCounter);
......@@ -223,19 +267,25 @@ void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType
jobPromise.set_exception(std::current_exception());
continue;
}
requestsUpdatingTime = t.secs(utils::Timer::resetCounter);
*/
std::string queueAddress;
this->dispatchArchiveAlgorithm(tapePool,jobQueueType,queueAddress,archiveJobInfos,lc);
archiveJobInfos.clear();
/*requestsUpdatingTime = t.secs(utils::Timer::resetCounter);
{
log::ScopedParamContainer params(lc);
ArchiveQueue aq(queueAddress,m_objectstore);
ScopedExclusiveLock aql(aq);
aq.fetch();
auto jobsSummary = aq.getJobsSummary();
params.add("tapePool", tapePool)
.add("archiveQueueObject", aq.getAddressIfSet())
/*.add("filesAdded", filesQueued)
.add("filesAdded", filesQueued)
.add("bytesAdded", bytesQueued)
.add("filesAddedInitially", filesQueued)
.add("bytesAddedInitially", bytesQueued)*/
/*.add("filesDequeuedAfterErrors", filesDequeued)
.add("bytesDequeuedAfterErrors", bytesDequeued)*/
.add("bytesAddedInitially", bytesQueued)
.add("filesDequeuedAfterErrors", filesDequeued)
.add("bytesDequeuedAfterErrors", bytesDequeued)
.add("filesBefore", filesBefore)
.add("bytesBefore", bytesBefore)
.add("filesAfter", jobsSummary.jobs)
......@@ -247,9 +297,7 @@ void Sorter::queueArchiveRequests(const std::string tapePool, const JobQueueType
//.add("queueRecommitTime", queueRecommitTime);
lc.log(log::INFO, "In Sorter::queueArchiveRequests(): "
"Queued an archiveRequest");
}
}
archiveJobInfos.clear();
}*/
}
void Sorter::queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& retrieveJobsInfo, log::LogContext &lc){
......
......@@ -33,18 +33,20 @@
#include "common/dataStructures/ArchiveJob.hpp"
#include "RetrieveQueue.hpp"
#include "ArchiveQueue.hpp"
#include "Algorithms.hpp"
#include "ArchiveQueueAlgorithms.hpp"
namespace cta { namespace objectstore {
struct ArchiveJobQueueInfo;
struct RetrieveJobQueueInfo;
struct ArchiveJobQueueInfo;
struct RetrieveJobQueueInfo;
class Sorter {
public:
CTA_GENERATE_EXCEPTION_CLASS(RetrieveRequestHasNoCopies);
Sorter(AgentReference& agentReference,Backend &objectstore, catalogue::Catalogue& catalogue);
virtual ~Sorter();
void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc);
void insertArchiveJob(std::shared_ptr<ArchiveRequest> archiveRequest, AgentReferenceInterface &previousOwner, ArchiveRequest::JobDump& jobToInsert,log::LogContext & lc);
/**
*
* @param retrieveRequest
......@@ -62,6 +64,8 @@ public:
struct ArchiveJob{
std::shared_ptr<ArchiveRequest> archiveRequest;
ArchiveRequest::JobDump jobDump;
common::dataStructures::ArchiveFile archiveFile;
AgentReferenceInterface * previousOwner;
uint64_t archiveFileId;
time_t startTime;
uint64_t fileSize;
......@@ -90,6 +94,9 @@ private:
std::string getBestVidForQueueingRetrieveRequest(RetrieveRequest& retrieveRequest, std::set<std::string>& candidateVids, log::LogContext &lc);
void queueArchiveRequests(const std::string tapePool, const JobQueueType jobQueueType, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& requests, log::LogContext &lc);
void queueRetrieveRequests(const std::string vid, const JobQueueType jobQueueType, std::list<std::shared_ptr<RetrieveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc);
void dispatchArchiveAlgorithm(const std::string tapePool, const JobQueueType jobQueueType, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& archiveJobInfos, log::LogContext &lc);
template<typename SpecificQueue>
void executeArchiveAlgorithm(const std::string tapePool, std::string& queueAddress, std::list<std::shared_ptr<ArchiveJobQueueInfo>>& jobs, log::LogContext& lc);
};
struct ArchiveJobQueueInfo{
......@@ -106,4 +113,3 @@ struct RetrieveJobQueueInfo{
}}
#endif /* SORTER_HPP */
......@@ -92,7 +92,9 @@ namespace unitTests {
ar.setArchiveFile(aFile);
ar.addJob(1, "TapePool0", agentRef.getAgentAddress(), 1, 1, 1);
ar.addJob(2, "TapePool1", agentRef.getAgentAddress(), 1, 1, 1);
ar.addJob(3,"TapePool0",agentRef.getAgentAddress(),1,1,1);
ar.setJobStatus(1,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer);
ar.setJobStatus(3,cta::objectstore::serializers::ArchiveJobStatus::AJS_ToReportForTransfer);
cta::common::dataStructures::MountPolicy mp;
ar.setMountPolicy(mp);
ar.setArchiveReportURL("");
......@@ -107,7 +109,7 @@ namespace unitTests {
cta::objectstore::Sorter sorter(agentRefSorter,be,catalogue);
std::shared_ptr<cta::objectstore::ArchiveRequest> arPtr = std::make_shared<cta::objectstore::ArchiveRequest>(ar);
for(auto& j: jobs){
sorter.insertArchiveJob(arPtr,j,lc);
sorter.insertArchiveJob(arPtr,agentRef,j,lc);
}
atfrl.release();
//Get the future
......@@ -135,10 +137,9 @@ namespace unitTests {
//Fetch the queue so that we can get the archiveRequests from it
cta::objectstore::ScopedExclusiveLock aql(aq);
aq.fetch();
ASSERT_EQ(aq.dumpJobs().size(),1);
ASSERT_EQ(aq.getTapePool(),"TapePool0");
ASSERT_EQ(aq.dumpJobs().size(),2);
for(auto &job: aq.dumpJobs()){
ASSERT_EQ(job.copyNb,1);
ASSERT_TRUE(job.copyNb == 1 || job.copyNb == 3);
ASSERT_EQ(job.size,667);
cta::objectstore::ArchiveRequest archiveRequest(job.address,be);
archiveRequest.fetchNoLock();
......
Subproject commit 7d3e73e9299652bc2fc74275faf2ca5ca0e0d08d
Subproject commit 5b38221715a8b601f493e958b340fd1ba54d000a
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment